IMPALA-4494: Fix crash in SimpleScheduler

The scheduler maintains a local list of active backends, which is
updated through messages from the statestore. Even the local backend
only enters this list after it has registered with the statestore and
been included in a subsequent update message. Thus, during restarts a
query can get scheduled with exec_at_coord set to true while the local
backend is not yet known to the scheduler. In that case the IP address
lookup in the internal BackendConfig fails and returns an empty IP
address, which leads to a nullptr dereference further down the line.
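
As an illustration of the failure mode: the coordinator's own host is
looked up in a config that may not contain it yet, and the result is
used without a check. The sketch below is self-contained and
hypothetical; BackendConfig, LookUpIp and "coordinator-host" here are
simplified stand-ins, not the actual Impala classes.

  #include <iostream>
  #include <map>
  #include <string>

  // Stand-in for the scheduler's view of registered backends.
  class BackendConfig {
   public:
    void AddBackend(const std::string& hostname, const std::string& ip) {
      host_to_ip_[hostname] = ip;
    }
    // Returns the IP registered for 'hostname', or nullptr if the host is
    // unknown, e.g. because no statestore update has been processed yet.
    const std::string* LookUpIp(const std::string& hostname) const {
      auto it = host_to_ip_.find(hostname);
      return it != host_to_ip_.end() ? &it->second : nullptr;
    }
   private:
    std::map<std::string, std::string> host_to_ip_;
  };

  int main() {
    BackendConfig config;  // Still empty right after a restart.
    // exec_at_coord schedules on the local host, which is missing here.
    const std::string* coord_ip = config.LookUpIp("coordinator-host");
    if (coord_ip == nullptr) {
      // Before the fix the scheduler carried on with an empty result,
      // eventually dereferencing an invalid pointer.
      std::cout << "local backend not registered yet" << std::endl;
    }
    return 0;
  }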

This change adds a check when handling updates from the statestore to
make sure that the backend config always contains the local backend.
It also changes scheduling with exec_at_coord set to true to always
use the local backend, irrespective of whether it is present in the
backend config.
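
The core of the change can be sketched as follows. This is a
simplified rendering of the patch below (the real code is in
be/src/scheduling/simple-scheduler.cc); the surrounding types are
reduced to stand-ins for illustration.

  #include <string>
  #include <vector>

  // Stand-in types, not the real Impala declarations.
  struct TBackendDescriptor {
    std::string hostname;
    std::string ip_address;
  };

  struct BackendConfig {
    std::vector<TBackendDescriptor> backends;
    void AddBackend(const TBackendDescriptor& be) { backends.push_back(be); }
    int NumBackends() const { return static_cast<int>(backends.size()); }
  };

  class Scheduler {
   public:
    explicit Scheduler(const TBackendDescriptor& local) : local_backend_(local) {
      // Populated once at startup, so it always contains the local backend,
      // independent of whether a statestore update has arrived yet.
      coord_only_config_.AddBackend(local_backend_);
    }

    // Mirrors the decision in ComputeScanRangeAssignment(): when executing at
    // the coordinator, schedule against the config that is guaranteed to
    // contain the local backend.
    const BackendConfig& ConfigForScheduling(
        const BackendConfig& cluster_config, bool exec_at_coord) const {
      return exec_at_coord ? coord_only_config_ : cluster_config;
    }

   private:
    TBackendDescriptor local_backend_;
    BackendConfig coord_only_config_;
  };

  int main() {
    Scheduler scheduler({"coordinator-host", "10.0.0.1"});
    BackendConfig cluster;  // May still be empty right after a restart.
    const BackendConfig& chosen =
        scheduler.ConfigForScheduling(cluster, /* exec_at_coord */ true);
    return chosen.NumBackends() == 1 ? 0 : 1;  // Local backend always available.
  }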

Change-Id: I6e1196a2fa47e5954c4a190aa326c135d039a77f
Reviewed-on: http://gerrit.cloudera.org:8080/5127
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/96d98abf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/96d98abf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/96d98abf

Branch: refs/heads/master
Commit: 96d98abff52e59742469f9d5a86018506e00f88f
Parents: 1fea997
Author: Lars Volker <[email protected]>
Authored: Thu Nov 17 18:09:10 2016 +0100
Committer: Internal Jenkins <[email protected]>
Committed: Thu Nov 24 04:11:25 2016 +0000

----------------------------------------------------------------------
 be/src/scheduling/simple-scheduler-test.cc |  81 ++++++++---
 be/src/scheduling/simple-scheduler.cc      | 170 +++++++++++++-----------
 be/src/scheduling/simple-scheduler.h       |   4 +
 3 files changed, 158 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/96d98abf/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index 38638ff..3c2c83b 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -293,11 +293,11 @@ TEST_F(SchedulerTest, TestDisableCachedReads) {
 /// behavior. Remove.
 TEST_F(SchedulerTest, EmptyStatestoreMessage) {
   Cluster cluster;
+  cluster.AddHosts(2, true, true);
   cluster.AddHosts(3, false, true);
-  cluster.AddHosts(2, true, false);
 
   Schema schema(cluster);
-  schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3);
+  schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::REMOTE_ONLY, 3);
 
   Plan plan(schema);
   plan.AddTableScan("T1");
@@ -307,18 +307,18 @@ TEST_F(SchedulerTest, EmptyStatestoreMessage) {
 
   scheduler.Compute(&result);
   EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(3));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(4));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(4));
   result.Reset();
 
   scheduler.SendEmptyUpdate();
   scheduler.Compute(&result);
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(3));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(3));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(4));
 }
 
@@ -330,7 +330,7 @@ TEST_F(SchedulerTest, TestSendUpdates) {
   for (int i=0; i < 3; ++i) cluster.AddHost(i < 2, true);
 
   Schema schema(cluster);
-  schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3);
+  schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::REMOTE_ONLY, 1);
 
   Plan plan(schema);
   plan.AddTableScan("T1");
@@ -339,29 +339,31 @@ TEST_F(SchedulerTest, TestSendUpdates) {
   SchedulerWrapper scheduler(plan);
 
   scheduler.Compute(&result);
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0));
-  EXPECT_EQ(0, result.NumCachedAssignedBytes(0));
-  EXPECT_EQ(0, result.NumRemoteAssignedBytes(0));
-  EXPECT_EQ(0, result.NumDiskAssignedBytes(1));
+  // Two backends are registered, so the scheduler will pick a random one.
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
 
   // Remove first host from scheduler.
-  scheduler.RemoveBackend(cluster.hosts()[0]);
+  scheduler.RemoveBackend(cluster.hosts()[1]);
   result.Reset();
 
   scheduler.Compute(&result);
-  EXPECT_EQ(0, result.NumDiskAssignedBytes(0));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(1));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
 
   // Re-add first host from scheduler.
-  scheduler.AddBackend(cluster.hosts()[0]);
+  scheduler.AddBackend(cluster.hosts()[1]);
   result.Reset();
 
   scheduler.Compute(&result);
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0));
-  EXPECT_EQ(0, result.NumDiskAssignedBytes(1));
+  // Two backends are registered, so the scheduler will pick a random one.
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
 }
 
 /// IMPALA-4329: Test scheduling with no backends.
+/// With the fix for IMPALA-4494, the scheduler will always register its local backend
+/// with itself, so scheduling with no backends will still succeed.
 TEST_F(SchedulerTest, TestEmptyBackendConfig) {
   Cluster cluster;
   cluster.AddHost(false, true);
@@ -375,9 +377,48 @@ TEST_F(SchedulerTest, TestEmptyBackendConfig) {
   Result result(plan);
   SchedulerWrapper scheduler(plan);
   Status status = scheduler.Compute(&result);
-  EXPECT_TRUE(!status.ok());
-  EXPECT_EQ(
-      status.GetDetail(), "Cannot schedule query: no registered backends available.\n");
+  EXPECT_TRUE(status.ok());
+}
+
+/// IMPALA-4494: Test scheduling with no backends but exec_at_coord.
+TEST_F(SchedulerTest, TestExecAtCoordWithEmptyBackendConfig) {
+  Cluster cluster;
+  cluster.AddHost(false, true);
+
+  Schema schema(cluster);
+  schema.AddMultiBlockTable("T", 1, ReplicaPlacement::REMOTE_ONLY, 1);
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  bool exec_at_coord = true;
+  Status status = scheduler.Compute(exec_at_coord, &result);
+  EXPECT_TRUE(status.ok());
+}
+
+/// IMPALA-4494: Test exec_at_coord while local backend is not registered with itself.
+TEST_F(SchedulerTest, TestExecAtCoordWithoutLocalBackend) {
+  Cluster cluster;
+  cluster.AddHosts(3, true, true);
+
+  Schema schema(cluster);
+  schema.AddMultiBlockTable("T", 1, ReplicaPlacement::LOCAL_ONLY, 1);
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+
+  // Remove first host from scheduler. By convention this is the coordinator. The
+  // scheduler will ignore this and successfully assign the scan.
+  scheduler.RemoveBackend(cluster.hosts()[0]);
+
+  bool exec_at_coord = true;
+  Status status = scheduler.Compute(exec_at_coord, &result);
+  EXPECT_TRUE(status.ok());
 }
 
 }  // end namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/96d98abf/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 020c67c..1c8af72 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -131,6 +131,8 @@ Status SimpleScheduler::Init() {
   local_backend_descriptor_.ip_address = ip;
   LOG(INFO) << "Simple-scheduler using " << ip << " as IP address";
 
+  coord_only_backend_config_.AddBackend(local_backend_descriptor_);
+
   if (webserver_ != NULL) {
     Webserver::UrlCallback backends_callback =
         bind<void>(mem_fn(&SimpleScheduler::BackendsUrlCallback), this, _1, _2);
@@ -198,88 +200,98 @@ void SimpleScheduler::UpdateMembership(
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
       incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
 
-  if (topic != incoming_topic_deltas.end()) {
-    const TTopicDelta& delta = topic->second;
+  if (topic == incoming_topic_deltas.end()) return;
+  const TTopicDelta& delta = topic->second;
 
-    // This function needs to handle both delta and non-delta updates. To minimize the
-    // time needed to hold locks, all updates are applied to a copy of backend_config_,
-    // which is then swapped into place atomically.
-    std::shared_ptr<BackendConfig> new_backend_config;
+  // If the delta transmitted by the statestore is empty we can skip processing
+  // altogether and avoid making a copy of backend_config_.
+  if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) {
+    return;
+  }
 
-    if (!delta.is_delta) {
-      current_membership_.clear();
-      new_backend_config = std::make_shared<BackendConfig>();
-    } else {
-      // Make a copy
-      lock_guard<mutex> lock(backend_config_lock_);
-      new_backend_config = std::make_shared<BackendConfig>(*backend_config_);
-    }
+  // This function needs to handle both delta and non-delta updates. To minimize the
+  // time needed to hold locks, all updates are applied to a copy of backend_config_,
+  // which is then swapped into place atomically.
+  std::shared_ptr<BackendConfig> new_backend_config;
 
-    // Process new entries to the topic
-    for (const TTopicItem& item: delta.topic_entries) {
-      TBackendDescriptor be_desc;
-      // Benchmarks have suggested that this method can deserialize
-      // ~10m messages per second, so no immediate need to consider optimization.
-      uint32_t len = item.value.size();
-      Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
-          item.value.data()), &len, false, &be_desc);
-      if (!status.ok()) {
-        VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
-        continue;
-      }
-      if (be_desc.ip_address.empty()) {
-        // Each scheduler resolves its hostname locally in SimpleScheduler::Init() and
-        // adds the IP address to local_backend_descriptor_. If it is empty, then either
-        // that code has been changed, or someone else is sending malformed packets.
-        VLOG(1) << "Ignoring subscription request with empty IP address from subscriber: "
-            << be_desc.address;
-        continue;
-      }
-      if (item.key == local_backend_id_
-          && be_desc.address != local_backend_descriptor_.address) {
-        // Someone else has registered this subscriber ID with a different address. We
-        // will try to re-register (i.e. overwrite their subscription), but there is
-        // likely a configuration problem.
-        LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
-            << be_desc.address;
-      }
-
-      new_backend_config->AddBackend(be_desc);
-      current_membership_.insert(make_pair(item.key, be_desc));
+  if (!delta.is_delta) {
+    current_membership_.clear();
+    new_backend_config = std::make_shared<BackendConfig>();
+  } else {
+    // Make a copy
+    lock_guard<mutex> lock(backend_config_lock_);
+    new_backend_config = std::make_shared<BackendConfig>(*backend_config_);
+  }
+
+  // Process new entries to the topic
+  for (const TTopicItem& item : delta.topic_entries) {
+    TBackendDescriptor be_desc;
+    // Benchmarks have suggested that this method can deserialize
+    // ~10m messages per second, so no immediate need to consider optimization.
+    uint32_t len = item.value.size();
+    Status status = DeserializeThriftMsg(
+        reinterpret_cast<const uint8_t*>(item.value.data()), &len, false, &be_desc);
+    if (!status.ok()) {
+      VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
+      continue;
     }
-    // Process deletions from the topic
-    for (const string& backend_id: delta.topic_deletions) {
-      if (current_membership_.find(backend_id) != current_membership_.end()) {
-        new_backend_config->RemoveBackend(current_membership_[backend_id]);
-        current_membership_.erase(backend_id);
-      }
+    if (be_desc.ip_address.empty()) {
+      // Each scheduler resolves its hostname locally in SimpleScheduler::Init() and
+      // adds the IP address to local_backend_descriptor_. If it is empty, then either
+      // that code has been changed, or someone else is sending malformed packets.
+      VLOG(1) << "Ignoring subscription request with empty IP address from subscriber: "
+              << be_desc.address;
+      continue;
     }
-    SetBackendConfig(new_backend_config);
-
-    // If this impalad is not in our view of the membership list, we should add it and
-    // tell the statestore.
-    if (current_membership_.find(local_backend_id_) == current_membership_.end()) {
-      VLOG(1) << "Registering local backend with statestore";
-      subscriber_topic_updates->push_back(TTopicDelta());
-      TTopicDelta& update = subscriber_topic_updates->back();
-      update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
-      update.topic_entries.push_back(TTopicItem());
-
-      TTopicItem& item = update.topic_entries.back();
-      item.key = local_backend_id_;
-      Status status = thrift_serializer_.Serialize(
-          &local_backend_descriptor_, &item.value);
-      if (!status.ok()) {
-        LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:"
-                     << " " << status.GetDetail();
-        subscriber_topic_updates->pop_back();
-      }
+    if (item.key == local_backend_id_
+        && be_desc.address != local_backend_descriptor_.address) {
+      // Someone else has registered this subscriber ID with a different address. We
+      // will try to re-register (i.e. overwrite their subscription), but there is
+      // likely a configuration problem.
+      LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
+                               << be_desc.address;
+      continue;
     }
-    if (metrics_ != NULL) {
-      /// TODO-MT: fix this (do we even need to report it?)
-      num_fragment_instances_metric_->set_value(current_membership_.size());
+    new_backend_config->AddBackend(be_desc);
+    current_membership_.insert(make_pair(item.key, be_desc));
+  }
+
+  // Process deletions from the topic
+  for (const string& backend_id : delta.topic_deletions) {
+    if (current_membership_.find(backend_id) != current_membership_.end()) {
+      new_backend_config->RemoveBackend(current_membership_[backend_id]);
+      current_membership_.erase(backend_id);
     }
   }
+
+  // If the local backend is not in our view of the membership list, we should add it
+  // and tell the statestore. We also ensure that it is part of our backend config.
+  if (current_membership_.find(local_backend_id_) == current_membership_.end()) {
+    new_backend_config->AddBackend(local_backend_descriptor_);
+    VLOG(1) << "Registering local backend with statestore";
+    subscriber_topic_updates->push_back(TTopicDelta());
+    TTopicDelta& update = subscriber_topic_updates->back();
+    update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
+    update.topic_entries.push_back(TTopicItem());
+
+    TTopicItem& item = update.topic_entries.back();
+    item.key = local_backend_id_;
+    Status status = thrift_serializer_.Serialize(&local_backend_descriptor_, &item.value);
+    if (!status.ok()) {
+      LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:"
+                   << " " << status.GetDetail();
+      subscriber_topic_updates->pop_back();
+    }
+  }
+
+  DCHECK(new_backend_config->LookUpBackendIp(
+      local_backend_descriptor_.address.hostname, nullptr));
+  SetBackendConfig(new_backend_config);
+
+  if (metrics_ != NULL) {
+    /// TODO-MT: fix this (do we even need to report it?)
+    num_fragment_instances_metric_->set_value(current_membership_.size());
+  }
 }
 
 SimpleScheduler::BackendConfigPtr SimpleScheduler::GetBackendConfig() const {
@@ -565,7 +577,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
     const vector<TNetworkAddress>& host_list, bool exec_at_coord,
     const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
     FragmentScanRangeAssignment* assignment) {
-  if (backend_config.NumBackends() == 0) {
+  if (backend_config.NumBackends() == 0 && !exec_at_coord) {
     return Status(TErrorCode::NO_REGISTERED_BACKENDS);
   }
 
@@ -589,7 +601,8 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
   // random rank.
   bool random_replica = query_options.schedule_random_replica || node_random_replica;
 
-  AssignmentCtx assignment_ctx(backend_config, total_assignments_,
+  AssignmentCtx assignment_ctx(
+      exec_at_coord ? coord_only_backend_config_ : backend_config, total_assignments_,
       total_local_assignments_);
 
   vector<const TScanRangeLocationList*> remote_scan_range_locations;
@@ -601,6 +614,8 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
 
     // Select backend host for the current scan range.
     if (exec_at_coord) {
+      DCHECK(assignment_ctx.backend_config().LookUpBackendIp(
+          local_backend_descriptor_.address.hostname, nullptr));
       assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id,
           host_list, scan_range_locations, assignment);
     } else {
@@ -672,6 +687,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
 
   // Assign remote scans to backends.
   for (const TScanRangeLocationList* scan_range_locations: remote_scan_range_locations) {
+    DCHECK(!exec_at_coord);
     const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost();
     TBackendDescriptor backend;
     assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
@@ -903,7 +919,7 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
   }
 
   IpAddr backend_ip;
-  backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip);
+  DCHECK(backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip));
   DCHECK(!backend_ip.empty());
   assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length,
       GetBackendRank(backend_ip));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/96d98abf/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index cc68432..d09ff70 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -256,6 +256,10 @@ class SimpleScheduler : public Scheduler {
   /// during scheduling.
   BackendConfigPtr backend_config_;
 
+  /// A backend configuration which only contains the local backend. It is used when
+  /// scheduling on the coordinator.
+  BackendConfig coord_only_backend_config_;
+
   /// Protect access to backend_config_ which might otherwise be updated asynchronously
   /// with respect to reads.
   mutable boost::mutex backend_config_lock_;
