IMPALA-3502: Fix race in the coordinator while updating filter routing
table

This commit fixes an issue where a fragment may send an
UpdateFilter message to the coordinator while the latter is still
updating the runtime filter routing table. The fix is to decouple the
update of the filter routing table from starting the fragments. With
this fix, the coordinator will finish populating the filter routing
table before it starts any remote fragments.

Change-Id: Iecc737106fd38aa4af0c72959a577adfb413728d
Reviewed-on: http://gerrit.cloudera.org:8080/3018
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/eaa39264
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/eaa39264
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/eaa39264

Branch: refs/heads/master
Commit: eaa3926452bcba3c273c4e7e3c33a8828c63d647
Parents: 02d3e93
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Tue May 10 14:35:50 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:18:04 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc | 50 +++++++++++++++++++++++++-------------
 1 file changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eaa39264/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 2af596c..50714ea 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -420,8 +420,10 @@ Status Coordinator::Exec(QuerySchedule& schedule,
     // chance to register with the stream mgr.
    // TODO: This is no longer necessary (see IMPALA-1599). Consider starting all
     // fragments in the same way with no coordinator special case.
-    UpdateFilterRoutingTable(request.fragments[0].plan.nodes, 1, 0);
-    if (schedule.num_fragment_instances() == 0) MarkFilterRoutingTableComplete();
+    if (filter_mode_ != TRuntimeFilterMode::OFF) {
+      UpdateFilterRoutingTable(request.fragments[0].plan.nodes, 1, 0);
+      if (schedule.num_fragment_instances() == 0) MarkFilterRoutingTableComplete();
+    }
     TExecPlanFragmentParams rpc_params;
     SetExecPlanFragmentParams(schedule, request.fragments[0],
         (*schedule.exec_params())[0], 0, 0, 0, coord, &rpc_params);
@@ -480,6 +482,8 @@ Status Coordinator::Exec(QuerySchedule& schedule,
 
 void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
     int num_hosts, int start_fragment_instance_idx) {
+  DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
+      << "UpdateFilterRoutingTable() called although runtime filters are disabled";
   DCHECK(!filter_routing_table_complete_)
      << "UpdateFilterRoutingTable() called after setting filter_routing_table_complete_";
   for (const TPlanNode& plan_node: plan_nodes) {
@@ -542,24 +546,35 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
   int fragment_instance_idx = 0;
   bool has_coordinator_fragment =
       request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
+  int first_remote_fragment_idx = has_coordinator_fragment ? 1 : 0;
+  if (filter_mode_ != TRuntimeFilterMode::OFF) {
+    // Populate the runtime filter routing table. This should happen before
+    // starting the remote fragments.
+    for (int fragment_idx = first_remote_fragment_idx;
+         fragment_idx < request.fragments.size(); ++fragment_idx) {
+      const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx];
+      int num_hosts = params->hosts.size();
+      DCHECK_GT(num_hosts, 0);
+      UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts,
+          fragment_instance_idx);
+      fragment_instance_idx += num_hosts;
+    }
+    MarkFilterRoutingTableComplete();
+  }
+
+  fragment_instance_idx = 0;
    // Start one fragment instance per fragment per host (number of hosts running each
   // fragment may not be constant).
-  for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
+  for (int fragment_idx = first_remote_fragment_idx;
        fragment_idx < request.fragments.size(); ++fragment_idx) {
    const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx];
     int num_hosts = params->hosts.size();
     DCHECK_GT(num_hosts, 0);
-
-    if (filter_mode_ != TRuntimeFilterMode::OFF) {
-      UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts,
-          fragment_instance_idx);
-    }
-
     fragment_profiles_[fragment_idx].num_instances = num_hosts;
     // Start one fragment instance for every fragment_instance required by the
    // schedule. Each fragment instance is assigned a unique ID, numbered from 0, with
    // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and
-    // so on. This enumeration scheme is relied upon by UpdateFilterRoutingTable().
+    // so on.
    for (int per_fragment_instance_idx = 0; per_fragment_instance_idx < num_hosts;
          ++per_fragment_instance_idx) {
       DebugOptions* fragment_instance_debug_options =
@@ -575,7 +590,6 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
               per_fragment_instance_idx));
     }
   }
-  MarkFilterRoutingTableComplete();
   exec_complete_barrier_->Wait();
   query_events_->MarkEvent(
       Substitute("All $0 remote fragments started", fragment_instance_idx));
@@ -656,12 +670,12 @@ string Coordinator::FilterDebugString() {
 }
 
 void Coordinator::MarkFilterRoutingTableComplete() {
-  if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    query_profile_->AddInfoString(
-        "Number of filters", Substitute("$0", filter_routing_table_.size()));
-    query_profile_->AddInfoString("Filter routing table", FilterDebugString());
-    if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
-  }
+  DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
+      << "MarkFilterRoutingTableComplete() called although runtime filters are disabled";
+  query_profile_->AddInfoString(
+      "Number of filters", Substitute("$0", filter_routing_table_.size()));
+  query_profile_->AddInfoString("Filter routing table", FilterDebugString());
+  if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
   filter_routing_table_complete_ = true;
 }
 
@@ -1984,6 +1998,8 @@ void DistributeFilters(shared_ptr<TPublishFilterParams> params, TNetworkAddress
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
+  DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
+      << "UpdateFilter() called although runtime filters are disabled";
   DCHECK(exec_complete_barrier_.get() != NULL)
       << "Filters received before fragments started!";
   exec_complete_barrier_->Wait();

Reply via email to