Lars Volker has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Patch Set 4:

(13 comments)

Thanks for the review. Addressed your comments in PS5.

http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS4, Line 422: const FragmentExecParams& fragment_exec_params = (*schedule.exec_params())[0];
             :     TQueryCtx query_ctx = query_ctx_;
             :     // TODO is this line needed?
             :     query_ctx.__set_query_id(query_id_);
             :     TPlanFragmentCtx fragment_ctx;
             :     fragment_ctx.__set_fragment(request.fragments[0]);
             :     fragment_ctx.__set_num_fragment_instances(fragment_exec_params.instance_ids.size());
             :     TPlanFragmentInstanceCtx fragment_instance_ctx;
             :     SetFragmentInstanceCtx(schedule, request.fragments[0], fragment_exec_params, 0, 0, 0, coord, &fragment_instance_ctx);
             :     unordered_set<TTupleId> tuple_ids;
             :     CollectTupleIds(request.fragments[0], &tuple_ids);
             :     unordered_set<TTableId> table_ids;
             :     CollectOutputSinkTableIds(request.fragments[0], &table_ids);
             :     PopulateSharedDescriptorTable(tuple_ids, table_ids, &query_ctx);
             :     RETURN_IF_ERROR(executor_->Prepare(query_ctx, fragment_ctx, fragment_instance_ctx));
> this block is very hard to read (please also note the long line, here and e
I factored out the creation of the RPC parameters here. I could not see an obvious way to do the same for the remote RPC parameters, since their creation is intertwined with the filter and coordinator logic, so I left that code in place.
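
To illustrate the direction, here is a toy sketch of the kind of helper I mean (the types and helper name are invented for illustration, not the PS5 code):

  // Toy sketch: bundle construction of the per-query, per-fragment, and
  // per-instance RPC parameter structs behind one helper so the caller
  // stays readable. All names here are invented.
  struct QueryCtx { int query_id; };
  struct FragmentCtx { int num_instances; };
  struct InstanceCtx { int instance_idx; };

  void BuildCoordRpcParams(int query_id, int num_instances, QueryCtx* query_ctx,
      FragmentCtx* fragment_ctx, InstanceCtx* instance_ctx) {
    query_ctx->query_id = query_id;
    fragment_ctx->num_instances = num_instances;
    instance_ctx->instance_idx = 0;  // the coordinator instance is always index 0
  }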


PS4, Line 607: RPC's
> nit: RPCs
Done


PS4, Line 612: bind<void>(mem_fn(&Coordinator::ExecRemoteFragments), this,
             :           backend_address,
             :           debug_options,
             :           backend_exec_data,
             :           schedule
> fwiw, I think lambdas are going to be easier to read here:
Done. We need to capture backend_address by value, which might make the code 
slightly harder to read.
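
To illustrate the capture-by-value point with a self-contained sketch (Enqueue() and the surrounding types are placeholders, not our thread pool API):

  #include <functional>
  #include <string>
  #include <vector>

  // Placeholder for handing work to a pool that runs it later.
  void Enqueue(std::vector<std::function<void()>>* pool, std::function<void()> work) {
    pool->push_back(std::move(work));
  }

  void ScheduleRpcs(std::vector<std::function<void()>>* pool) {
    for (std::string backend_address : {"host1:22000", "host2:22000"}) {
      // Capture backend_address by value: the loop variable no longer exists
      // by the time the callable runs, so capturing by reference would leave
      // a dangling reference.
      Enqueue(pool, [backend_address]() {
        // ... issue the RPC to backend_address ...
      });
    }
  }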


PS4, Line 1395: batch_rpc_params
> just rpc_params, I think.
Done


PS4, Line 1404: // Notification barrier
              :   NotifyBarrierOnExit notifier(exec_complete_barrier_.get(),
              :       backend_exec_data->fragment_instances.size());
> please put this back at the top of the method, to avoid bugs creeping in wh
Done
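
The RAII point, as a self-contained sketch (CountingBarrier here is a stand-in to make the example compile, not the real counting-barrier.h interface):

  #include <condition_variable>
  #include <mutex>

  class CountingBarrier {
   public:
    explicit CountingBarrier(int count) : count_(count) {}
    void Notify(int n) {
      std::lock_guard<std::mutex> l(lock_);
      count_ -= n;
      if (count_ <= 0) done_.notify_all();
    }
    void Wait() {
      std::unique_lock<std::mutex> l(lock_);
      done_.wait(l, [this]() { return count_ <= 0; });
    }
   private:
    std::mutex lock_;
    std::condition_variable done_;
    int count_;
  };

  class NotifyBarrierOnExit {
   public:
    NotifyBarrierOnExit(CountingBarrier* barrier, int n) : barrier_(barrier), n_(n) {}
    ~NotifyBarrierOnExit() { barrier_->Notify(n_); }
   private:
    CountingBarrier* barrier_;
    const int n_;
  };

  void ExecFragments(CountingBarrier* barrier, int num_instances) {
    // Declared first, so every exit path, including early returns added
    // later, is guaranteed to notify the barrier.
    NotifyBarrierOnExit notifier(barrier, num_instances);
    // ... RPC logic with possible early returns ...
  }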


PS4, Line 1424: int instance_state_idx = fragment_instance.instance_state_idx;
              :     int fragment_idx = fragment_instance.fragment_idx;
              :     int fragment_instance_idx = fragment_instance.fragment_instance_idx;
> try and keep these declarations as close to their first point of use as pos
Done


PS4, Line 1456: after
> should this comment have said 'before', not 'after'?
Removed.


PS4, Line 1456: // Guard against concurrent UpdateFragmentExecStatus() that may arrive after RPC returns.
              :     // TODO How to fix this mess? Use std::unique_lock? Unlock the exec_states in
              :     // SetInitialStatus()?
              :     guards.push_back(shared_ptr<lock_guard<mutex>>(new lock_guard<mutex>(*exec_state->lock())));
> I agree, we should do something. What would UpdateFragmentExecStatus() acce
Done
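
Regarding the std::unique_lock idea from the TODO: since std::unique_lock is movable, a container can own the locks directly and the shared_ptr<lock_guard> indirection goes away. A self-contained sketch of the idea (not the actual PS5 code):

  #include <mutex>
  #include <vector>

  void LockAllAndSend(std::vector<std::mutex>& exec_state_locks) {
    std::vector<std::unique_lock<std::mutex>> guards;
    guards.reserve(exec_state_locks.size());
    for (std::mutex& m : exec_state_locks) {
      guards.emplace_back(m);  // constructs a unique_lock that locks m
    }
    // ... issue the batched RPC while all exec states remain locked ...
  }  // all locks are released when 'guards' is destroyed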


PS4, Line 1985: void Coordinator::SetFragmentInstanceCtx(QuerySchedule& schedule,
              :     const TPlanFragment& fragment, const FragmentExecParams& params,
              :     int instance_state_idx, int fragment_idx, int fragment_instance_idx,
              :     const TNetworkAddress& coord,
              :     TPlanFragmentInstanceCtx* fragment_instance_ctx) {
              : 
              :   TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
              :   if (schedule.HasReservation()) {
              :     // The reservation has already been validated at this point.
              :     TNetworkAddress resource_hostport;
              :     schedule.GetResourceHostport(exec_host, &resource_hostport);
              :     map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
              :         schedule.reservation()->allocated_resources.find(resource_hostport);
              :     // Only set reserved resource if we actually have one for this plan
              :     // fragment. Otherwise, don't set it (usually this is the coordinator fragment), and it
              :     // won't participate in dynamic RM controls.
              :     if (it != schedule.reservation()->allocated_resources.end()) {
              :       fragment_instance_ctx->__set_reserved_resource(it->second);
              :       fragment_instance_ctx->__set_local_resource_address(resource_hostport);
              :     }
              :   }
              :   FragmentScanRangeAssignment::const_iterator it =
              :       params.scan_range_assignment.find(exec_host);
              :   // Scan ranges may not always be set, so use an empty structure if so.
              :   const PerNodeScanRanges& scan_ranges =
              :       (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
              : 
              :   fragment_instance_ctx->__set_request_pool(schedule.request_pool());
              :   fragment_instance_ctx->__set_per_node_scan_ranges(scan_ranges);
              :   fragment_instance_ctx->__set_per_exch_num_senders(params.per_exch_num_senders);
              :   fragment_instance_ctx->__set_destinations(params.destinations);
              :   fragment_instance_ctx->__set_sender_id(params.sender_id_base + fragment_instance_idx);
              :   fragment_instance_ctx->fragment_instance_id = params.instance_ids[fragment_instance_idx];
              :   fragment_instance_ctx->fragment_instance_idx = fragment_instance_idx;
              :   fragment_instance_ctx->instance_state_idx = instance_state_idx;
              : }
> does this method access any member variables of Coordinator? if not, consid
Done
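
The pattern, as a minimal sketch with toy types (not the Impala signatures): a method that touches no Coordinator state becomes a file-local function in coordinator.cc.

  namespace {

  struct InstanceCtx { int sender_id; };

  // Uses only its arguments, so it has no reason to be a member function.
  void SetInstanceCtx(int sender_id_base, int instance_idx, InstanceCtx* ctx) {
    ctx->sender_id = sender_id_base + instance_idx;
  }

  }  // namespace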


PS4, Line 2022: void Coordinator::CollectTupleIds(const TPlanFragment& fragment,
> similarly here, consider moving out of Coordinator. Hopefully we'll find a 
Done


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS4, Line 271: boost
> let's prefer std:: where possible.
Done


PS4, Line 271: void CollectTupleIds(const TPlanFragment& fragment, boost::unordered_set<TTupleId>* tuple_ids);
             : 
             :   void CollectOutputSinkTableIds(const TPlanFragment& fragment,
             :       boost::unordered_set<TTableId>* table_ids);
             : 
             :   void PopulateSharedDescriptorTable(
             :     const boost::unordered_set<TTupleId>& tuple_ids,
             :     const boost::unordered_set<TTableId>& sink_table_ids,
             :     TQueryCtx* query_ctx);
> as mentioned in .cc, these methods don't seem like they belong to Coordinat
Done


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/util/counting-barrier.h
File be/src/util/counting-barrier.h:

PS4, Line 81: NotifyBarrierOnExit& operator=(const NotifyBarrierOnExit&) = delete;
            :   NotifyBarrierOnExit& operator=(NotifyBarrierOnExit&&) = delete;
> we use DISALLOW_COPY_AND_ASSIGN usually.
Done
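
For reference, the macro boils down to deleting the copy operations, roughly like this (a sketch of the usual gutil-style definition; the exact one in our tree may differ, and CountingBarrier is the stand-in from the sketch above). Note that declaring the copy operations, even as deleted, also suppresses the implicit move operations, so the explicit deleted move assignment becomes unnecessary:

  #define DISALLOW_COPY_AND_ASSIGN(TypeName) \
    TypeName(const TypeName&) = delete;      \
    TypeName& operator=(const TypeName&) = delete

  class NotifyBarrierOnExit {
   public:
    NotifyBarrierOnExit(CountingBarrier* barrier, int n) : barrier_(barrier), n_(n) {}
    ~NotifyBarrierOnExit() { barrier_->Notify(n_); }

   private:
    DISALLOW_COPY_AND_ASSIGN(NotifyBarrierOnExit);
    CountingBarrier* barrier_;
    const int n_;
  };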


-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 4
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Lars Volker <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-HasComments: Yes
