Henry Robinson has posted comments on this change.

Change subject: IMPALA-4014: Introduce query-wide execution state.
......................................................................

Patch Set 10:

(51 comments)

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/runtime/fragment-instance-exec-state.h
File be/src/runtime/fragment-instance-exec-state.h:

Line 32: /// Execution state of a single plan fragment.
instance

PS10, Line 33: FragmentInstanceExecState
Let's consider FragmentInstanceState and QueryState (if they're not executing, what are they doing having state?)

PS10, Line 39: lambda(
             : boost::mem_fn(&FragmentInstanceExecState::ReportStatusCb),
             : this, _1, _2, _3)
Try to find a clearer way to write this. If you don't want to write the lambda in the constructor list, consider a static function that returns a lambda given 'this' as an input:

  std::function<void(const Status&, RuntimeProfile*, bool)> MakeCb(
      FragmentInstanceExecState* fragment_instance) {
    return [fragment_instance](const Status& s, RuntimeProfile* profile, bool done) {
      fragment_instance->ReportStatusCb(s, profile, done);
    };
  }

Please leave a TODO to aggregate all the reports into a single CB managed by the QES (and file a JIRA as well).

PS10, Line 80: ImpalaBackendClientCache* client_cache_;
Is this necessary given that ExecEnv can give us the same pointer?

PS10, Line 77: QueryExecState* query_exec_state_; // not owned
             : TPlanFragmentInstanceCtx fragment_instance_ctx_;
             : FragmentInstanceExecutor executor_;
             : ImpalaBackendClientCache* client_cache_;
             : TExecPlanFragmentParams exec_params_;
Good opportunity to add some comments for these.

PS10, Line 83: plan fragment
fragment instance.

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/runtime/fragment-instance-executor.cc
File be/src/runtime/fragment-instance-executor.cc:

PS10, Line 80: Close
Since we are aiming for deterministic, manual management of finst lifetimes, how about moving this out of the destructor and ensuring that the FInst calls Close() explicitly?

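To make the explicit-Close() suggestion concrete, here is a minimal sketch. Executor and InstanceState are hypothetical stand-ins, not the classes in this patch, and the plain assert stands in for a DCHECK. The destructor does no cleanup of its own; it only checks that the owner already called Close().

```cpp
#include <cassert>

// Hypothetical stand-in for FragmentInstanceExecutor; not the class in this patch.
class Executor {
 public:
  Executor() = default;

  // No cleanup here: the destructor only verifies that Close() already ran.
  ~Executor() { assert(closed_); }

  // Releases resources. Idempotent, so error paths can call it safely.
  void Close() {
    if (closed_) return;
    // ... release resources here ...
    closed_ = true;
  }

  bool closed() const { return closed_; }

 private:
  bool closed_ = false;
};

// Hypothetical owner, standing in for the fragment instance state.
class InstanceState {
 public:
  // Runs the fragment and then closes the executor explicitly.
  void Exec() {
    // ... run the fragment here ...
    executor_.Close();
  }

  bool executor_closed() const { return executor_.closed(); }

 private:
  Executor executor_;
};
```

With this shape, the point at which resources are released is visible at the call site rather than depending on when the object happens to be destroyed.
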
Line 349: // We also want to call sink_->Close() here rather than in FragmentInstanceExecutor::Close
long line

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/runtime/fragment-instance-executor.h
File be/src/runtime/fragment-instance-executor.h:

Line 48: /// FragmentInstanceExecutor handles all aspects of the execution of a single plan fragment,
long line

PS10, Line 155: ExecEnv* exec_env_; // not owned
Can you remove this? ExecEnv::GetInstance() works fine.

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/runtime/query-exec-state.cc
File be/src/runtime/query-exec-state.cc:

PS10, Line 60:
remove space

PS10, Line 73: DCHECK_EQ(fragment_inst_exec_state_map_.find(GetInstanceIdx(fragment_instance_id)),
             : fragment_inst_exec_state_map_)
these aren't the same type (does this compile?).

PS10, Line 76: FragmentInstanceExecState* fragment_inst_exec_state =
             : new FragmentInstanceExecState(exec_params, this, ExecEnv::GetInstance());
Presumably this should be managed by obj_pool. If not, the value type of the map should be unique_ptr<...>.

PS10, Line 80: his RPC returns
This isn't an RPC any more. Update comment.

PS10, Line 113: lexical_cast
PrintId()

PS10, Line 128: lexical_cast
PrintId()

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/runtime/query-exec-state.h
File be/src/runtime/query-exec-state.h:

Line 38: /// This class is responsible for creating, running and destroying FragmentInstanceExecStates.
Talk about lifetime here: when is this created, when is it destroyed, what are the guarantees about the subordinate data structures like finstexecstates. Can this outlive a query on the coordinator? etc.

PS10, Line 41: // Special constructor for the coordinator.
TODO: remove.

PS10, Line 59: CleanFragmentInstanceStates
Don't talk about implementation details (cleaning the map). When would this be called?

Line 61: /// Registers a new FragmentInstanceExecState and launches the thread that calls Prepare() and
long line.

PS10, Line 90: std::unique_ptr<ObjectPool*> obj_pool_
Comment broadly what this is used to manage, since that affects lifecycles.

PS10, Line 90: *
do you mean unique_ptr<ObjectPool> ?

PS10, Line 90: Owned
redundant wrt unique_ptr

PS10, Line 94: active
expand. What does it mean for a fragment to be active?

PS10, Line 105: shared_ptr
Update comment.

PS10, Line 109: FragmentInstanceExecState
Are the exec states managed by the obj pool? If so, say so otherwise the lifecycle is not clear.

PS10, Line 113: shared pointer
update comment

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/service/client-request-exec-state.cc
File be/src/service/client-request-exec-state.cc:

Line 17: #include "service/client-request-exec-state.h"
add a blank line before this one

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/service/client-request-exec-state.h
File be/src/service/client-request-exec-state.h:

PS10, Line 48: This is responsible for creating the
             : /// coordinator object
Seems unnecessary.

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/service/impala-internal-service.h
File be/src/service/impala-internal-service.h:

PS10, Line 48: fragment_mgr_->CancelPlanFragment(params.query_id,
             : params.fragment_instance_id);
Copy the one-line idiom from other RPCs, and set the TStatus directly without using the intermediate variable.

PS10, Line 74: Ignoring returned status here as failing to publish a filter is not fatal.
It would be better to add a TStatus to the return struct, and have the caller decide what's fatal and what isn't.

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/service/query-exec-mgr.cc
File be/src/service/query-exec-mgr.cc:

PS10, Line 40: bind<void>(mem_fn(
please avoid in favour of lambda expressions, to reduce dependency on boost::bind.

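As a rough illustration of the lambda form, using only the standard library: Manager and OnEvent below are hypothetical names, not the QueryExecMgr code in this patch. The lambda spells out its parameters instead of relying on bind placeholders.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-in for a manager class whose member function is used
// as a callback; not the QueryExecMgr code in this patch.
class Manager {
 public:
  void OnEvent(const std::string& id, int code) {
    last_id_ = id;
    last_code_ = code;
  }

  // Returns a callback that forwards to OnEvent(). The lambda names its
  // parameters explicitly instead of using bind placeholders (_1, _2).
  // It captures 'this', so the callback must not outlive this Manager.
  std::function<void(const std::string&, int)> MakeCallback() {
    return [this](const std::string& id, int code) { OnEvent(id, code); };
  }

  const std::string& last_id() const { return last_id_; }
  int last_code() const { return last_code_; }

 private:
  std::string last_id_;
  int last_code_ = 0;
};
```
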
PS10, Line 74: fragment_inst_exec_state->set_exec_thread(new Thread("query-exec-state",
             : Substitute("exec-plan-fragment-$0", PrintId(fragment_id)),
             : &QueryExecMgr::ExecInstance, this, fragment_inst_exec_state));
Why is the QEM running the thread, and not the QES? If it's to ensure a reference exists to the QES, have the fragment take it during RegisterFragmentInstance?

Line 123: DCHECK(i->second->num_current_references_ != 0);
DCHECK_GT

Line 142: query_exec_state_map_.find(query_id);
one line?

PS10, Line 146: std
remove std::

PS10, Line 148: ++(i->second->num_current_references_);
Who returns this reference? The caller of FindOrCreate...() can't tell if it got a reference (create) or just got a pointer (find).

PS10, Line 159: lexical_cast
PrintId

PS10, Line 167: this
QESGuard() takes a QEM*. Did this compile?

http://gerrit.cloudera.org:8080/#/c/3817/10/be/src/service/query-exec-mgr.h
File be/src/service/query-exec-mgr.h:

PS10, Line 31: shared
'Shared' suggests ownership. Better is to describe that the QES manages, or owns, the fragment instances.

Line 45: /// does for RegisterFragmentInstanceWithQuery()'s return value.
I think after the batching change the QEM shouldn't ever know about fragment instances. Instead it should have an API that says 'register this query' with a single data structure.

PS10, Line 46: QueryExecMgr
please instrument this class with more metrics as needed (e.g. number of running queries).

PS10, Line 76: Status CancelPlanFragment(const TUniqueId& query_id,
             : const TUniqueId& fragment_instance_id);
Remind me why we have this shorthand for:

  GetQueryExecState(query_id)->CancelPlanFragment(fragment_id)

?

PS10, Line 84: c'tor
this comment is describing a class, not a c'tor. Not sure what you're trying to say here.

PS10, Line 87: nothing
imprecise - talk in terms of invariants on methods, e.g.
query_exec_state() will return nullptr if the requested QES does not exist, otherwise it will return a pointer to that QES and the lifetime of that QES is guaranteed to be at least as long as that of the referring QESGuard.

PS10, Line 90: /// This class makes sure that the caller cannot directly access the QueryExecState
             : /// and allowing access to the QES only through the APIs provided from this class.
             : /// This is so that the caller doesn't accidentaly try and transfer ownership of the
             : /// QES.
             : /// TODO: Is this really necessary? The downside is that this class needs to duplicate
             : /// function signatures for every function the QES wants to expose.
Isn't this comment no longer true given QESGuard::query_exec_state() ?

PS10, Line 99: QueryExecMgr
why is this an argument? Surely QEM is a process-wide singleton, and we can get at it in other ways (maybe the ExecEnv has a pointer?)

Line 112: QueryExecState* query_exec_state_;
add DISALLOW_COPY_AND_ASSIGN, otherwise you have the same leakage problems that people are concerned about with shared_ptr, just worse.

PS10, Line 124: QueryExecStateMap
can't get on one line?

PS10, Line 127: /// The thread pool that is in charge of destroying a QueryExecState when it is no
              : /// longer in use.
              : ThreadPool<QueryExecState*> destroy_thread_;
Do we have evidence that this is necessary, or should we remove this mechanism until it's proven to be a problem?

Line 137: void DestroyQueryExecState(uint32_t thread_id, QueryExecState* query_exec_state);
mention that it's invoked by the thread pool.

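To tie together the QESGuard comments above (nullptr when the QES does not exist, lifetime at least as long as the guard, no copying), here is one possible shape. This is a sketch under assumptions: QueryState, QueryMgr and Guard are hypothetical stand-ins for QueryExecState, QueryExecMgr and QESGuard, integer ids replace TUniqueId, and deleted copy operations play the role of DISALLOW_COPY_AND_ASSIGN. The manager is passed in only to keep the sketch self-contained.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical stand-in for QueryExecState.
struct QueryState {
  int refs = 0;  // protected by QueryMgr::lock_
};

// Hypothetical stand-in for QueryExecMgr.
class QueryMgr {
 public:
  // Creates the state if needed and takes one reference on behalf of the query.
  void Register(int query_id) {
    std::lock_guard<std::mutex> l(lock_);
    std::unique_ptr<QueryState>& slot = states_[query_id];
    if (slot == nullptr) slot.reset(new QueryState());
    ++slot->refs;
  }

  // Returns the state with one extra reference, or nullptr if it does not exist.
  QueryState* Acquire(int query_id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = states_.find(query_id);
    if (it == states_.end()) return nullptr;
    ++it->second->refs;
    return it->second.get();
  }

  // Drops one reference and destroys the state when the last one goes away.
  void Release(int query_id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = states_.find(query_id);
    assert(it != states_.end());
    assert(it->second->refs > 0);
    if (--it->second->refs == 0) states_.erase(it);
  }

  bool Contains(int query_id) {
    std::lock_guard<std::mutex> l(lock_);
    return states_.count(query_id) > 0;
  }

 private:
  std::mutex lock_;
  std::map<int, std::unique_ptr<QueryState>> states_;
};

// Hypothetical stand-in for QESGuard: holds one reference for its lifetime.
class Guard {
 public:
  Guard(QueryMgr* mgr, int query_id)
    : mgr_(mgr), query_id_(query_id), state_(mgr->Acquire(query_id)) {}
  ~Guard() {
    if (state_ != nullptr) mgr_->Release(query_id_);
  }

  // Non-copyable, so a reference cannot be duplicated without being counted.
  Guard(const Guard&) = delete;
  Guard& operator=(const Guard&) = delete;

  // nullptr if the query does not exist; otherwise valid while this Guard lives.
  QueryState* query_state() const { return state_; }

 private:
  QueryMgr* mgr_;
  int query_id_;
  QueryState* state_;
};
```

Because every path that hands out a pointer also takes a reference, the caller never has to guess whether it owns one, which is the ambiguity raised for FindOrCreate...() above.
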
--
To view, visit http://gerrit.cloudera.org:8080/3817
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I892091d6401acb2ea91ccb1623af54c6f9635e6c
Gerrit-PatchSet: 10
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Sailesh Mukil <[email protected]>
Gerrit-Reviewer: Dan Hecht <[email protected]>
Gerrit-Reviewer: David Knupp <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Lars Volker <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Matthew Jacobs <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-HasComments: Yes
