Matthew Jacobs has posted comments on this change. Change subject: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend ......................................................................
Patch Set 4: (11 comments) I made my through most of scheduling/query-schedule but have a bunch of meetings so will need to context switch and figured I'd give this feedback so far. I'll pick up later from Coordinator::Exec(). http://gerrit.cloudera.org:8080/#/c/4054/4/be/src/runtime/coordinator.h File be/src/runtime/coordinator.h: PS4, Line 218: const TNetworkAddress local_addr_; Can you use ExecEnv::backend_address()? Or find some way to avoid the duplication. Grepping for "MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)" it looks like we create the same thing in a bunch of places, it'd be nice to consolidate or at least avoid adding another member here. PS4, Line 388: fragment_instance_state_idxs Is this referencing the "fragment id (TPlanFragment.id)"? If so, please say so and change idx -> id for consistency. http://gerrit.cloudera.org:8080/#/c/4054/4/be/src/scheduling/query-schedule.cc File be/src/scheduling/query-schedule.cc: PS4, Line 121: std::sort(fragments.begin(), fragments.end(), : [](const TPlanFragment* a, const TPlanFragment* b) { return a->id < b->id; }); shouldn't these come in an expected order? The ids are set in Frontend.mtCreateExecRequest() in the same way you iterate over them above: + // assign fragment ids + int id = 0; + for (TPlanExecInfo planExecInfo: result.mt_plan_exec_info) { + for (TPlanFragment fragment: planExecInfo.fragments) fragment.setId(id++); + } http://gerrit.cloudera.org:8080/#/c/4054/4/be/src/scheduling/query-schedule.h File be/src/scheduling/query-schedule.h: Line 307: comment, e.g. what state in MtFragmentExecParams gets initialized here vs. what gets added later by SimpleScheduler http://gerrit.cloudera.org:8080/#/c/4054/4/be/src/scheduling/simple-scheduler.cc File be/src/scheduling/simple-scheduler.cc: PS4, Line 425: f can you change f indexes here to match the code? it's explained as if it starts at 1 as we do in other places. PS4, Line 433: const TDataStreamSink& sink = src_fragment.output_sink.stream_sink; : DCHECK( : sink.output_partition.type == TPartitionType::UNPARTITIONED : || sink.output_partition.type == TPartitionType::HASH_PARTITIONED : || sink.output_partition.type == TPartitionType::RANDOM); move this closer to where it's used on l453 PS4, Line 541: while (params_idx < params_list.size() : && total_assigned_bytes < threshold_total_bytes) { It's not obvious to me why this will always assign all params. If the last fragment instance gets assigned >= threshold_total_bytes but params_idx != params_list.size(), any params > params_idx won't get assigned. If I'm just missing something, can you add a brief comment to make it a bit more clear? PS4, Line 933: if (!FLAGS_disable_admission_control) { : RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule)); : } : if (!FLAGS_enable_rm) return Status::OK(); : : string user = GetEffectiveUser(schedule->request().query_ctx.session); : if (user.empty()) user = "default"; : schedule->PrepareReservationRequest(resolved_pool, user); : const TResourceBrokerReservationRequest& reservation_request = : schedule->reservation_request(); : if (!reservation_request.resources.empty()) { : Status status = resource_broker_->Reserve( : reservation_request, schedule->reservation()); : if (!status.ok()) { : // Warn about missing table and/or column stats if necessary. : const TQueryCtx& query_ctx = schedule->request().query_ctx; : if(!query_ctx.__isset.parent_query_id && : query_ctx.__isset.tables_missing_stats && : !query_ctx.tables_missing_stats.empty()) { : status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats)); : } : return status; : } : RETURN_IF_ERROR(schedule->ValidateReservation()); : AddToActiveResourceMaps(*schedule->reservation(), coord); : } while the llama related code will be removed soon, I think you should still have this go through admission control. You can move all of this out of the if (is_mt_exec)/else block. Or if there's some reason you're avoiding some of this code, you should be able to at least copy the call into admission control (l933 to l935) after l928. http://gerrit.cloudera.org:8080/#/c/4054/4/be/src/scheduling/simple-scheduler.h File be/src/scheduling/simple-scheduler.h: PS4, Line 499: MtComputeFragmentExecParams The separation between what structures are set/updated in QuerySchedule and which of QuerySchedule's state/structures gets set up here is confusing. It'd be nice to refactor some of this. E.g. it makes sense to compute the scan range assignment here, but maybe most of this fn (and what it calls) could be moved into QuerySchedule. If that's too disruptive for this change, let's leave a TODO. PS4, Line 504: MtComputeFragmentExecParams It'd be nice to have a different and more specific name, e.g. MtCreateFInstanceExecParams() http://gerrit.cloudera.org:8080/#/c/4054/4/be/src/service/query-exec-state.cc File be/src/service/query-exec-state.cc: PS4, Line 439: // TODO-MT: if exec_request_.query_options.mt_dop isn't set, set it to the pool : // default, if any This should already happen in impala-{hs2,beeswax}-server when the query comes in, they call ImpalaServer::AddPoolQueryOptions. -- To view, visit http://gerrit.cloudera.org:8080/4054 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: I240445199e22f009f4e72fdb8754eb8d77e3d680 Gerrit-PatchSet: 4 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Marcel Kornacker <[email protected]> Gerrit-Reviewer: Alex Behm <[email protected]> Gerrit-Reviewer: Dan Hecht <[email protected]> Gerrit-Reviewer: Henry Robinson <[email protected]> Gerrit-Reviewer: Lars Volker <[email protected]> Gerrit-Reviewer: Marcel Kornacker <[email protected]> Gerrit-Reviewer: Matthew Jacobs <[email protected]> Gerrit-HasComments: Yes
