IMPALA-3502: Fix race in the coordinator while updating filter routing table
This commit fixes an issue where a fragment may send an UpdateFilter
message to the coordinator while the latter is still updating the
runtime filter routing table. The fix is to decouple the update of the
filter routing table from starting the fragments. With this fix, the
coordinator will finish populating the filter routing table before it
starts any remote fragments.

Change-Id: Iecc737106fd38aa4af0c72959a577adfb413728d
Reviewed-on: http://gerrit.cloudera.org:8080/3018
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/eaa39264
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/eaa39264
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/eaa39264

Branch: refs/heads/master
Commit: eaa3926452bcba3c273c4e7e3c33a8828c63d647
Parents: 02d3e93
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Tue May 10 14:35:50 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:18:04 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc | 50 +++++++++++++++++++++++++-------------
 1 file changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eaa39264/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 2af596c..50714ea 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -420,8 +420,10 @@ Status Coordinator::Exec(QuerySchedule& schedule,
   // chance to register with the stream mgr.
   // TODO: This is no longer necessary (see IMPALA-1599). Consider starting all
   // fragments in the same way with no coordinator special case.
-  UpdateFilterRoutingTable(request.fragments[0].plan.nodes, 1, 0);
-  if (schedule.num_fragment_instances() == 0) MarkFilterRoutingTableComplete();
+  if (filter_mode_ != TRuntimeFilterMode::OFF) {
+    UpdateFilterRoutingTable(request.fragments[0].plan.nodes, 1, 0);
+    if (schedule.num_fragment_instances() == 0) MarkFilterRoutingTableComplete();
+  }
   TExecPlanFragmentParams rpc_params;
   SetExecPlanFragmentParams(schedule, request.fragments[0],
       (*schedule.exec_params())[0], 0, 0, 0, coord, &rpc_params);
@@ -480,6 +482,8 @@ Status Coordinator::Exec(QuerySchedule& schedule,
 
 void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
     int num_hosts, int start_fragment_instance_idx) {
+  DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
+      << "UpdateFilterRoutingTable() called although runtime filters are disabled";
   DCHECK(!filter_routing_table_complete_)
       << "UpdateFilterRoutingTable() called after setting filter_routing_table_complete_";
   for (const TPlanNode& plan_node: plan_nodes) {
@@ -542,24 +546,35 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
   int fragment_instance_idx = 0;
   bool has_coordinator_fragment =
       request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
+  int first_remote_fragment_idx = has_coordinator_fragment ? 1 : 0;
+  if (filter_mode_ != TRuntimeFilterMode::OFF) {
+    // Populate the runtime filter routing table. This should happen before
+    // starting the remote fragments.
+    for (int fragment_idx = first_remote_fragment_idx;
+        fragment_idx < request.fragments.size(); ++fragment_idx) {
+      const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx];
+      int num_hosts = params->hosts.size();
+      DCHECK_GT(num_hosts, 0);
+      UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts,
+          fragment_instance_idx);
+      fragment_instance_idx += num_hosts;
+    }
+    MarkFilterRoutingTableComplete();
+  }
+
+  fragment_instance_idx = 0;
   // Start one fragment instance per fragment per host (number of hosts running each
   // fragment may not be constant).
-  for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
+  for (int fragment_idx = first_remote_fragment_idx;
       fragment_idx < request.fragments.size(); ++fragment_idx) {
     const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx];
     int num_hosts = params->hosts.size();
     DCHECK_GT(num_hosts, 0);
-
-    if (filter_mode_ != TRuntimeFilterMode::OFF) {
-      UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts,
-          fragment_instance_idx);
-    }
-
     fragment_profiles_[fragment_idx].num_instances = num_hosts;
     // Start one fragment instance for every fragment_instance required by the
     // schedule. Each fragment instance is assigned a unique ID, numbered from 0, with
     // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and
-    // so on. This enumeration scheme is relied upon by UpdateFilterRoutingTable().
+    // so on.
     for (int per_fragment_instance_idx = 0; per_fragment_instance_idx < num_hosts;
         ++per_fragment_instance_idx) {
       DebugOptions* fragment_instance_debug_options =
@@ -575,7 +590,6 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
           per_fragment_instance_idx));
     }
   }
-  MarkFilterRoutingTableComplete();
   exec_complete_barrier_->Wait();
   query_events_->MarkEvent(
       Substitute("All $0 remote fragments started", fragment_instance_idx));
@@ -656,12 +670,12 @@ string Coordinator::FilterDebugString() {
 }
 
 void Coordinator::MarkFilterRoutingTableComplete() {
-  if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    query_profile_->AddInfoString(
-        "Number of filters", Substitute("$0", filter_routing_table_.size()));
-    query_profile_->AddInfoString("Filter routing table", FilterDebugString());
-    if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
-  }
+  DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
+      << "MarkFilterRoutingTableComplete() called although runtime filters are disabled";
+  query_profile_->AddInfoString(
+      "Number of filters", Substitute("$0", filter_routing_table_.size()));
+  query_profile_->AddInfoString("Filter routing table", FilterDebugString());
+  if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
   filter_routing_table_complete_ = true;
 }
 
@@ -1984,6 +1998,8 @@ void DistributeFilters(shared_ptr<TPublishFilterParams> params, TNetworkAddress
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
+  DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
+      << "UpdateFilter() called although runtime filters are disabled";
   DCHECK(exec_complete_barrier_.get() != NULL)
       << "Filters received before fragments started!";
   exec_complete_barrier_->Wait();
