Marcel Kornacker has posted comments on this change.

Change subject: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend
......................................................................

Patch Set 4:

(100 comments)

http://gerrit.cloudera.org:8080/#/c/4054/3//COMMIT_MSG
Commit Message:

> nit: would you mind wrapping the text in commit messages to 80 characters?
Done

PS3, Line 10: oordinator for multi-threaded
> "at most one instance of each fragment" (there are already several fragment
Done

PS3, Line 13: eave
> typo
Done

PS3, Line 19: ring th
> fragment
Done

PS3, Line 19: ring th
> fragment
Done

PS3, Line 7: IMPALA-3902: Scheduler improvements for running multiple fragment instances on
           : a single backend
           :
           : This is an extension of the scheduler and coordinator for multi-threaded
           : execution. It mainly removes the assumption of having one instance per
           : fragment per host. The approach taken here is to create parallel data
           : structures and control flow functions, where necessary, and otherwise to leave
           : the existing single-instance logic in place. The parallel structures' and
           : functions' names are prefixed with "Mt" to facilitate the enventual clean-up.
           : Not much of an attempt was made to factor out common functionality between
           : the Mt- and the single-threaded version, because the single-threaded version
           : will eventually disappear (once multi-threaded execution is the default)
           : and refactoring the existing code to fit into two parallel functions from
           : which it's being called might en
> nit: try and keep commit messages to 90 chars at most - otherwise they're v
Done

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/common/global-types.h
File be/src/common/global-types.h:

PS3, Line 30: int
> nit: say int32_t so the width is explicit.
we don't do that for the other ones. what's the concern with not having the width be explicit?

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 226: /// same fragment, but this ID u
> It's getting kind of confusing with all the ids and indexes.
> It might be he
Done

PS3, Line 238: k ti
> why is this an 'id' but the other monotonically increasing *idx_ fields bel
it doesn't have to be. i asked that same question in Planner.thrift.

PS3, Line 244: Summed acr
> This name isn't very clear. Maybe coord_finstance_state_idx_, coord_state_
once you stare at this code enough, the term 'state' becomes much more specific. :) but i can rename it if it's still too vague.

PS3, Line 375: }
             :
             : Status Coordinator::Exec(const QuerySchedule& schedule,
             : vector<ExprContext*>* output_expr_ctxs) {
             : const TQueryExecRequest& request = schedule.request();
             : DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0);
             : needs_finalization_ = request.__isset.finalize_params;
             : if (needs_finalization_) finalize_params_ = request.finalize_params;
             :
             : VLOG_QUERY << "Exec() query_id=" << schedule.query_id();
             :
> I agree that this and subsequent functions should be in a utility class. Th
there are only two functions left for this utility class, which seems too meager. i moved those two into QuerySchedule.

PS3, Line 405: n
> space before
Done

Line 405: // TODO: move initial setup into a separate function; right now part of it
> space before =
Done

PS3, Line 407: instance state
> is_mt_exec for consistency above
Done

PS3, Line 408: int32_t num_fragment_instances = schedule.GetTotalFInstanc
> can you add a comment about why this is necessary in the MT case only?
moved into QuerySchedule and added comment

PS3, Line 408: 2_t num_fragmen
> why is this only needed for MT?
Done

PS3, Line 443: T
> nit space
Done

PS3, Line 443: T
> space
Done

Line 443: MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, pool_tracker, NULL);
> space before =
Done

PS3, Line 466: files_[coord_fragment.id], 0, obj_pool()));
> parentheses? Sometimes we have them in these cases.
i don't think this is unclear, so i'd say parentheses here don't add anything.
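
Background for the PS3 Line 408 thread: the commit message removes the assumption of one instance per fragment per host. Under that change, the number of fragment instances can no longer be read off the set of hosts alone; it has to be summed over each fragment's instance list. The minimal sketch below is hypothetical. InstanceSketch, FragmentSketch, CountFragmentInstances and InstancesPerHost are invented for illustration and are not the QuerySchedule API. The only assumption is that each fragment's parameters carry a list of per-instance parameters, each of which names its host.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for per-instance and per-fragment scheduling
// parameters; invented for illustration, not QuerySchedule's types.
struct InstanceSketch {
  std::string host;  // backend that runs this instance
};

struct FragmentSketch {
  std::vector<InstanceSketch> instance_exec_params;
};

// Total number of fragment instances across all fragments of a query.
int CountFragmentInstances(const std::vector<FragmentSketch>& fragments) {
  int total = 0;
  for (const FragmentSketch& f : fragments) {
    total += static_cast<int>(f.instance_exec_params.size());
  }
  return total;
}

// Number of instances of one fragment on each host. Under the old
// one-instance-per-fragment-per-host assumption every count would be 1;
// with multi-threaded execution a count can be larger.
std::map<std::string, int> InstancesPerHost(const FragmentSketch& fragment) {
  std::map<std::string, int> counts;
  for (const InstanceSketch& inst : fragment.instance_exec_params) {
    ++counts[inst.host];
  }
  return counts;
}
```

Example: a scan fragment with instances on hosts a, a and b, plus a single coordinator fragment instance, gives a total of 4. Host a runs 2 instances of the scan fragment.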

PS3, Line 560: lse {
             : SetExecPlanFragmentParams(
> this is a bit outdated since we know there is a coordinator fragment here,
moved comment to where it makes sense

PS3, Line 651: UERY << "starting " << num_fragment_instances << " fragment instances for query "
             : << query_id_;
             : query_events_->MarkEvent(
> please use a lambda here instead.
it started out that way but it was kind of painful - you need to list every parameter twice, once in the capture list (some things need to be passed by value, some by reference) and once as a parameter. i found it to be more verbose.

PS3, Line 658: r to avoi
> fragment instances
Done

PS3, Line 662: for (const MtFragmentExecParams& fragmen
> remove?
Done

PS3, Line 664:
> nit extra space
Done

PS3, Line 675:
> fragment instances
Done

Line 695: for (FragmentInstanceState* exec_state: fragment_instance_states_) {
> long line
Done

PS3, Line 702: ofile_->AddInfoString(
             : "Fragment instance start latencies", latencies.ToHumanReadable());
> also prefer lambda here for readability
see above

PS3, Line 708: ncelInternal(
> nit: indentation
this is accepted indentation

PS3, Line 710:
> fragment instances
Done

PS3, Line 1128: if (return_status.ok()
> rename this to finst_completion_cv_?
renamed to instance_completion_cv

PS3, Line 1158:
> is that true?
yes, only insert plans have multiple sinks (and no coord fragment)

Line 1389: node_summary.__set_estimated_stats(node.estimated_stats);
> Maybe clear exec_summary_.nodes here or DCHECK if it's not already empty.
Done

PS3, Line 1411: etTPlanFragments(&fragments);
> can you move this out of the node loop (e.g. int num_frag_instances) so it'
Done

PS3, Line 1429: coord_fragment->display_name)
> remove?
Done

PS3, Line 1441: a->a
> TODO-MT?
this is really independent of the mt work. the way we deal with the coordinator fragment is special and screwed up, and the way its profile is set up is part of that.

PS3, Line 1446:
> nit: would be nice to capitalize and punctuate comments that are sentences.
historically, single-sentence comments were not capitalized and punctuated. i don't feel it impacts the intelligibility of the comment.

Line 1469: }
> weird early line break
Done

Line 1502: exec_state->impalad_address(), &client_connect_status);
> remove?
Done

PS3, Line 1517: PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg());
              : VLOG_QU
> indent
this is acceptable indentation (we have that all over the place)

PS3, Line 1654: // keep going
> don't we need to cancel that as well?
we haven't so far.

PS3, Line 2007: }
              : for (FragmentInstanceState* state: fragment_instance_states_) {
              : lock_guard<mutex> l(*state->lock());
              : if (state->error_log()->size() > 0) MergeErrorMaps(&merged, *state->error_log());
              : }
              : retur
> If I understand this correctly, the meaning of this counter changes since I
i'm not sure what you mean. are you saying this code isn't collecting the peak any longer?

PS3, Line 2045:
> remove?
Done

PS3, Line 2131:
> ?
Done

PS3, Line 2131: tuple_i
> ??
Done

PS3, Line 2214: // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts.
              : shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
> please revert. Anonymous namespaces are slightly more flexible (can declare
i'll switch it back, but in general please have specific reasons for asking for code changes (there are no local types declared here, and using a namespace simply adds a few extra lines).

PS3, Line 2300:
> this seems dangerous. There's no guarantee that DistributeFilters will comp
switching back to copies, but leaving a todo. the coordinator should absolutely not get destroyed while running this function (and the current qes/qem work should fix that).

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS3, Line 346: Number of ins
> initialize this as well?
Done

PS3, Line 360:
> prefer nullptr now for C++11
Done

PS3, Line 453: ams* rpc_params);
> coord is constant across all fragments (and for the lifetime of the process
turns out that parameter wasn't used in the first place.

PS3, Line 455: ms, TExecPlanFragmentParams* rpc_params);
             :
             : /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from
             : /// multiple threads. Creates a new FragmentInstanceState and regis
> nit: format these to avoid an extra line or two
Done

Line 562: /// Starts all remote fragment instances contained in the schedule by issuing RPCs in
> static?
moved

PS3, Line 565: MtStartRemoteFIns
> Could this be static? Implementation doesn't seem to access instance member
moved

Line 566:
> std:: (worth understanding how this compiled without it...)
Done

Line 569: /// Also updates query_profile_ with the startup latency histogram.
> static?
moved

PS3, Line 582:
> can there be > 1 coordinator fragment instance in MT?
clarified

PS3, Line 589:
> Name doesn't really reflect what this does.
what would be a better name?

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.cc
File be/src/scheduling/query-schedule.cc:

PS3, Line 319: t re
> remove else since you return on line 318
Done

PS3, Line 321: ragm
> use auto only for iterators
Done

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.h
File be/src/scheduling/query-schedule.h:

PS3, Line 67: TUniqueId
> if we are going to have a lot of these FInstanceExecParams, why not have in
it's more autonomous this way. if there's evidence that this is using up too much cache space, i'd be happy to change it, but that's not the case at this point. the coordinator has a lot of functions with lots of parameters, and i'm trying to get away from that by packaging things up that belong together.

PS3, Line 73:
> Initialize this?
Done

PS3, Line 75: /// uniquely identify it to a receiver. -1 = inva
> ownership not clear; we usually avoid this pattern.
clarified in comment. this is advantageous because even if you need the fragment params you only need to pass one struct around.

PS3, Line 85: per_fragment_instance_idx(per_fragment_instance_idx),
            : sender_id(-1),
> comment
Done

PS3, Line 92: /// ou
> ?
Done

PS3, Line 107: std::vector<FInstanceExecParams> instance_exec_params;
             :
> remove (it was done)
Done

Line 184: /// Map node ids to the id of their containing fragment.
> I think adding mutable accessors means that the QuerySchedule's role is har
it has to be mutable, because it gets created in the scheduler.

PS3, Line 195: e
> remove space
Done

PS3, Line 196: t32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; }
             :
             : const TPlanNode& GetNode(PlanNodeId id) const {
> write this in terms of GetCoordInstanceExecParams()? Any reason this needs
Done

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.cc
File be/src/scheduling/simple-scheduler.cc:

PS3, Line 312: /// TODO-MT: fix this (do we even need to report it?)
> this is supposed to be the number of active backends - the metric was renam
i'll leave it as-is for now.

Line 379: RuntimeProfile::Counter* total_assignment_timer =
> one line?
Done

PS3, Line 384: t au
> auto only for iterators
Done

PS3, Line 403: ntry.second, e
> nit: some of these parameters could go to the previous line, maybe saving a
i prefer to break before the first param if the whole call doesn't fit on a line, i think it's easier to see what's being called.

PS3, Line 429: nFrag
> Can you add a comment to explain the "+ 1"?
Done

Line 434: DCHECK(
> nit: formatting
Done

Line 436: || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
> move to line 443
Done

Line 448: dest.__set_server(dest_params->instance_exec_params[j].host);
> You could resize src_params->destinations and then call setters on the elem
Done

PS3, Line 478: e == TPartitionTyp
> Replace with local_backend_descriptor_.address?
Done

PS3, Line 483:
> sentence fragment?
no.
rephrased.

Line 509: for (const auto& assignment_entry: fragment_params->scan_range_assignment) {
> Incomplete comment?
Done

PS3, Line 522:
> nit: extra space
Done

Line 530: float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances;
> formatting
Done

PS3, Line 534:
> can this be 0?
Done

PS3, Line 535:
> we often use int64_t for those types. Is there a difference?
oops, habit. i think we pulled this in from the google includes.

PS3, Line 543: TScanRangeParams& scan_range_params = params_list[pa
> Maybe use a variable for this to make it more readable?
Done

PS3, Line 554: le* schedule) {
             : DCHECK_GE(fragment_params->in
> 'mirror instance' isn't clear. What is reflected?
rephrased comment.

PS3, Line 565:
> unused?
Done

http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.h
File be/src/scheduling/simple-scheduler.h:

PS3, Line 489: /// Compute the assignment of scan ranges to hosts for each scan node in
             : /// the schedule's TQueryExecRequest.mt_plan_exec_info.
             : /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
             : /// mt_fragment_exec_params_ with the resulting scan range assignment.
             : Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
             :
             : /// Compute the MtFragmentExecParams for all plans in the schedule's
             : /// TQueryExecRequest.mt_plan
> comments?
Done

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Frontend.thrift
File common/thrift/Frontend.thrift:

PS3, Line 344: Info
> is there a better word than 'info' - what does a data structure contain if
i'm open to suggestions, but i think -params and -parameters are too close not to be confusing.

PS3, Line 353: // TODO: remove; TPlanFragment.output_s
> why isn't this part of TPlanFragment? (with -1 being the value for the root
turns out this is redundant, TPlanFragment.output_sink has the destination node (by which we can look up the fragment). however, this is general clean-up, so not specific to mt execution.
left a todo in frontend.thrift.

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

PS3, Line 185: degree of parallelism
> somewhere this needs to be made very precise: is this the number of active
clarified in ImpalaService.thrift

PS3, Line 188: // 1: single-threaded execution mode
> Not really relevant for this change, but I think we should probably have th
good idea, will leave todo in qes.cc.

PS3, Line 188:
> dop?
Done

Line 254: // Client request containing stmt to execute and query options.
> I'd like to see TQueryExecRequest renamed to TQueryExecutionPlan or similar
we'll have to think more about that - 'plan' already has a different connotation (= a collection of fragments that together provide materialized output; a single query in the new execution model can require multiple plans, all but the first one for join build materialization).

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

Line 216: // Multi-threaded execution: degree of parallelism = number of active threads per
> see other comment - let's please nail down exactly what this means (this se
Done

http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Planner.thrift
File common/thrift/Planner.thrift:

PS3, Line 35: // TODO: should this be called idx, to distinguish more clearly from a
            : // globally unique id?
> I think ID is ok - idx automatically makes me wonder what structure is bein
that's the intention behind this id - that it can be used to index into query-wide per-fragment data structures (of which we have several).
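
The Planner.thrift reply describes the fragment id as something that can index into query-wide per-fragment data structures. The minimal sketch below shows what that buys. It assumes ids are dense and start at 0 within a query; FragmentIdx, FragmentDesc, PerFragmentCounters and AllocPerFragmentState are hypothetical names, not Impala's types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical fragment index type. Within one query, fragments are assumed
// to be numbered 0, 1, ..., n-1 with no gaps.
using FragmentIdx = int32_t;

struct FragmentDesc {
  FragmentIdx idx;
  std::string display_name;
};

struct PerFragmentCounters {
  int64_t rows_produced = 0;
};

// Allocates one slot per fragment. Because the ids are dense, an id can be
// used directly as a position in a plain vector; no hash map is needed.
std::vector<PerFragmentCounters> AllocPerFragmentState(
    const std::vector<FragmentDesc>& fragments) {
  for (std::size_t i = 0; i < fragments.size(); ++i) {
    // Checks the density assumption (debug builds only).
    assert(fragments[i].idx == static_cast<FragmentIdx>(i));
  }
  return std::vector<PerFragmentCounters>(fragments.size());
}
```

A caller would then update state for a fragment with `state[fragment.idx].rows_produced += n;`. A globally unique id, by contrast, would need a map lookup.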
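
On the PS3 Line 2300 exchange: the concern is one of lifetime. Nothing guarantees that DistributeFilters() finishes before the coordinator is torn down, and the reply switches back to copies. The sketch below illustrates, in general terms, the pattern visible in the PS3 Line 2214 excerpt: one 'master' copy shared by all delivery attempts through shared_ptr. It uses hypothetical types (PublishParamsSketch, FilterOwnerSketch) and is not the coordinator's code. The queued std::function objects stand in for asynchronous RPCs.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical payload standing in for a filter-publication request.
struct PublishParamsSketch {
  int filter_id;
  std::string filter_bytes;
};

// Hypothetical owner whose lifetime may end before queued deliveries run.
class FilterOwnerSketch {
 public:
  explicit FilterOwnerSketch(int filter_id) : filter_id_(filter_id) {}

  // Queues one delivery per target. All tasks share a single 'master' copy of
  // the params via shared_ptr instead of pointing back into this object, so
  // they stay valid after this object is destroyed; the copy is freed when the
  // last task holding it is destroyed.
  void QueueDeliveries(int num_targets,
                       std::vector<std::function<int()>>* pending) const {
    std::shared_ptr<const PublishParamsSketch> params =
        std::make_shared<PublishParamsSketch>(
            PublishParamsSketch{filter_id_, "filter-bytes"});
    for (int i = 0; i < num_targets; ++i) {
      // Stand-in for an asynchronous RPC: reads only the shared copy.
      pending->push_back([params]() { return params->filter_id; });
    }
  }

 private:
  int filter_id_;
};
```

Suppose the tasks instead captured `this` or a pointer to one of the owner's members. Running them after the owner is destroyed would then read freed memory. That is the hazard the review comment points at.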
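
On the query-schedule.h ownership question (PS3 Lines 67-85): the reply appears to keep a reference from each instance's parameters back to its fragment's parameters, so that callers only pass one struct around. The hypothetical sketch below shows that shape with the ownership spelled out. The fragment object owns its instances and the back-reference is non-owning. The fragment object is also non-copyable and is kept behind a pointer, so the reference cannot dangle. The field names per_fragment_instance_idx, sender_id (-1 = invalid) and instance_exec_params follow the excerpts; everything else is invented for illustration.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct FragmentParamsSketch;  // defined below

// Hypothetical per-instance parameters.
struct InstanceParamsSketch {
  InstanceParamsSketch(int idx, std::string h, const FragmentParamsSketch& fp)
    : per_fragment_instance_idx(idx), host(std::move(h)), fragment_params(fp) {}

  int per_fragment_instance_idx;  // position among this fragment's instances
  int sender_id = -1;             // -1 = invalid (not assigned yet)
  std::string host;
  // Non-owning back-reference to the enclosing fragment's parameters, which
  // own this object and therefore outlive it.
  const FragmentParamsSketch& fragment_params;
};

// Hypothetical per-fragment parameters; owns its instances.
struct FragmentParamsSketch {
  explicit FragmentParamsSketch(int idx) : fragment_idx(idx) {}
  // Non-copyable: a copy's instances would still refer to the original.
  FragmentParamsSketch(const FragmentParamsSketch&) = delete;
  FragmentParamsSketch& operator=(const FragmentParamsSketch&) = delete;

  void AddInstance(const std::string& host) {
    int idx = static_cast<int>(instance_exec_params.size());
    instance_exec_params.emplace_back(idx, host, *this);
  }

  int fragment_idx;
  std::vector<InstanceParamsSketch> instance_exec_params;
};

// A callee that needs fragment-level data takes only the instance params.
int FragmentIdxOf(const InstanceParamsSketch& instance) {
  return instance.fragment_params.fragment_idx;
}
```

Storing each fragment's params behind std::unique_ptr (or otherwise never relocating them) keeps the back-references valid. The instances themselves may move when their vector grows; that is harmless, because nothing points at them in this sketch.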
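
Several simple-scheduler.cc comments cluster around PS3 Lines 530-543: the average bytes per instance, whether num_instances can be 0, and int64_t versus other integer types. The hypothetical sketch below shows one way to divide a host's scan ranges among instances by byte size. It is a simple in-order greedy split, assumed here for illustration, and not necessarily what the scheduler does. It uses int64_t for byte counts and rejects num_instances == 0 before dividing. ScanRangeSketch and SplitByBytes are invented names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical: a scan range reduced to its length in bytes.
struct ScanRangeSketch {
  int64_t length;
};

// Divides ranges into at most num_instances groups, in order. A new group is
// started once the current group holds at least the average number of bytes
// per instance; once num_instances groups exist, the last one takes the rest.
std::vector<std::vector<ScanRangeSketch>> SplitByBytes(
    const std::vector<ScanRangeSketch>& ranges, int num_instances) {
  assert(num_instances > 0);  // the average below divides by num_instances
  int64_t total_size = 0;
  for (const ScanRangeSketch& r : ranges) total_size += r.length;
  const double avg_bytes_per_instance =
      static_cast<double>(total_size) / num_instances;

  std::vector<std::vector<ScanRangeSketch>> groups(1);
  int64_t group_bytes = 0;
  for (const ScanRangeSketch& r : ranges) {
    bool group_full = static_cast<double>(group_bytes) >= avg_bytes_per_instance;
    bool can_open_group = static_cast<int>(groups.size()) < num_instances;
    if (group_full && can_open_group && !groups.back().empty()) {
      groups.emplace_back();
      group_bytes = 0;
    }
    groups.back().push_back(r);
    group_bytes += r.length;
  }
  return groups;
}
```

Being greedy and in-order, the split can be uneven for skewed inputs. For example, ranges of 30, 5 and 5 bytes over 2 instances end up as [30] and [5, 5]. A size-sorted or least-loaded assignment would balance better, at the cost of more work.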

http://gerrit.cloudera.org:8080/#/c/4054/3/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
File fe/src/main/java/com/cloudera/impala/common/TreeNode.java:

Line 63: protected <C extends TreeNode<NodeType>> void getNodesPreOrderAux(
> space before void
Done

http://gerrit.cloudera.org:8080/#/c/4054/3/fe/src/main/java/com/cloudera/impala/service/Frontend.java
File fe/src/main/java/com/cloudera/impala/service/Frontend.java:

Line 964: // The fragment at this point has all state set, serialize it to thrift.
> so we are going to remove the resource estimates reported in the explain pl
left a todo

Line 981: Preconditions.checkState(queryCtx.request.query_options.mt_dop > 1);
> fix comment: input is a planner
Done

Line 998: for (PlanFragment planRoot: planRoots) {
> The explain level setting is somewhat subtle, maybe factor out that part.
moved into getExplainString()

Line 1042: fragmentIdx.put(fragment, idx);
> Fair enough, but I don't see the old code going away anytime soon (i.e. sev
i don't want to add a bunch of code now, like accessor functions, that make the code harder to read and will need to be thrown away later on. this functionality hasn't changed in a long time. i left a todo to pull out common functionality in case we ever need to make the same change in both this function and the mt counterpart.

--
To view, visit http://gerrit.cloudera.org:8080/4054
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I240445199e22f009f4e72fdb8754eb8d77e3d680
Gerrit-PatchSet: 4
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Alex Behm <[email protected]>
Gerrit-Reviewer: Dan Hecht <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Lars Volker <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Matthew Jacobs <[email protected]>
Gerrit-HasComments: Yes
