Marcel Kornacker has posted comments on this change.

Change subject: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend
......................................................................


Patch Set 4:

(100 comments)

http://gerrit.cloudera.org:8080/#/c/4054/3//COMMIT_MSG
Commit Message:

> nit: would you mind wrapping the text in commit messages to 80 characters? 
Done


PS3, Line 10: oordinator for multi-threaded
> "at most one instance of each fragment" (there are already several fragment
Done


PS3, Line 13: eave
> typo
Done


PS3, Line 19: ring th
> fragment
Done


PS3, Line 19: ring th
> fragment
Done


PS3, Line 7: IMPALA-3902: Scheduler improvements for running multiple fragment instances on
           : a single backend
           : 
           : This is an extension of the scheduler and coordinator for multi-threaded
           : execution. It mainly removes the assumption of having one instance per
           : fragment per host. The approach taken here is to create parallel data
           : structures and control flow functions, where necessary, and otherwise to leave
           : the existing single-instance logic in place. The parallel structures' and
           : functions' names are prefixed with "Mt" to facilitate the eventual clean-up.
           : Not much of an attempt was made to factor out common functionality between
           : the Mt- and the single-threaded version, because the single-threaded version
           : will eventually disappear (once multi-threaded execution is the default)
           : and refactoring the existing code to fit into two parallel functions from
           : which it's being called might en
> nit: try and keep commit messages to 90 chars at most - otherwise they're v
Done


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/common/global-types.h
File be/src/common/global-types.h:

PS3, Line 30: int
> nit: say int32_t so the width is explicit.
we don't do that for the other ones.

what's the concern with not having the width be explicit?
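
For context on the int vs. int32_t question, a minimal sketch (the alias names are hypothetical, not the contents of global-types.h): the C++ standard only guarantees that `int` is at least 16 bits wide, while `int32_t` is exactly 32 bits. On the platforms Impala builds for, both are 32 bits, so the difference is mostly about stating the assumption in the type; a `static_assert` is one way to document it when the alias stays `int`.

```cpp
#include <cassert>
#include <climits>
#include <cstdint>

// Hypothetical aliases in the style of global-types.h; not its actual contents.
typedef int ImplicitWidthIdx;      // width is whatever 'int' is on this platform
typedef int32_t ExplicitWidthIdx;  // exactly 32 bits wherever int32_t exists

// The standard only guarantees that 'int' is at least 16 bits wide.
static_assert(sizeof(int) * CHAR_BIT >= 16, "int has at least 16 bits");
// int32_t is exactly 32 bits, with no padding bits.
static_assert(sizeof(ExplicitWidthIdx) * CHAR_BIT == 32, "int32_t has exactly 32 bits");
// If code relies on a 32-bit range while spelling the type as 'int', a check
// like this makes the assumption explicit and fails the build if it breaks.
static_assert(sizeof(ImplicitWidthIdx) * CHAR_BIT >= 32, "code assumes int has >= 32 bits");

// Largest index representable by the explicit-width alias.
ExplicitWidthIdx MaxExplicitIdx() { return INT32_MAX; }
```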


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 226:   /// same fragment, but this ID u
> It's getting kind of confusing with all the ids and indexes. It might be he
Done


PS3, Line 238: k ti
> why is this an 'id' but the other monotonically increasing *idx_ fields bel
it doesn't have to be. i asked that same question in Planner.thrift.


PS3, Line 244: Summed acr
> This name isn't very clear. Maybe coord_finstance_state_idx_,  coord_state_
once you stare at this code enough, the term 'state' becomes much more 
specific. :)

but i can rename it if it's still too vague.


PS3, Line 375: }
             : 
             : Status Coordinator::Exec(const QuerySchedule& schedule,
             :     vector<ExprContext*>* output_expr_ctxs) {
             :   const TQueryExecRequest& request = schedule.request();
             :   DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0);
             :   needs_finalization_ = request.__isset.finalize_params;
             :   if (needs_finalization_) finalize_params_ = request.finalize_params;
             : 
             :   VLOG_QUERY << "Exec() query_id=" << schedule.query_id();
             :  
> I agree that this and subsequent functions should be in a utility class. Th
there are only two functions left for this utility class, which seems too 
meager. i moved those two into QuerySchedule.


PS3, Line 405: n
> space before
Done


Line 405:   // TODO: move initial setup into a separate function; right now part of it
> space before =
Done


PS3, Line 407:  instance state
> is_mt_exec for consistency above
Done


PS3, Line 408:   int32_t num_fragment_instances = schedule.GetTotalFInstanc
> can you add a comment about why this is necessary in the MT case only?
moved into QuerySchedule and added comment


PS3, Line 408: 2_t num_fragmen
> why is this only needed for MT?
Done


PS3, Line 443: T
> nit space
Done


PS3, Line 443: T
> space
Done


Line 443:         MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, pool_tracker, NULL);
> space before =
Done


PS3, Line 466: files_[coord_fragment.id], 0, obj_pool()));
> parentheses? Sometimes we have them in these cases.
i don't think this is unclear, so i'd say parentheses here don't add anything.


PS3, Line 560: lse {
             :     SetExecPlanFragmentParams(
> this is a bit outdated since we know there is a coordinator fragment here, 
moved comment to where it makes sense


PS3, Line 651: UERY << "starting " << num_fragment_instances << " fragment instances for query "
             :              << query_id_;
             :   query_events_->MarkEvent(
> please use a lambda here instead.
it started out that way but it was kind of painful - you need to list every 
parameter twice, once in the capture list (some things need to be passed by 
value, some by reference) and once as a parameter. i found it to be more 
verbose.
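
To make the lambda vs. bind trade-off concrete, a small sketch. `CallbackQueue` is a hypothetical stand-in for the RPC thread pool (it runs callbacks sequentially), and `Launcher::ExecInstance` stands in for the per-instance RPC wrapper; neither is the coordinator's actual API. The lambda has to state how each value is captured, while `std::bind` copies its arguments when the bind expression is created.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the RPC thread pool: it queues callbacks and runs
// them later (sequentially), which is enough to show what each one captures.
class CallbackQueue {
 public:
  void Offer(std::function<void()> fn) { pending_.push_back(std::move(fn)); }
  void DrainAll() {
    for (std::function<void()>& fn : pending_) fn();
    pending_.clear();
  }

 private:
  std::vector<std::function<void()>> pending_;
};

class Launcher {
 public:
  std::vector<std::string> log;

  // Hypothetical per-instance "RPC"; here it just records what it was given.
  void ExecInstance(int instance_idx, const std::string& host) {
    log.push_back(host + ":" + std::to_string(instance_idx));
  }

  void StartAll(const std::vector<std::string>& hosts, bool use_lambda, CallbackQueue* q) {
    for (int i = 0; i < static_cast<int>(hosts.size()); ++i) {
      if (use_lambda) {
        // Lambda: 'this' and 'i' are captured by value, 'hosts' by reference,
        // so 'hosts' must outlive DrainAll().
        q->Offer([this, i, &hosts]() { ExecInstance(i, hosts[i]); });
      } else {
        // std::bind: the member function pointer, 'this', and copies of the
        // arguments are stored when the bind expression is created.
        q->Offer(std::bind(&Launcher::ExecInstance, this, i, hosts[i]));
      }
    }
  }
};
```

Both branches issue the same calls; they differ only in where the capture semantics are spelled out.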


PS3, Line 658: r to avoi
> fragment instances
Done


PS3, Line 662:   for (const MtFragmentExecParams& fragmen
> remove?
Done


PS3, Line 664: 
> nit extra space
Done


PS3, Line 675: 
> fragment instances
Done


Line 695:   for (FragmentInstanceState* exec_state: fragment_instance_states_) {
> long line
Done


PS3, Line 702: ofile_->AddInfoString(
             :       "Fragment instance start latencies", latencies.ToHumanReadable());
> also prefer lambda here for readability
see above


PS3, Line 708: ncelInternal(
> nit: indentation
this is accepted indentation


PS3, Line 710: 
> fragment instances
Done


PS3, Line 1128: if (return_status.ok()
> rename this to finst_completion_cv_?
renamed to instance_completion_cv


PS3, Line 1158: 
> is that true?
yes, only insert plans have multiple sinks (and no coord fragment)


Line 1389:           node_summary.__set_estimated_stats(node.estimated_stats);
> Maybe clear exec_summary_.nodes here or DCHECK if it's not already empty.
Done


PS3, Line 1411: etTPlanFragments(&fragments);
> can you move this out of the node loop (e.g. int num_frag_instances) so it'
Done


PS3, Line 1429:         coord_fragment->display_name)
> remove?
Done


PS3, Line 1441: a->a
> TODO-MT?
this is really independent of the mt work. the way we deal with the coordinator 
fragment is special and screwed up, and the way its profile is set up is part 
of that.


PS3, Line 1446:  
> nit: would be nice to capitalize and punctuate comments that are sentences.
historically, single-sentence comments were not capitalized and punctuated. i 
don't feel it impacts the intelligibility of the comment.


Line 1469:     }
> weird early line break
Done


Line 1502:       exec_state->impalad_address(), &client_connect_status);
> remove?
Done


PS3, Line 1517:         PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg());
              :     VLOG_QU
> indent
this is acceptable indentation (we have that all over the place)


PS3, Line 1654: // keep going
> don't we need to cancel that as well?
we haven't so far.


PS3, Line 2007:   }
              :   for (FragmentInstanceState* state: fragment_instance_states_) {
              :     lock_guard<mutex> l(*state->lock());
              :     if (state->error_log()->size() > 0) MergeErrorMaps(&merged, *state->error_log());
              :   }
              :   retur
> If I understand this correctly, the meaning of this counter changes since I
i'm not sure what you mean. are you saying this code isn't collecting the peak 
any longer?
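
A simplified sketch of the merge loop quoted above, assuming a hypothetical `ErrorLogMap` that maps an error code to an occurrence count (the real map holds richer entries). Each instance's log is read under that instance's own lock, and only one lock is held at a time.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <vector>

// Hypothetical, simplified error log: error code -> number of occurrences.
typedef std::map<int, int> ErrorLogMap;

// Hypothetical stand-in for FragmentInstanceState: each instance guards its
// own error log, since status reports can update it concurrently.
struct InstanceState {
  std::mutex lock;
  ErrorLogMap error_log;
};

// Adds the counts in 'from' to 'merged'.
void MergeErrorMaps(ErrorLogMap* merged, const ErrorLogMap& from) {
  for (const ErrorLogMap::value_type& entry : from) (*merged)[entry.first] += entry.second;
}

// Merges all instances' error logs, holding one instance lock at a time.
ErrorLogMap GetMergedErrorLog(const std::vector<InstanceState*>& states) {
  ErrorLogMap merged;
  for (InstanceState* state : states) {
    std::lock_guard<std::mutex> l(state->lock);
    if (!state->error_log.empty()) MergeErrorMaps(&merged, state->error_log);
  }
  return merged;
}
```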


PS3, Line 2045: 
> remove?
Done


PS3, Line 2131:   
> ?
Done


PS3, Line 2131:       tuple_i
> ??
Done


PS3, Line 2214:   // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts.
              :   shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
> please revert. Anonymous namespaces are slightly more flexible (can declare
i'll switch it back, but in general please have specific reasons for asking for 
code changes (there are no local types declared here, and using a namespace 
simply adds a few extra lines).
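
For reference on the namespace question: both `static` and an unnamed namespace give a function internal linkage, but `static` cannot be applied to a class definition, so only the unnamed namespace can hide a helper type from other translation units. A minimal sketch with hypothetical names:

```cpp
#include <cassert>
#include <string>

namespace {

// A helper type with internal linkage; 'static' cannot express this.
struct FilterTarget {
  std::string host;
  int filter_id;
};

// A helper function with internal linkage, equivalent to marking it 'static'.
std::string DescribeTarget(const FilterTarget& t) {
  return t.host + "/" + std::to_string(t.filter_id);
}

}  // anonymous namespace

// A 'static' function: also internal linkage, but only for functions/variables.
static std::string DescribeRoute(const std::string& host, int filter_id) {
  return DescribeTarget(FilterTarget{host, filter_id});
}
```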


PS3, Line 2300: 
> this seems dangerous. There's no guarantee that DistributeFilters will comp
switching back to copies, but leaving a todo. the coordinator should absolutely 
not get destroyed while running this function (and the current qes/qem work 
should fix that).
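
A sketch of why a shared 'master' copy is safe independent of the caller's lifetime, using a hypothetical `PublishFilterParams` and a queue of deferred attempts in place of the real RPC machinery: each attempt holds its own `std::shared_ptr`, so the params are freed only after the last attempt is gone. Capturing a reference or raw pointer to coordinator-owned data instead would leave the attempts dangling if the coordinator were destroyed first, which is the hazard raised in the review.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for TPublishFilterParams.
struct PublishFilterParams {
  int filter_id;
  std::string filter_bytes;
};

// Deferred delivery attempts; they may run after DistributeFilter() returns.
std::vector<std::function<int()>> pending_attempts;

void DistributeFilter(int filter_id, const std::string& filter_bytes, int num_targets) {
  // One 'master' copy shared by all attempts. Each lambda holds its own
  // shared_ptr, so the params stay alive until the last attempt is destroyed.
  std::shared_ptr<PublishFilterParams> rpc_params =
      std::make_shared<PublishFilterParams>(PublishFilterParams{filter_id, filter_bytes});
  for (int i = 0; i < num_targets; ++i) {
    pending_attempts.push_back([rpc_params]() {
      // Simulated RPC: reads the params to show they are still valid.
      return rpc_params->filter_id + static_cast<int>(rpc_params->filter_bytes.size());
    });
  }
  // The local 'rpc_params' goes out of scope here; the lambdas still share ownership.
}
```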


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS3, Line 346: Number of ins
> initialize this as well?
Done


PS3, Line 360: 
> prefer nullptr now for C++11
Done
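
A minimal illustration of why `nullptr` is preferred over `NULL` in C++11 (the overloads are hypothetical): `nullptr` has type `std::nullptr_t`, which converts only to pointer types, whereas `NULL` is an integer constant in common implementations and can select an integer overload or make the call ambiguous.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical overloads that differ only in integer vs. pointer parameter.
std::string Describe(long) { return "integer"; }
std::string Describe(const char*) { return "pointer"; }

std::string DescribeNullptr() {
  // Describe(NULL) may pick the integer overload or be rejected as ambiguous,
  // depending on how the implementation defines NULL. std::nullptr_t converts
  // to pointer types but not to integers, so only the pointer overload is viable.
  return Describe(nullptr);
}
```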


PS3, Line 453: ams* rpc_params);
> coord is constant across all fragments (and for the lifetime of the process
turns out that parameter wasn't used in the first place.


PS3, Line 455: ms, TExecPlanFragmentParams* rpc_params);
             : 
             :   /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from
             :   /// multiple threads. Creates a new FragmentInstanceState and regis
> nit: format these to avoid an extra line or two
Done


Line 562:   /// Starts all remote fragment instances contained in the schedule by issuing RPCs in
> static?
moved


PS3, Line 565: MtStartRemoteFIns
> Could this be static? Implementation doesn't seem to access instance member
moved


Line 566: 
> std:: (worth understanding how this compiled without it...)
Done
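
On how a missing `std::` can still compile: for function calls, argument-dependent lookup (ADL) also searches the namespaces of the argument types, so an unqualified call with `std::` arguments can resolve to a `std::` function, as in the sketch below (the helper is hypothetical). ADL does not apply to type names, so if line 566 named a type such as `vector<...>`, a `using namespace std;` pulled in through an included header is the more likely explanation; which one applies here isn't visible from the review.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Returns the text before the first comma (or the whole line if there is none).
std::string FirstField(const std::string& line) {
  std::istringstream in(line);
  std::string field;
  // Unqualified call: it compiles because argument-dependent lookup also
  // searches namespace std, where the types of 'in' and 'field' live, and
  // finds std::getline there.
  getline(in, field, ',');
  return field;
}
```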


Line 569:   /// Also updates query_profile_ with the startup latency histogram.
> static?
moved


PS3, Line 582: 
> can there be > 1 coordinator fragment instance in MT?
clarified


PS3, Line 589: 
> Name doesn't really reflect what this does.
what would be a better name?


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.cc
File be/src/scheduling/query-schedule.cc:

PS3, Line 319: t re
> remove else since you return on line 318
Done


PS3, Line 321: ragm
> use auto only for iterators
Done


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.h
File be/src/scheduling/query-schedule.h:

PS3, Line 67: TUniqueId
> if we are going to have a lot of these FInstanceExecParams, why not have in
it's more autonomous this way. if there's evidence that this is using up too 
much cache space, i'd be happy to change it, but that's not the case at this 
point.

the coordinator has a lot of functions with lots of parameters, and i'm trying 
to get away from that by packaging things up that belong together.


PS3, Line 73: 
> Initialize this?
Done


PS3, Line 75: /// uniquely identify it to a receiver. -1 = inva
> ownership not clear; we usually avoid this pattern.
clarified in comment.

this is advantageous because even if you need the fragment params you only need 
to pass one struct around.
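
A simplified sketch of the packaging described above, with hypothetical fields: each instance struct carries a non-owning reference back to the fragment-level params that contain it, so a callee needs only one argument to reach both levels. The ownership caveat from the review shows up here too: the back-reference stays valid only while the containing `FragmentExecParams` is not moved or copied (e.g. by a reallocating vector of fragments).

```cpp
#include <cassert>
#include <string>
#include <vector>

struct FragmentExecParams;  // forward declaration for the back-reference

// Hypothetical, simplified per-instance parameters.
struct FInstanceExecParams {
  int instance_id;
  std::string host;
  int per_fragment_instance_idx;
  int sender_id = -1;  // -1 = not yet assigned
  // Not owned: refers to the FragmentExecParams that contains this instance.
  const FragmentExecParams& fragment_exec_params;

  FInstanceExecParams(int id, const std::string& h, int idx, const FragmentExecParams& fp)
    : instance_id(id), host(h), per_fragment_instance_idx(idx), fragment_exec_params(fp) {}
};

// Hypothetical, simplified per-fragment parameters; owns its instances.
struct FragmentExecParams {
  int fragment_idx;
  std::vector<FInstanceExecParams> instance_exec_params;
};

// A callee gets at both levels through the single instance struct.
std::string Describe(const FInstanceExecParams& p) {
  return "fragment " + std::to_string(p.fragment_exec_params.fragment_idx) + " instance " +
      std::to_string(p.per_fragment_instance_idx) + " on " + p.host;
}
```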


PS3, Line 85:     per_fragment_instance_idx(per_fragment_instance_idx),
            :       sender_id(-1),
> comment
Done


PS3, Line 92: /// ou
> ?
Done


PS3, Line 107:   std::vector<FInstanceExecParams> instance_exec_params;
             : 
> remove (it was done)
Done


Line 184:   /// Map node ids to the id of their containing fragment.
> I think adding mutable accessors means that the QuerySchedule's role is har
it has to be mutable, because it gets created in the scheduler.


PS3, Line 195: e
> remove space
Done


PS3, Line 196: t32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; }
             : 
             :   const TPlanNode& GetNode(PlanNodeId id) const {
> write this in terms of GetCoordInstanceExecParams()? Any reason this needs 
Done


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.cc
File be/src/scheduling/simple-scheduler.cc:

PS3, Line 312: /// TODO-MT: fix this (do we even need to report it?)
> this is supposed to be the number of active backends - the metric was renam
i'll leave it as-is for now.


Line 379:   RuntimeProfile::Counter* total_assignment_timer =
> one line?
Done


PS3, Line 384: t au
> auto only for iterators
Done


PS3, Line 403: ntry.second, e
> nit: some of these parameters could go to the previous line, maybe saving a
i prefer to break before the first param if the whole call doesn't fit on a 
line, i think it's easier to see what's being called.


PS3, Line 429: nFrag
> Can you add a comment to explain the "+ 1"?
Done


Line 434:       DCHECK(
> nit: formatting
Done


Line 436:             || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
> move to line 443
Done


Line 448:         dest.__set_server(dest_params->instance_exec_params[j].host);
> You could resize src_params->destinations and then call setters on the elem
Done
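
A sketch of the suggested pattern, using hypothetical stand-ins for the Thrift-generated structs (generated code would typically use setters such as `__set_server()` rather than direct field access): size the destinations vector once, then fill each element in place.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins for the Thrift-generated structs.
struct PlanFragmentDestination {
  std::string fragment_instance_id;
  std::string server;
};
struct DestInstanceParams {
  std::string instance_id;
  std::string host;
};

// Sizes the output once, then fills each element in place instead of building
// a temporary per iteration and appending a copy of it.
void SetDestinations(const std::vector<DestInstanceParams>& dest_instances,
    std::vector<PlanFragmentDestination>* destinations) {
  destinations->resize(dest_instances.size());
  for (std::size_t j = 0; j < dest_instances.size(); ++j) {
    PlanFragmentDestination& dest = (*destinations)[j];
    dest.fragment_instance_id = dest_instances[j].instance_id;
    dest.server = dest_instances[j].host;
  }
}
```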


PS3, Line 478: e == TPartitionTyp
> Replace with local_backend_descriptor_.address?
Done


PS3, Line 483: 
> sentence fragment?
no. rephrased.


Line 509:   for (const auto& assignment_entry: fragment_params->scan_range_assignment) {
> Incomplete comment?
Done


PS3, Line 522: 
> nit: extra space
Done


Line 530:     float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances;
> formatting
Done


PS3, Line 534: 
> can this be 0?
Done


PS3, Line 535:      
> we often use int64_t for those types. Is there a difference?
oops, habit. i think we pulled this in from the google includes.
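
On the two questions above (a zero divisor, and `int64_t` for byte counts), a hypothetical sketch of splitting one host's scan ranges across instances by byte volume. It is not the scheduler's actual assignment logic; it only shows one way to keep `num_instances` at least 1 and to keep byte totals in `int64_t`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch: splits the scan ranges assigned to one host across up to
// 'max_instances' instances so that each gets roughly total_size / num_instances
// bytes. Returns, per instance, the indexes of the scan ranges it reads.
std::vector<std::vector<int>> SplitScanRanges(
    const std::vector<int64_t>& range_bytes, int max_instances) {
  int64_t total_size = 0;
  for (int64_t bytes : range_bytes) total_size += bytes;
  // Use no more instances than scan ranges, but always at least one, so the
  // division below never divides by zero.
  int num_instances =
      std::max(1, std::min(max_instances, static_cast<int>(range_bytes.size())));
  double avg_bytes_per_instance = static_cast<double>(total_size) / num_instances;

  std::vector<std::vector<int>> assignment(num_instances);
  int64_t assigned_bytes = 0;
  std::size_t range_idx = 0;
  for (int i = 0; i < num_instances; ++i) {
    // Add ranges until this instance reaches its cumulative share; the last
    // instance takes whatever is left. A range much larger than the average can
    // leave a later instance empty.
    while (range_idx < range_bytes.size() &&
        (i == num_instances - 1 || assigned_bytes < (i + 1) * avg_bytes_per_instance)) {
      assigned_bytes += range_bytes[range_idx];
      assignment[i].push_back(static_cast<int>(range_idx));
      ++range_idx;
    }
  }
  return assignment;
}
```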


PS3, Line 543: TScanRangeParams& scan_range_params = params_list[pa
> Maybe use a variable for this to make it more readable?
Done


PS3, Line 554: le* schedule) {
             :   DCHECK_GE(fragment_params->in
> 'mirror instance' isn't clear. What is reflected?
rephrased comment.


PS3, Line 565: 
> unused?
Done


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.h
File be/src/scheduling/simple-scheduler.h:

PS3, Line 489: /// Compute the assignment of scan ranges to hosts for each scan node in 
             :   /// the schedule's TQueryExecRequest.mt_plan_exec_info.
             :   /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
             :   /// mt_fragment_exec_params_ with the resulting scan range assignment.
             :   Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
             : 
             :   /// Compute the MtFragmentExecParams for all plans in the schedule's
             :   /// TQueryExecRequest.mt_plan
> comments?
Done


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Frontend.thrift
File common/thrift/Frontend.thrift:

PS3, Line 344: Info
> is there a better word than 'info' - what does a data structure contain if 
i'm open to suggestions, but i think -params and -parameters are too close not 
to be confusing.


PS3, Line 353: // TODO: remove; TPlanFragment.output_s
> why isn't this part of TPlanFragment? (with -1 being the value for the root
turns out this is redundant, TPlanFragment.output_sink has the destination node 
(by which we can look up the fragment).

however, this is general clean-up, so not specific to mt execution. left a todo 
in frontend.thrift.


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

PS3, Line 185: degree of parallelism
> somewhere this needs to be made very precise: is this the number of active 
clarified in ImpalaService.thrift


PS3, Line 188:   // 1: single-threaded execution mode
> Not really relevant for this change, but I think we should probably have th
good idea, will leave todo in qes.cc.


PS3, Line 188: 
> dop?
Done


Line 254:   // Client request containing stmt to execute and query options.
> I'd like to see TQueryExecRequest renamed to TQueryExecutionPlan or similar
we'll have to think more about that - 'plan' already has a different 
connotation (= a collection of fragments that together provide materialized 
output; a single query in the new execution model can require multiple plans, 
all but the first one for join build materialization).


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

Line 216:   // Multi-threaded execution: degree of parallelism = number of active threads per
> see other comment - let's please nail down exactly what this means (this se
Done


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Planner.thrift
File common/thrift/Planner.thrift:

PS3, Line 35: // TODO: should this be called idx, to distinguish more clearly from a
            :   // globally unique id?
> I think ID is ok - idx automatically makes me wonder what structure is bein
that's the intention behind this id - that it can be used to index into 
query-wide per-fragment data structures (of which we have several).
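
A sketch of what using fragment ids as indexes buys, assuming (as described above) that ids are dense and 0-based; `FragmentIdx` and `PerFragmentState` are hypothetical names: query-wide per-fragment data can live in plain vectors indexed by id instead of maps.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical alias: fragment ids are dense, 0-based indexes.
typedef int FragmentIdx;

// Query-wide per-fragment data, stored in vectors indexed by FragmentIdx.
struct PerFragmentState {
  std::vector<std::string> display_names;
  std::vector<int> num_instances;

  explicit PerFragmentState(int num_fragments)
    : display_names(num_fragments), num_instances(num_fragments, 0) {}

  // at() throws std::out_of_range for an id outside [0, num_fragments).
  void SetFragment(FragmentIdx idx, const std::string& name, int instances) {
    display_names.at(idx) = name;
    num_instances.at(idx) = instances;
  }

  int TotalInstances() const {
    int total = 0;
    for (int n : num_instances) total += n;
    return total;
  }
};
```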


http://gerrit.cloudera.org:8080/#/c/4054/3/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
File fe/src/main/java/com/cloudera/impala/common/TreeNode.java:

Line 63:   protected <C extends TreeNode<NodeType>> void getNodesPreOrderAux(
> space before void
Done


http://gerrit.cloudera.org:8080/#/c/4054/3/fe/src/main/java/com/cloudera/impala/service/Frontend.java
File fe/src/main/java/com/cloudera/impala/service/Frontend.java:

Line 964:     // The fragment at this point has all state set, serialize it to thrift.
> so we are going to remove the resource estimates reported in the explain pl
left a todo


Line 981:     Preconditions.checkState(queryCtx.request.query_options.mt_dop > 1);
> fix comment: input is a planner
Done


Line 998:     for (PlanFragment planRoot: planRoots) {
> The explain level setting is somewhat subtle, maybe factor out that part.
moved into getExplainString()


Line 1042:       fragmentIdx.put(fragment, idx);
> Fair enough, but I don't see the old code going away anytime soon (i.e. sev
i don't want to add a bunch of code now, like accessor functions, that make the 
code harder to read and will need to be thrown away later on.

this functionality hasn't changed in a long time. i left a todo to pull out 
common functionality in case we ever need to make the same change in both this 
function and the mt counterpart.


-- 
To view, visit http://gerrit.cloudera.org:8080/4054

Gerrit-MessageType: comment
Gerrit-Change-Id: I240445199e22f009f4e72fdb8754eb8d77e3d680
Gerrit-PatchSet: 4
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Alex Behm <[email protected]>
Gerrit-Reviewer: Dan Hecht <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Lars Volker <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Matthew Jacobs <[email protected]>
Gerrit-HasComments: Yes
