Lars Volker has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................

Patch Set 4:

(13 comments)

Thanks for the review. Addressed your comments in PS5.

http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS4, Line 422: const FragmentExecParams& fragment_exec_params = (*schedule.exec_params())[0];
             : TQueryCtx query_ctx = query_ctx_;
             : // TODO is this line needed?
             : query_ctx.__set_query_id(query_id_);
             : TPlanFragmentCtx fragment_ctx;
             : fragment_ctx.__set_fragment(request.fragments[0]);
             : fragment_ctx.__set_num_fragment_instances(fragment_exec_params.instance_ids.size());
             : TPlanFragmentInstanceCtx fragment_instance_ctx;
             : SetFragmentInstanceCtx(schedule, request.fragments[0], fragment_exec_params, 0, 0, 0, coord, &fragment_instance_ctx);
             : unordered_set<TTupleId> tuple_ids;
             : CollectTupleIds(request.fragments[0], &tuple_ids);
             : unordered_set<TTableId> table_ids;
             : CollectOutputSinkTableIds(request.fragments[0], &table_ids);
             : PopulateSharedDescriptorTable(tuple_ids, table_ids, &query_ctx);
             : RETURN_IF_ERROR(executor_->Prepare(query_ctx, fragment_ctx, fragment_instance_ctx));
> this block is very hard to read (please also note the long line, here and e
Factored out the creation of the RPC parameters here. I could not see an obvious way to factor out the creation of the remote RPC parameters, because it is intertwined with the filter and coordinator logic, so I left it in place there.

PS4, Line 607: RPC's
> nit: RPCs
Done

PS4, Line 612: bind<void>(mem_fn(&Coordinator::ExecRemoteFragments), this,
             :     backend_address,
             :     debug_options,
             :     backend_exec_data,
             :     schedule
> fwiw, I think lambdas are going to be easier to read here:
Done. We need to capture backend_address by value, which might make the code slightly harder to read.

PS4, Line 1395: batch_rpc_params
> just rpc_params, I think.
Done

PS4, Line 1404: // Notification barrier
              : NotifyBarrierOnExit notifier(exec_complete_barrier_.get(),
              :     backend_exec_data->fragment_instances.size());
> please put this back at the top of the method, to avoid bugs creeping in wh
Done

PS4, Line 1424: int instance_state_idx = fragment_instance.instance_state_idx;
              : int fragment_idx = fragment_instance.fragment_idx;
              : int fragment_instance_idx = fragment_instance.fragment_instance_idx;
> try and keep these declarations as close to their first point of use as pos
Done

PS4, Line 1456: after
> should this comment have said 'before', not 'after'?
Removed.

PS4, Line 1456: // Guard against concurrent UpdateFragmentExecStatus() that may arrive after RPC returns.
              : // TODO How to fix this mess? Use std::unique_lock? Unlock the exec_states in
              : // SetInitialStatus()?
              : guards.push_back(shared_ptr<lock_guard<mutex>>(new lock_guard<mutex>(*exec_state->lock())));
> I agree, we should do something. What would UpdateFragmentExecStatus() acce
Done

PS4, Line 1985: void Coordinator::SetFragmentInstanceCtx(QuerySchedule& schedule,
              :     const TPlanFragment& fragment, const FragmentExecParams& params,
              :     int instance_state_idx, int fragment_idx, int fragment_instance_idx,
              :     const TNetworkAddress& coord,
              :     TPlanFragmentInstanceCtx* fragment_instance_ctx) {
              :
              :   TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
              :   if (schedule.HasReservation()) {
              :     // The reservation has already have been validated at this point.
              :     TNetworkAddress resource_hostport;
              :     schedule.GetResourceHostport(exec_host, &resource_hostport);
              :     map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
              :         schedule.reservation()->allocated_resources.find(resource_hostport);
              :     // Only set reserved resource if we actually have one for this plan
              :     // fragment. Otherwise, don't set it (usually this the coordinator fragment), and it
              :     // won't participate in dynamic RM controls.
              :     if (it != schedule.reservation()->allocated_resources.end()) {
              :       fragment_instance_ctx->__set_reserved_resource(it->second);
              :       fragment_instance_ctx->__set_local_resource_address(resource_hostport);
              :     }
              :   }
              :   FragmentScanRangeAssignment::const_iterator it =
              :       params.scan_range_assignment.find(exec_host);
              :   // Scan ranges may not always be set, so use an empty structure if so.
              :   const PerNodeScanRanges& scan_ranges =
              :       (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
              :
              :   fragment_instance_ctx->__set_request_pool(schedule.request_pool());
              :   fragment_instance_ctx->__set_per_node_scan_ranges(scan_ranges);
              :   fragment_instance_ctx->__set_per_exch_num_senders(params.per_exch_num_senders);
              :   fragment_instance_ctx->__set_destinations(params.destinations);
              :   fragment_instance_ctx->__set_sender_id(params.sender_id_base + fragment_instance_idx);
              :   fragment_instance_ctx->fragment_instance_id = params.instance_ids[fragment_instance_idx];
              :   fragment_instance_ctx->fragment_instance_idx = fragment_instance_idx;
              :   fragment_instance_ctx->instance_state_idx = instance_state_idx;
              : }
> does this method access any member variables of Coordinator? if not, consid
Done

PS4, Line 2022: void Coordinator::CollectTupleIds(const TPlanFragment& fragment,
> similarly here, consider moving out of Coordinator. Hopefully we'll find a
Done

http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS4, Line 271: boost
> let's prefer std:: where possible.
Done

PS4, Line 271: void CollectTupleIds(const TPlanFragment& fragment, boost::unordered_set<TTupleId>* tuple_ids);
             :
             : void CollectOutputSinkTableIds(const TPlanFragment& fragment,
             :     boost::unordered_set<TTableId>* table_ids);
             :
             : void PopulateSharedDescriptorTable(
             :     const boost::unordered_set<TTupleId>& tuple_ids,
             :     const boost::unordered_set<TTableId>& sink_table_ids,
             :     TQueryCtx* query_ctx);
> as mentioned in .cc, these methods don't seem like they belong to Coordinat
Done

http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/util/counting-barrier.h
File be/src/util/counting-barrier.h:

PS4, Line 81: NotifyBarrierOnExit& operator=(const NotifyBarrierOnExit&) = delete;
            : NotifyBarrierOnExit& operator=(NotifyBarrierOnExit&&) = delete;
> we use DISALLOW_COPY_AND_ASSIGN usually.
Done

--
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 4
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker
Gerrit-Reviewer: Henry Robinson
Gerrit-Reviewer: Lars Volker
Gerrit-Reviewer: Marcel Kornacker
Gerrit-Reviewer: Tim Armstrong
Gerrit-HasComments: Yes
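For reference, the suggestion on PS4, Line 612 (replace `bind<void>(mem_fn(&Coordinator::ExecRemoteFragments), this, ...)` with a lambda) can be illustrated with a minimal sketch. `std::bind` stores copies of its bound arguments, so a lambda that captures `backend_address` by value keeps the same semantics. Everything here is a hypothetical, heavily simplified stand-in: `Coordinator`, `TNetworkAddress`, and `StartRemoteFragments` are not Impala's real classes or signatures, and `std::thread` substitutes for whatever dispatch mechanism the change actually uses.

```cpp
#include <algorithm>
#include <cassert>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Thrift-generated TNetworkAddress.
struct TNetworkAddress {
  std::string hostname;
  int port;
};

// Hypothetical, simplified coordinator; only the dispatch pattern mirrors
// the code under review.
class Coordinator {
 public:
  // Starts one asynchronous call per backend, then waits for all of them.
  void StartRemoteFragments(const std::vector<TNetworkAddress>& backends) {
    std::vector<std::thread> threads;
    for (const TNetworkAddress& backend : backends) {
      TNetworkAddress backend_address = backend;  // local to this iteration
      // Lambda form: backend_address is captured by value, so each thread
      // owns a copy that stays valid after this iteration ends.
      threads.emplace_back(
          [this, backend_address]() { ExecRemoteFragments(backend_address); });
      // Equivalent std::bind form, which also copies its bound arguments:
      //   std::bind(std::mem_fn(&Coordinator::ExecRemoteFragments), this,
      //             backend_address)
    }
    for (std::thread& t : threads) t.join();
  }

  // Returns "host:port" for every backend that ran, sorted for determinism.
  std::vector<std::string> executed() {
    std::lock_guard<std::mutex> l(lock_);
    std::vector<std::string> result = executed_;
    std::sort(result.begin(), result.end());
    return result;
  }

 private:
  void ExecRemoteFragments(const TNetworkAddress& backend_address) {
    assert(!backend_address.hostname.empty());
    std::lock_guard<std::mutex> l(lock_);
    executed_.push_back(backend_address.hostname + ":" +
                        std::to_string(backend_address.port));
  }

  std::mutex lock_;
  std::vector<std::string> executed_;
};
```

In this sketch, capturing `backend_address` by reference instead would let a thread read a loop-local variable after its iteration has ended, which is undefined behavior. That is why the by-value capture is needed even though it makes the lambda slightly noisier.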

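Two review points concern the barrier notifier. PS4, Line 1404 asks to keep `NotifyBarrierOnExit` at the top of the method so that every return path notifies the barrier. PS4, Line 81 asks to use `DISALLOW_COPY_AND_ASSIGN` instead of hand-written deleted operators. The sketch below is a minimal, hypothetical version of both and assumes a Google-style definition of the macro. `CountingBarrier`, `ExecBatch`, and the macro body are illustrative only, not copied from `be/src/util/counting-barrier.h`.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Assumed, Google-style definition; the codebase's own macro may differ.
#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
  TypeName(const TypeName&) = delete;      \
  TypeName& operator=(const TypeName&) = delete

// Hypothetical minimal barrier: Wait() blocks until Notify() was called
// `count` times.
class CountingBarrier {
 public:
  explicit CountingBarrier(int count) : count_(count) {}

  void Notify() {
    std::lock_guard<std::mutex> l(lock_);
    assert(count_ > 0);
    if (--count_ == 0) cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> l(lock_);
    cv_.wait(l, [this] { return count_ == 0; });
  }

  int pending() {
    std::lock_guard<std::mutex> l(lock_);
    return count_;
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  int count_;
};

// RAII helper: calls Notify() `num_notifications` times when it goes out of
// scope, on every exit path of the enclosing function.
class NotifyBarrierOnExit {
 public:
  NotifyBarrierOnExit(CountingBarrier* barrier, int num_notifications)
      : barrier_(barrier), num_notifications_(num_notifications) {}
  ~NotifyBarrierOnExit() {
    for (int i = 0; i < num_notifications_; ++i) barrier_->Notify();
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(NotifyBarrierOnExit);

  CountingBarrier* barrier_;
  int num_notifications_;
};

// Hypothetical RPC-issuing function. The notifier is declared first, so no
// later early return can skip it.
bool ExecBatch(CountingBarrier* barrier, int num_instances, bool fail_early) {
  NotifyBarrierOnExit notifier(barrier, num_instances);
  if (fail_early) return false;  // the notifier still fires here
  // ... issue the batched RPC for all instances ...
  return true;
}
```

With this macro definition the copy constructor and copy assignment operator are user-declared (as deleted), so the compiler does not implicitly declare move operations either. In this sketch, a separate `operator=(NotifyBarrierOnExit&&) = delete` would therefore add nothing.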