Henry Robinson has posted comments on this change.

Change subject: IMPALA-3902: Scheduler improvements for running multiple 
fragment instances on a single backend
......................................................................


Patch Set 3:

(48 comments)

Initial comments. Still thinking about a) how we can reduce the high number of 
confusingly named data structures, and better articulate their responsibilities 
and b) simplifying coordinator by factoring out some of its responsibilities. 

Both of these problems were pre-existing, but are exacerbated by introducing 
more data structures and more code into coordinator. It's quite hard to reason 
about what data structures are available where.

http://gerrit.cloudera.org:8080/#/c/4054/3//COMMIT_MSG
Commit Message:

PS3, Line 10: ingle fragment instance per host
"at most one instance of each fragment" (there are already several fragment 
instances per host).


PS3, Line 19: fragent
fragment


PS3, Line 7: IMPALA-3902: Scheduler improvements for running multiple fragment 
instances on a single backend
           : 
           : This is an extension of the scheduler and coordinator for 
multi-threaded execution. It
           : mainly removes the assumption of having a single fragment instance 
per host. The approach
           : taken here is to create parallel data structures and control flow 
functions, where
           : necessary, and otherwise to leave the existing single-instance 
logic in place. The parallel
           : structures' and functions' names are prefixed with "Mt" to 
facilitate the enventual clean-up.
           : 
           : Changes to data structures:
           : - QuerySchedule: per-instance and per-fragment structs with 
complete execution
           :   parameters (instead of partially relying on TQueryExecRequest); 
the per-instance
           :   execution parameter struct is a child of the per-fragment 
parameter struct
           : - explicit fragent id, with range 0..#fragments-1 (instead of 
relying on an index into
           :   an array in TQueryExecRequest)
nit: try and keep commit messages to 90 chars at most - otherwise they're very 
hard to read in Gerrit.


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/common/global-types.h
File be/src/common/global-types.h:

PS3, Line 30: int
nit: say int32_t so the width is explicit.


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 375: const TPlanFragment* Coordinator::GetCoordFragment(
             :     const TQueryExecRequest& request) const {
             :   bool is_mt_exec = 
request.query_ctx.request.query_options.mt_dop != 1;
             :   const TPlanFragment* fragment = is_mt_exec
             :       ? &request.mt_plan_exec_info[0].fragments[0] : 
&request.fragments[0];
             :   if (fragment->partition.type == TPartitionType::UNPARTITIONED) 
{
             :     return fragment;
             :   } else {
             :     return NULL;
             :   }
             : }
I agree that this and subsequent functions should be in a utility class. The 
more stuff we can keep out of the header the better. Plus this might get 
removed when the ST code is removed.


Line 405:   bool has_coordinator_fragment= GetCoordFragment(schedule.request()) 
!= NULL;
space before =


Line 443:   bool has_coordinator_fragment= GetCoordFragment(request) != NULL;
space before =


PS3, Line 651: std::bind(&Coordinator::ExecRemoteFragment, this, 
std::cref(params),
             :           std::cref(request.fragments[fragment_idx]), 
fragment_instance_debug_options,
             :           std::cref(schedule), instance_state_idx, 
fragment_instance_idx));
please use a lambda here instead.


PS3, Line 658: fragments
fragment instances


PS3, Line 675: fragments
fragment instances


Line 695:         VLOG_QUERY << "MtStartRemoteFInstances(): coord_id=" << 
instance_params.instance_id;
long line


PS3, Line 702: std::bind(&Coordinator::MtExecRemoteFInstance, this, 
instance_params,
             :             instance_debug_options, std::cref(schedule), 
instance_state_idx, i)
also prefer lambda here for readability


PS3, Line 708: << query_id_;
nit: indentation


PS3, Line 710: fragments
fragment instances


PS3, Line 713: CancelOnStartupError
FinishQueryStartup()?


PS3, Line 1128: backend_completion_cv_
rename this to finst_completion_cv_?


PS3, Line 1654: if (exec_state->fragment_id() == 0) continue;  // the coord 
fragment
don't we need to cancel that as well?


PS3, Line 2131: //fragment_ct
??


PS3, Line 2214: static void DistributeFilters(shared_ptr<TPublishFilterParams> 
params,
              :     const TNetworkAddress& impalad, const TUniqueId& 
fragment_instance_id) {
please revert. Anonymous namespaces are slightly more flexible (can declare 
translation-unit-local types as well), so are preferable way to do this.


PS3, Line 2300: std::cref
this seems dangerous. There's no guarantee that DistributeFilters will complete 
before the coordinator is destroyed. That's why the arguments are copied in. I 
don't know much about cref, but isn't it making a reference_wrapper around a 
temporary? What's the problem with copying these small structs by value?


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS3, Line 360: NULL
prefer nullptr now for C++11


PS3, Line 453: const TNetworkAddress& coord
coord is constant across all fragments (and for the lifetime of the process). 
Make a coord_ member that's initialised in the c'tor to 
MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port) and remove this parameter.


Line 566:       vector<const TPlanFragment*>* fragments) const;
std:: (worth understanding how this compiled without it...)


PS3, Line 582: the coordinator fragment instance.
can there be > 1 coordinator fragment instance in MT?


PS3, Line 589: CancelOnStartupError
Name doesn't really reflect what this does.


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.cc
File be/src/scheduling/query-schedule.cc:

PS3, Line 319: else
remove else since you return on line 318


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/query-schedule.h
File be/src/scheduling/query-schedule.h:

PS3, Line 67: TUniqueId
if we are going to have a lot of these FInstanceExecParams, why not have 
instance_id be the i32 offset rather than the full query_id + instance id?


PS3, Line 75: const MtFragmentExecParams& fragment_exec_params;
ownership not clear; we usually avoid this pattern.


PS3, Line 85: std::vector<TPlanFragmentDestination> destinations;
            :   std::map<PlanNodeId, int> per_exch_num_senders;
comment


PS3, Line 92: //int 
?


PS3, Line 107: /// TODO: Consider moving QuerySchedule and all Schedulers into
             : /// their own lib (and out of statestore).
remove (it was done)


Line 184:   MtFragmentExecParams* GetFragmentExecParams(FragmentId id) {
I think adding mutable accessors means that the QuerySchedule's role is hard to 
discern - is it an immutable description of how a query should execute, or is 
it a mutable, evolving structure throughout query execution?

I think it's a mistake to have the QES create the QS, then pass it to the 
scheduler (not your doing, but exacerbated by adding these mutable accessors). 
That requires that the scheduler be able to update the QS after it's created. 
Instead, the scheduler should be able to build a QS piecemeal during the call 
to schedule() and return it to the QES fully-formed and immutable. Doing so 
would make the responsibilities of the scheduler more clear cut, and make it 
more clear what the post-conditions of schedule() are.


PS3, Line 195:  
remove space


PS3, Line 196: DCHECK(coord_fragment.partition.type == 
TPartitionType::UNPARTITIONED);
             :     const MtFragmentExecParams& fragment_params =
             :         mt_fragment_exec_params_[coord_fragment.id];
write this in terms of GetCoordInstanceExecParams()? Any reason this needs to 
DCHECK() where the former doesn't?


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.cc
File be/src/scheduling/simple-scheduler.cc:

PS3, Line 312: /// TODO-MT: fix this (do we even need to report it?)
this is supposed to be the number of active backends - the metric was renamed 
by a find-and-replace gone bad. Yes, we should report it. I'm not sure it's 
broken here.


Line 379:     QuerySchedule* schedule) {
one line?


Line 436:       PlanNodeId exch_id = sink.dest_node_id;
move to line 443


PS3, Line 483: // that instance gets all of the scan ranges, if there are any
sentence fragment?


PS3, Line 554: create a mirror
             : /// instance for this fragment.
'mirror instance' isn't clear. What is reflected?


PS3, Line 565: FInstanceExecParams& instance_param
unused?


http://gerrit.cloudera.org:8080/#/c/4054/3/be/src/scheduling/simple-scheduler.h
File be/src/scheduling/simple-scheduler.h:

PS3, Line 489: Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
             :   void MtComputeFragmentExecParams(QuerySchedule* schedule);
             :   void MtComputeFragmentExecParams(const TPlanExecInfo& 
plan_exec_info,
             :       MtFragmentExecParams* fragment_params, QuerySchedule* 
schedule);
             :   void MtCreateScanInstances(PlanNodeId leftmost_scan_id,
             :       MtFragmentExecParams* fragment_params, QuerySchedule* 
schedule);
             :   void MtCreateMirrorInstances(MtFragmentExecParams* 
fragment_params,
             :       QuerySchedule* schedule);
comments?


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Frontend.thrift
File common/thrift/Frontend.thrift:

PS3, Line 344: Info
is there a better word than 'info' - what does a data structure contain if not 
information? TPlanExecParameters would be better to my reading (TPlanExecParams 
makes it sound like a parameter for an rpc, and I think we should avoid that 
possible source of confusion).


PS3, Line 353: 2: optional list<i32> dest_fragment_idx
why isn't this part of TPlanFragment? (with -1 being the value for the root 
fragment). It's not clear why there is a distinction between TPlanFragment and 
the other fields in this structure. One possibility is that TPlanFragment 
represents some abstract plan fragment without reference to the plan in which 
it exists (and these parameters then make it concrete - connecting it to other 
plan fragments and describing the input data. But none of the comments / code 
make it clear why we would want to separate the data structures in this way. 

For contrast, if all of this information were included in the TPlanFragment, it 
would simplify some code that wouldn't need to do lookups in the list here, and 
the map below. 

If they really are separate, I think it would read better to have a 

  struct TConcretePlanFragment {
    TPlanFragment abstract_fragment
     i32 dest_fragment_idx
     optional map<TPlanNodeId, TScanRangeLocations> scan_range_locations
  }


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

PS3, Line 185: degree of parallelism
somewhere this needs to be made very precise: is this the number of active 
threads per query per node? The maximum number of active threads per plan 
fragment per node?


PS3, Line 188: number of cores
dop?


Line 254:   // TODO: rename to client_request, we have too many requests
I'd like to see TQueryExecRequest renamed to TQueryExecutionPlan or similar - 
'Request' has always confused me about who provides the information, and who 
consumes it.


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

Line 216:   // Multi-threaded execution: degree of parallelism = number of 
cores per machine
see other comment - let's please nail down exactly what this means (this seems 
more precise - active number of threads per query per node?)


http://gerrit.cloudera.org:8080/#/c/4054/3/common/thrift/Planner.thrift
File common/thrift/Planner.thrift:

PS3, Line 35: // TODO: should this be called idx, to distinguish more clearly 
from a
            :   // globally unique id?
I think ID is ok - idx automatically makes me wonder what structure is being 
indexed (i.e. an index usually only exists with a data structure).


-- 
To view, visit http://gerrit.cloudera.org:8080/4054
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I240445199e22f009f4e72fdb8754eb8d77e3d680
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Alex Behm <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-HasComments: Yes

Reply via email to