Henry Robinson has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Patch Set 4:

(13 comments)

Started to review this, but there are lots of TODOs and missing comments and so 
on - do you want to ping me again when you think this is ready for a closer 
look?

http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS4, Line 422: const FragmentExecParams& fragment_exec_params = (*schedule.exec_params())[0];
             :     TQueryCtx query_ctx = query_ctx_;
             :     // TODO is this line needed?
             :     query_ctx.__set_query_id(query_id_);
             :     TPlanFragmentCtx fragment_ctx;
             :     fragment_ctx.__set_fragment(request.fragments[0]);
             :     fragment_ctx.__set_num_fragment_instances(fragment_exec_params.instance_ids.size());
             :     TPlanFragmentInstanceCtx fragment_instance_ctx;
             :     SetFragmentInstanceCtx(schedule, request.fragments[0], fragment_exec_params, 0, 0, 0, coord, &fragment_instance_ctx);
             :     unordered_set<TTupleId> tuple_ids;
             :     CollectTupleIds(request.fragments[0], &tuple_ids);
             :     unordered_set<TTableId> table_ids;
             :     CollectOutputSinkTableIds(request.fragments[0], &table_ids);
             :     PopulateSharedDescriptorTable(tuple_ids, table_ids, &query_ctx);
             :     RETURN_IF_ERROR(executor_->Prepare(query_ctx, fragment_ctx, fragment_instance_ctx));
this block is very hard to read (please also note the long line, here and 
elsewhere). I think there is some opportunity to factor out fragment creation 
into some kind of builder or factory class.
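
For instance (a sketch only; the class and method names here are invented):

  // Encapsulates construction of the Thrift ctx structures for one query.
  class FragmentCtxFactory {
   public:
    FragmentCtxFactory(const QuerySchedule& schedule, const TQueryExecRequest& request,
        const TNetworkAddress& coord);

    // Fragment-level ctx for request.fragments[fragment_idx].
    TPlanFragmentCtx CreateFragmentCtx(int fragment_idx) const;

    // Instance-level ctx: scan ranges, sender ids, resources, etc.
    TPlanFragmentInstanceCtx CreateInstanceCtx(int fragment_idx, int instance_idx) const;
  };

Then this block reduces to a couple of factory calls plus the descriptor-table setup and Prepare().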


PS4, Line 607: RPC's
nit: RPCs


PS4, Line 612: bind<void>(mem_fn(&Coordinator::ExecRemoteFragments), this,
             :           backend_address,
             :           debug_options,
             :           backend_exec_data,
             :           schedule
fwiw, I think lambdas are going to be easier to read here:

  ...->Offer([&] { this->ExecRemoteFragments(backend_address, debug_options, backend_exec_data, schedule); });

Or something.
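
(One caveat: bind copies its arguments, while [&] captures by reference; if the closure can run after this scope exits, capture by value with [=] instead.)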


PS4, Line 1395: batch_rpc_params
just rpc_params, I think.


PS4, Line 1404: // Notification barrier
              :   NotifyBarrierOnExit notifier(exec_complete_barrier_.get(),
              :       backend_exec_data->fragment_instances.size());
please put this back at the top of the method, to avoid bugs creeping in where 
someone returns without signalling the barrier.
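
e.g. (sketch, assuming this is ExecRemoteFragments()):

  void Coordinator::ExecRemoteFragments(...) {
    // Declared first, so every early return below still signals the barrier.
    NotifyBarrierOnExit notifier(exec_complete_barrier_.get(),
        backend_exec_data->fragment_instances.size());
    if (!status.ok()) return;  // safe: 'notifier' fires on this path too
    ...
  }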


PS4, Line 1424: int instance_state_idx = fragment_instance.instance_state_idx;
              :     int fragment_idx = fragment_instance.fragment_idx;
              :     int fragment_instance_idx = fragment_instance.fragment_instance_idx;
try to keep these declarations as close to their first point of use as
possible.


PS4, Line 1456: after
should this comment have said 'before', not 'after'?


PS4, Line 1456: // Guard against concurrent UpdateFragmentExecStatus() that may arrive after RPC returns.
              :     // TODO How to fix this mess? Use std::unique_lock? Unlock the exec_states in
              :     // SetInitialStatus()?
              :     guards.push_back(shared_ptr<lock_guard<mutex>>(new lock_guard<mutex>(*exec_state->lock())));
I agree, we should do something. What would UpdateFragmentExecStatus() access 
that would be bad concurrently?

How about having the update RPC return the equivalent of EAGAIN, telling the 
sender to try again when the call can be processed? We'd need a flag in the
exec state, but wouldn't need to take a lock.
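
Rough sketch of the EAGAIN idea ('initial_status_set' would be a new, hypothetical flag on the exec state):

  // In UpdateFragmentExecStatus(), instead of contending on exec_state->lock():
  if (!exec_state->initial_status_set()) {
    // Equivalent of EAGAIN: the sender re-sends this report after a short
    // backoff, once the initial status has been published.
    return Status("Initial fragment status not set yet, retry");
  }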


PS4, Line 1985: void Coordinator::SetFragmentInstanceCtx(QuerySchedule& schedule,
              :     const TPlanFragment& fragment, const FragmentExecParams& params,
              :     int instance_state_idx, int fragment_idx, int fragment_instance_idx,
              :     const TNetworkAddress& coord,
              :     TPlanFragmentInstanceCtx* fragment_instance_ctx) {
              : 
              :   TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
              :   if (schedule.HasReservation()) {
              :     // The reservation has already been validated at this point.
              :     TNetworkAddress resource_hostport;
              :     schedule.GetResourceHostport(exec_host, &resource_hostport);
              :     map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
              :         schedule.reservation()->allocated_resources.find(resource_hostport);
              :     // Only set reserved resource if we actually have one for this plan
              :     // fragment. Otherwise, don't set it (usually this is the coordinator fragment), and it
              :     // won't participate in dynamic RM controls.
              :     if (it != schedule.reservation()->allocated_resources.end()) {
              :       fragment_instance_ctx->__set_reserved_resource(it->second);
              :       fragment_instance_ctx->__set_local_resource_address(resource_hostport);
              :     }
              :   }
              :   FragmentScanRangeAssignment::const_iterator it =
              :       params.scan_range_assignment.find(exec_host);
              :   // Scan ranges may not always be set, so use an empty structure if so.
              :   const PerNodeScanRanges& scan_ranges =
              :       (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
              : 
              :   fragment_instance_ctx->__set_request_pool(schedule.request_pool());
              :   fragment_instance_ctx->__set_per_node_scan_ranges(scan_ranges);
              :   fragment_instance_ctx->__set_per_exch_num_senders(params.per_exch_num_senders);
              :   fragment_instance_ctx->__set_destinations(params.destinations);
              :   fragment_instance_ctx->__set_sender_id(params.sender_id_base + fragment_instance_idx);
              :   fragment_instance_ctx->fragment_instance_id = params.instance_ids[fragment_instance_idx];
              :   fragment_instance_ctx->fragment_instance_idx = fragment_instance_idx;
              :   fragment_instance_ctx->instance_state_idx = instance_state_idx;
              : }
does this method access any member variables of Coordinator? if not, consider 
refactoring so that it's a local helper method.
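
e.g. a free function in an unnamed namespace in coordinator.cc (sketch):

  namespace {

  // No Coordinator state needed; everything is passed in explicitly.
  void SetFragmentInstanceCtx(QuerySchedule& schedule, const TPlanFragment& fragment,
      const FragmentExecParams& params, int instance_state_idx, int fragment_idx,
      int fragment_instance_idx, const TNetworkAddress& coord,
      TPlanFragmentInstanceCtx* fragment_instance_ctx);

  }  // namespace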


PS4, Line 2022: void Coordinator::CollectTupleIds(const TPlanFragment& fragment,
similarly here, consider moving out of Coordinator. Hopefully we'll find a way 
to factor out some of this code into a separate module to keep this file 
manageable.


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS4, Line 271: boost
let's prefer std:: where possible.
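
i.e. (assuming <unordered_set> is acceptable to include here):

  void CollectTupleIds(const TPlanFragment& fragment, std::unordered_set<TTupleId>* tuple_ids);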


PS4, Line 271: void CollectTupleIds(const TPlanFragment& fragment, boost::unordered_set<TTupleId>* tuple_ids);
             : 
             :   void CollectOutputSinkTableIds(const TPlanFragment& fragment,
             :       boost::unordered_set<TTableId>* table_ids);
             : 
             :   void PopulateSharedDescriptorTable(
             :     const boost::unordered_set<TTupleId>& tuple_ids,
             :     const boost::unordered_set<TTableId>& sink_table_ids,
             :     TQueryCtx* query_ctx);
as mentioned in .cc, these methods don't seem like they belong to Coordinator.


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/util/counting-barrier.h
File be/src/util/counting-barrier.h:

PS4, Line 81: NotifyBarrierOnExit& operator=(const NotifyBarrierOnExit&) = delete;
            :   NotifyBarrierOnExit& operator=(NotifyBarrierOnExit&&) = delete;
we usually use DISALLOW_COPY_AND_ASSIGN.
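
i.e. something like (the macro covers copy construction and copy assignment):

  class NotifyBarrierOnExit {
    ...
   private:
    DISALLOW_COPY_AND_ASSIGN(NotifyBarrierOnExit);
  };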


-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 4
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-HasComments: Yes
