Henry Robinson has posted comments on this change.

Change subject: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend
......................................................................
Patch Set 3:

(48 comments)

Initial comments. Still thinking about a) how we can reduce the high number of confusingly named data structures and better articulate their responsibilities, and b) how to simplify the coordinator by factoring out some of its responsibilities. Both of these problems were pre-existing, but they are exacerbated by introducing more data structures and more code into the coordinator. It's quite hard to reason about what data structures are available where.

http://gerrit.cloudera.org:8080/#/c/4054/3//COMMIT_MSG
Commit Message:

PS3, Line 10: ingle fragment instance per host
"at most one instance of each fragment" (there are already several fragment instances per host).

PS3, Line 19: fragent
fragment

PS3, Line 7: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend
: 
: This is an extension of the scheduler and coordinator for multi-threaded execution. It
: mainly removes the assumption of having a single fragment instance per host. The approach
: taken here is to create parallel data structures and control flow functions, where
: necessary, and otherwise to leave the existing single-instance logic in place. The parallel
: structures' and functions' names are prefixed with "Mt" to facilitate the enventual clean-up.
: 
: Changes to data structures:
: - QuerySchedule: per-instance and per-fragment structs with complete execution
:   parameters (instead of partially relying on TQueryExecRequest); the per-instance
:   execution parameter struct is a child of the per-fragment parameter struct
: - explicit fragent id, with range 0..#fragments-1 (instead of relying on an index into
:   an array in TQueryExecRequest)
nit: try to keep commit message lines to 90 chars at most - otherwise they're very hard to read in Gerrit.

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/common/global-types.h
File be/src/common/global-types.h:

PS3, Line 30: int
nit: say int32_t so the width is explicit.

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 375: const TPlanFragment* Coordinator::GetCoordFragment(
:     const TQueryExecRequest& request) const {
:   bool is_mt_exec = request.query_ctx.request.query_options.mt_dop != 1;
:   const TPlanFragment* fragment = is_mt_exec
:       ? &request.mt_plan_exec_info[0].fragments[0] : &request.fragments[0];
:   if (fragment->partition.type == TPartitionType::UNPARTITIONED) {
:     return fragment;
:   } else {
:     return NULL;
:   }
: }
I agree that this and subsequent functions should be in a utility class. The more stuff we can keep out of the header the better. Plus this might get removed when the ST code is removed.

Line 405: bool has_coordinator_fragment= GetCoordFragment(schedule.request()) != NULL;
space before =

Line 443: bool has_coordinator_fragment= GetCoordFragment(request) != NULL;
space before =

PS3, Line 651: std::bind(&Coordinator::ExecRemoteFragment, this, std::cref(params),
:     std::cref(request.fragments[fragment_idx]), fragment_instance_debug_options,
:     std::cref(schedule), instance_state_idx, fragment_instance_idx));
please use a lambda here instead.
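To make the suggestion concrete, here's a self-contained toy (not the actual Coordinator code; Submit() just stands in for whatever consumes the callable, e.g. a thread pool's Offer()) showing the bind-to-lambda rewrite:

  #include <functional>
  #include <iostream>

  struct Params { int value = 0; };

  // Stand-in for the call that accepts the callable (thread pool, Thread ctor, ...).
  void Submit(const std::function<void()>& work) { work(); }

  class Coordinator {
   public:
    void ExecRemoteFragment(const Params& params, int instance_state_idx) {
      std::cout << params.value << " " << instance_state_idx << "\n";
    }

    void Start(const Params& params, int instance_state_idx) {
      // Current style: std::bind hides whether each argument is copied or referenced.
      Submit(std::bind(&Coordinator::ExecRemoteFragment, this, std::cref(params),
          instance_state_idx));

      // Equivalent lambda: the capture list states reference-vs-copy explicitly
      // and the call site reads like a normal function call.
      Submit([this, &params, instance_state_idx]() {
        ExecRemoteFragment(params, instance_state_idx);
      });
    }
  };

  int main() {
    Coordinator coord;
    Params params{42};
    coord.Start(params, 7);
    return 0;
  }

The same transformation applies to the std::bind at line 702 below.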
PS3, Line 658: fragments
fragment instances

PS3, Line 675: fragments
fragment instances

Line 695: VLOG_QUERY << "MtStartRemoteFInstances(): coord_id=" << instance_params.instance_id;
long line

PS3, Line 702: std::bind(&Coordinator::MtExecRemoteFInstance, this, instance_params,
:     instance_debug_options, std::cref(schedule), instance_state_idx, i)
also prefer a lambda here for readability

PS3, Line 708: << query_id_;
nit: indentation

PS3, Line 710: fragments
fragment instances

PS3, Line 713: CancelOnStartupError
FinishQueryStartup()?

PS3, Line 1128: backend_completion_cv_
rename this to finst_completion_cv_?

PS3, Line 1654: if (exec_state->fragment_id() == 0) continue; // the coord fragment
don't we need to cancel that as well?

PS3, Line 2131: //fragment_ct
??

PS3, Line 2214: static void DistributeFilters(shared_ptr<TPublishFilterParams> params,
:     const TNetworkAddress& impalad, const TUniqueId& fragment_instance_id) {
please revert. Anonymous namespaces are slightly more flexible (they can declare translation-unit-local types as well), so they are the preferable way to do this.

PS3, Line 2300: std::cref
this seems dangerous. There's no guarantee that DistributeFilters will complete before the coordinator is destroyed. That's why the arguments are copied in. I don't know much about cref, but isn't it making a reference_wrapper around a temporary? What's the problem with copying these small structs by value?
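To illustrate the lifetime concern (again a self-contained toy, not the real filter-publishing code; Distribute() and Publish() are stand-ins):

  #include <functional>
  #include <string>
  #include <thread>

  struct TNetworkAddress { std::string hostname; int port = 0; };

  // Stand-in for the filter-distribution work: it may run after the function
  // that kicked it off (and the coordinator) has already gone away.
  void Distribute(TNetworkAddress impalad) { /* ... send the RPC ... */ }

  std::thread Publish() {
    TNetworkAddress addr{"host-1", 22000};
    // Dangerous: std::cref stores a reference_wrapper to the local 'addr', and the
    // thread only reads through it when it eventually runs - by then 'addr' is gone.
    //   return std::thread(Distribute, std::cref(addr));
    // Safe: the small struct is copied into the thread's stored arguments up front.
    return std::thread(Distribute, addr);
  }

  int main() {
    std::thread t = Publish();
    t.join();
    return 0;
  }

Copying a TNetworkAddress and a TUniqueId is cheap, so I don't see what the cref buys us here.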
http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS3, Line 360: NULL
prefer nullptr now for C++11

PS3, Line 453: const TNetworkAddress& coord
coord is constant across all fragments (and for the lifetime of the process). Make a coord_ member that's initialised in the c'tor to MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port) and remove this parameter.

Line 566: vector<const TPlanFragment*>* fragments) const;
std:: (worth understanding how this compiled without it...)

PS3, Line 582: the coordinator fragment instance.
can there be > 1 coordinator fragment instance in MT?

PS3, Line 589: CancelOnStartupError
Name doesn't really reflect what this does.

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.cc
File be/src/scheduling/query-schedule.cc:

PS3, Line 319: else
remove the else since you return on line 318

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.h
File be/src/scheduling/query-schedule.h:

PS3, Line 67: TUniqueId
if we are going to have a lot of these FInstanceExecParams, why not have instance_id be the i32 offset rather than the full query_id + instance id?

PS3, Line 75: const MtFragmentExecParams& fragment_exec_params;
ownership not clear; we usually avoid this pattern.

PS3, Line 85: std::vector<TPlanFragmentDestination> destinations;
:     std::map<PlanNodeId, int> per_exch_num_senders;
comment

PS3, Line 92: //int
?

PS3, Line 107: /// TODO: Consider moving QuerySchedule and all Schedulers into
:     /// their own lib (and out of statestore).
remove (it was done)

Line 184: MtFragmentExecParams* GetFragmentExecParams(FragmentId id) {
I think adding mutable accessors makes the QuerySchedule's role hard to discern - is it an immutable description of how a query should execute, or is it a mutable, evolving structure throughout query execution? I think it's a mistake to have the QES create the QS and then pass it to the scheduler (not your doing, but exacerbated by adding these mutable accessors), because it requires that the scheduler be able to update the QS after it's created. Instead, the scheduler should be able to build a QS piecemeal during the call to schedule() and return it to the QES fully formed and immutable. Doing so would make the responsibilities of the scheduler more clear-cut, and make the post-conditions of schedule() clearer.
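Sketching the shape I have in mind (toy types, just to show the interface; not a drop-in patch):

  #include <memory>
  #include <string>
  #include <vector>

  // Stand-ins for the real types.
  struct Status { static Status OK() { return Status(); } };
  struct TQueryExecRequest {};
  struct QuerySchedule {
    std::vector<std::string> instance_ids;  // built up by the scheduler only
  };

  class Scheduler {
   public:
    // The scheduler constructs the schedule piecemeal during Schedule() and hands it
    // back fully formed; the caller (the QES) only ever holds a pointer-to-const, so
    // QuerySchedule needs no mutable accessors at all.
    Status Schedule(const TQueryExecRequest& request,
        std::unique_ptr<const QuerySchedule>* result) {
      auto schedule = std::make_unique<QuerySchedule>();
      schedule->instance_ids.push_back("instance-0");  // placeholder for real work
      *result = std::move(schedule);
      return Status::OK();
    }
  };

  int main() {
    Scheduler scheduler;
    TQueryExecRequest request;
    std::unique_ptr<const QuerySchedule> schedule;
    scheduler.Schedule(request, &schedule);
    return 0;
  }

The exact signature doesn't matter much - the point is that after schedule() returns, the QuerySchedule is complete and read-only from the QES's perspective.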
PS3, Line 195:
remove space

PS3, Line 196: DCHECK(coord_fragment.partition.type == TPartitionType::UNPARTITIONED);
:     const MtFragmentExecParams& fragment_params =
:         mt_fragment_exec_params_[coord_fragment.id];
write this in terms of GetCoordInstanceExecParams()? Any reason this needs to DCHECK() where the former doesn't?

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.cc
File be/src/scheduling/simple-scheduler.cc:

PS3, Line 312: /// TODO-MT: fix this (do we even need to report it?)
this is supposed to be the number of active backends - the metric was renamed by a find-and-replace gone bad. Yes, we should report it. I'm not sure it's broken here.

Line 379: QuerySchedule* schedule) {
one line?

Line 436: PlanNodeId exch_id = sink.dest_node_id;
move to line 443

PS3, Line 483: // that instance gets all of the scan ranges, if there are any
sentence fragment?

PS3, Line 554: create a mirror
:     /// instance for this fragment.
'mirror instance' isn't clear. What is reflected?

PS3, Line 565: FInstanceExecParams& instance_param
unused?

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.h
File be/src/scheduling/simple-scheduler.h:

PS3, Line 489: Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
:     void MtComputeFragmentExecParams(QuerySchedule* schedule);
:     void MtComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
:         MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
:     void MtCreateScanInstances(PlanNodeId leftmost_scan_id,
:         MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
:     void MtCreateMirrorInstances(MtFragmentExecParams* fragment_params,
:         QuerySchedule* schedule);
comments?

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Frontend.thrift
File common/thrift/Frontend.thrift:

PS3, Line 344: Info
is there a better word than 'info' - what does a data structure contain if not information? TPlanExecParameters would read better to me (TPlanExecParams makes it sound like a parameter for an RPC, and I think we should avoid that possible source of confusion).

PS3, Line 353: 2: optional list<i32> dest_fragment_idx
why isn't this part of TPlanFragment? (with -1 being the value for the root fragment). It's not clear why there is a distinction between TPlanFragment and the other fields in this structure. One possibility is that TPlanFragment represents some abstract plan fragment without reference to the plan in which it exists (and these parameters then make it concrete - connecting it to other plan fragments and describing the input data). But none of the comments / code make it clear why we would want to separate the data structures in this way. For contrast, if all of this information were included in the TPlanFragment, it would simplify some code that wouldn't need to do lookups in the list here, and the map below.

If they really are separate, I think it would read better to have a

  struct TConcretePlanFragment {
    TPlanFragment abstract_fragment
    i32 dest_fragment_idx
    optional map<TPlanNodeId, TScanRangeLocations> scan_range_locations
  }

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

PS3, Line 185: degree of parallelism
somewhere this needs to be made very precise: is this the number of active threads per query per node? The maximum number of active threads per plan fragment per node?

PS3, Line 188: number of cores
dop?

Line 254: // TODO: rename to client_request, we have too many requests
I'd like to see TQueryExecRequest renamed to TQueryExecutionPlan or similar - 'Request' has always confused me about who provides the information and who consumes it.

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

Line 216: // Multi-threaded execution: degree of parallelism = number of cores per machine
see other comment - let's please nail down exactly what this means (this seems more precise - the active number of threads per query per node?)

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Planner.thrift
File common/thrift/Planner.thrift:

PS3, Line 35: // TODO: should this be called idx, to distinguish more clearly from a
:     // globally unique id?
I think ID is ok - idx automatically makes me wonder what structure is being indexed (i.e. an index usually only exists with a data structure).

-- 
To view, visit http://gerrit.cloudera.org:8080/4054
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I240445199e22f009f4e72fdb8754eb8d77e3d680
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Alex Behm <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-HasComments: Yes
