IMPALA-4494: Fix crash in SimpleScheduler

The scheduler maintains a local list of active backends, which is updated through messages from the statestore. Even the local backend enters this list only by registering with the statestore and then being included in a statestore update message. Thus, during restarts, a query can be scheduled with exec_at_coord set to true while the local backend has not yet been registered with the scheduler. In this case the IP address lookup in the internal BackendConfig fails and an empty IP address is returned, which later leads to a nullptr dereference.
This change adds an additional check when handling updates from the statestore to make sure that the backend config always contains the local backend. It also changes scheduling when exec_at_coord is true to always use the local backend, irrespective of whether it is present in the backend config. Change-Id: I6e1196a2fa47e5954c4a190aa326c135d039a77f Reviewed-on: http://gerrit.cloudera.org:8080/5127 Reviewed-by: Bharath Vissapragada <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/96d98abf Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/96d98abf Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/96d98abf Branch: refs/heads/master Commit: 96d98abff52e59742469f9d5a86018506e00f88f Parents: 1fea997 Author: Lars Volker <[email protected]> Authored: Thu Nov 17 18:09:10 2016 +0100 Committer: Internal Jenkins <[email protected]> Committed: Thu Nov 24 04:11:25 2016 +0000 ---------------------------------------------------------------------- be/src/scheduling/simple-scheduler-test.cc | 81 ++++++++--- be/src/scheduling/simple-scheduler.cc | 170 +++++++++++++----------- be/src/scheduling/simple-scheduler.h | 4 + 3 files changed, 158 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/96d98abf/be/src/scheduling/simple-scheduler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc index 38638ff..3c2c83b 100644 --- a/be/src/scheduling/simple-scheduler-test.cc +++ b/be/src/scheduling/simple-scheduler-test.cc @@ -293,11 +293,11 @@ TEST_F(SchedulerTest, TestDisableCachedReads) { /// behavior. Remove. 
TEST_F(SchedulerTest, EmptyStatestoreMessage) { Cluster cluster; + cluster.AddHosts(2, true, true); cluster.AddHosts(3, false, true); - cluster.AddHosts(2, true, false); Schema schema(cluster); - schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3); + schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::REMOTE_ONLY, 3); Plan plan(schema); plan.AddTableScan("T1"); @@ -307,18 +307,18 @@ TEST_F(SchedulerTest, EmptyStatestoreMessage) { scheduler.Compute(&result); EXPECT_EQ(0, result.NumTotalAssignedBytes(0)); - EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); + EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1)); EXPECT_EQ(0, result.NumTotalAssignedBytes(2)); EXPECT_EQ(0, result.NumTotalAssignedBytes(3)); - EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(4)); + EXPECT_EQ(0, result.NumTotalAssignedBytes(4)); result.Reset(); scheduler.SendEmptyUpdate(); scheduler.Compute(&result); - EXPECT_EQ(0, result.NumTotalAssignedBytes(0)); + EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0)); EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); EXPECT_EQ(0, result.NumTotalAssignedBytes(2)); - EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(3)); + EXPECT_EQ(0, result.NumTotalAssignedBytes(3)); EXPECT_EQ(0, result.NumTotalAssignedBytes(4)); } @@ -330,7 +330,7 @@ TEST_F(SchedulerTest, TestSendUpdates) { for (int i=0; i < 3; ++i) cluster.AddHost(i < 2, true); Schema schema(cluster); - schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3); + schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::REMOTE_ONLY, 1); Plan plan(schema); plan.AddTableScan("T1"); @@ -339,29 +339,31 @@ TEST_F(SchedulerTest, TestSendUpdates) { SchedulerWrapper scheduler(plan); scheduler.Compute(&result); - EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0)); - EXPECT_EQ(0, result.NumCachedAssignedBytes(0)); - EXPECT_EQ(0, result.NumRemoteAssignedBytes(0)); - EXPECT_EQ(0, 
result.NumDiskAssignedBytes(1)); + // Two backends are registered, so the scheduler will pick a random one. + EXPECT_EQ(0, result.NumTotalAssignedBytes(0)); + EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1)); // Remove first host from scheduler. - scheduler.RemoveBackend(cluster.hosts()[0]); + scheduler.RemoveBackend(cluster.hosts()[1]); result.Reset(); scheduler.Compute(&result); - EXPECT_EQ(0, result.NumDiskAssignedBytes(0)); - EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(1)); + EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0)); + EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); // Re-add first host from scheduler. - scheduler.AddBackend(cluster.hosts()[0]); + scheduler.AddBackend(cluster.hosts()[1]); result.Reset(); scheduler.Compute(&result); - EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0)); - EXPECT_EQ(0, result.NumDiskAssignedBytes(1)); + // Two backends are registered, so the scheduler will pick a random one. + EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0)); + EXPECT_EQ(0, result.NumTotalAssignedBytes(1)); } /// IMPALA-4329: Test scheduling with no backends. +/// With the fix for IMPALA-4494, the scheduler will always register its local backend +/// with itself, so scheduling with no backends will still succeed. TEST_F(SchedulerTest, TestEmptyBackendConfig) { Cluster cluster; cluster.AddHost(false, true); @@ -375,9 +377,48 @@ TEST_F(SchedulerTest, TestEmptyBackendConfig) { Result result(plan); SchedulerWrapper scheduler(plan); Status status = scheduler.Compute(&result); - EXPECT_TRUE(!status.ok()); - EXPECT_EQ( - status.GetDetail(), "Cannot schedule query: no registered backends available.\n"); + EXPECT_TRUE(status.ok()); +} + +/// IMPALA-4494: Test scheduling with no backends but exec_at_coord. 
+TEST_F(SchedulerTest, TestExecAtCoordWithEmptyBackendConfig) { + Cluster cluster; + cluster.AddHost(false, true); + + Schema schema(cluster); + schema.AddMultiBlockTable("T", 1, ReplicaPlacement::REMOTE_ONLY, 1); + + Plan plan(schema); + plan.AddTableScan("T"); + + Result result(plan); + SchedulerWrapper scheduler(plan); + bool exec_at_coord = true; + Status status = scheduler.Compute(exec_at_coord, &result); + EXPECT_TRUE(status.ok()); +} + +/// IMPALA-4494: Test exec_at_coord while local backend is not registered with itself. +TEST_F(SchedulerTest, TestExecAtCoordWithoutLocalBackend) { + Cluster cluster; + cluster.AddHosts(3, true, true); + + Schema schema(cluster); + schema.AddMultiBlockTable("T", 1, ReplicaPlacement::LOCAL_ONLY, 1); + + Plan plan(schema); + plan.AddTableScan("T"); + + Result result(plan); + SchedulerWrapper scheduler(plan); + + // Remove first host from scheduler. By convention this is the coordinator. The + // scheduler will ignore this and successfully assign the scan. 
+ scheduler.RemoveBackend(cluster.hosts()[0]); + + bool exec_at_coord = true; + Status status = scheduler.Compute(exec_at_coord, &result); + EXPECT_TRUE(status.ok()); } } // end namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/96d98abf/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index 020c67c..1c8af72 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc @@ -131,6 +131,8 @@ Status SimpleScheduler::Init() { local_backend_descriptor_.ip_address = ip; LOG(INFO) << "Simple-scheduler using " << ip << " as IP address"; + coord_only_backend_config_.AddBackend(local_backend_descriptor_); + if (webserver_ != NULL) { Webserver::UrlCallback backends_callback = bind<void>(mem_fn(&SimpleScheduler::BackendsUrlCallback), this, _1, _2); @@ -198,88 +200,98 @@ void SimpleScheduler::UpdateMembership( StatestoreSubscriber::TopicDeltaMap::const_iterator topic = incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC); - if (topic != incoming_topic_deltas.end()) { - const TTopicDelta& delta = topic->second; + if (topic == incoming_topic_deltas.end()) return; + const TTopicDelta& delta = topic->second; - // This function needs to handle both delta and non-delta updates. To minimize the - // time needed to hold locks, all updates are applied to a copy of backend_config_, - // which is then swapped into place atomically. - std::shared_ptr<BackendConfig> new_backend_config; + // If the delta transmitted by the statestore is empty we can skip processing + // altogether and avoid making a copy of backend_config_. 
+ if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) { + return; + } - if (!delta.is_delta) { - current_membership_.clear(); - new_backend_config = std::make_shared<BackendConfig>(); - } else { - // Make a copy - lock_guard<mutex> lock(backend_config_lock_); - new_backend_config = std::make_shared<BackendConfig>(*backend_config_); - } + // This function needs to handle both delta and non-delta updates. To minimize the + // time needed to hold locks, all updates are applied to a copy of backend_config_, + // which is then swapped into place atomically. + std::shared_ptr<BackendConfig> new_backend_config; - // Process new entries to the topic - for (const TTopicItem& item: delta.topic_entries) { - TBackendDescriptor be_desc; - // Benchmarks have suggested that this method can deserialize - // ~10m messages per second, so no immediate need to consider optimization. - uint32_t len = item.value.size(); - Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>( - item.value.data()), &len, false, &be_desc); - if (!status.ok()) { - VLOG(2) << "Error deserializing membership topic item with key: " << item.key; - continue; - } - if (be_desc.ip_address.empty()) { - // Each scheduler resolves its hostname locally in SimpleScheduler::Init() and - // adds the IP address to local_backend_descriptor_. If it is empty, then either - // that code has been changed, or someone else is sending malformed packets. - VLOG(1) << "Ignoring subscription request with empty IP address from subscriber: " - << be_desc.address; - continue; - } - if (item.key == local_backend_id_ - && be_desc.address != local_backend_descriptor_.address) { - // Someone else has registered this subscriber ID with a different address. We - // will try to re-register (i.e. overwrite their subscription), but there is - // likely a configuration problem. 
- LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: " - << be_desc.address; - } - - new_backend_config->AddBackend(be_desc); - current_membership_.insert(make_pair(item.key, be_desc)); + if (!delta.is_delta) { + current_membership_.clear(); + new_backend_config = std::make_shared<BackendConfig>(); + } else { + // Make a copy + lock_guard<mutex> lock(backend_config_lock_); + new_backend_config = std::make_shared<BackendConfig>(*backend_config_); + } + + // Process new entries to the topic + for (const TTopicItem& item : delta.topic_entries) { + TBackendDescriptor be_desc; + // Benchmarks have suggested that this method can deserialize + // ~10m messages per second, so no immediate need to consider optimization. + uint32_t len = item.value.size(); + Status status = DeserializeThriftMsg( + reinterpret_cast<const uint8_t*>(item.value.data()), &len, false, &be_desc); + if (!status.ok()) { + VLOG(2) << "Error deserializing membership topic item with key: " << item.key; + continue; } - // Process deletions from the topic - for (const string& backend_id: delta.topic_deletions) { - if (current_membership_.find(backend_id) != current_membership_.end()) { - new_backend_config->RemoveBackend(current_membership_[backend_id]); - current_membership_.erase(backend_id); - } + if (be_desc.ip_address.empty()) { + // Each scheduler resolves its hostname locally in SimpleScheduler::Init() and + // adds the IP address to local_backend_descriptor_. If it is empty, then either + // that code has been changed, or someone else is sending malformed packets. + VLOG(1) << "Ignoring subscription request with empty IP address from subscriber: " + << be_desc.address; + continue; } - SetBackendConfig(new_backend_config); - - // If this impalad is not in our view of the membership list, we should add it and - // tell the statestore. 
- if (current_membership_.find(local_backend_id_) == current_membership_.end()) { - VLOG(1) << "Registering local backend with statestore"; - subscriber_topic_updates->push_back(TTopicDelta()); - TTopicDelta& update = subscriber_topic_updates->back(); - update.topic_name = IMPALA_MEMBERSHIP_TOPIC; - update.topic_entries.push_back(TTopicItem()); - - TTopicItem& item = update.topic_entries.back(); - item.key = local_backend_id_; - Status status = thrift_serializer_.Serialize( - &local_backend_descriptor_, &item.value); - if (!status.ok()) { - LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:" - << " " << status.GetDetail(); - subscriber_topic_updates->pop_back(); - } + if (item.key == local_backend_id_ + && be_desc.address != local_backend_descriptor_.address) { + // Someone else has registered this subscriber ID with a different address. We + // will try to re-register (i.e. overwrite their subscription), but there is + // likely a configuration problem. + LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: " + << be_desc.address; + continue; } - if (metrics_ != NULL) { - /// TODO-MT: fix this (do we even need to report it?) - num_fragment_instances_metric_->set_value(current_membership_.size()); + new_backend_config->AddBackend(be_desc); + current_membership_.insert(make_pair(item.key, be_desc)); + } + + // Process deletions from the topic + for (const string& backend_id : delta.topic_deletions) { + if (current_membership_.find(backend_id) != current_membership_.end()) { + new_backend_config->RemoveBackend(current_membership_[backend_id]); + current_membership_.erase(backend_id); } } + + // If the local backend is not in our view of the membership list, we should add it + // and tell the statestore. We also ensure that it is part of our backend config. 
+ if (current_membership_.find(local_backend_id_) == current_membership_.end()) { + new_backend_config->AddBackend(local_backend_descriptor_); + VLOG(1) << "Registering local backend with statestore"; + subscriber_topic_updates->push_back(TTopicDelta()); + TTopicDelta& update = subscriber_topic_updates->back(); + update.topic_name = IMPALA_MEMBERSHIP_TOPIC; + update.topic_entries.push_back(TTopicItem()); + + TTopicItem& item = update.topic_entries.back(); + item.key = local_backend_id_; + Status status = thrift_serializer_.Serialize(&local_backend_descriptor_, &item.value); + if (!status.ok()) { + LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:" + << " " << status.GetDetail(); + subscriber_topic_updates->pop_back(); + } + } + + DCHECK(new_backend_config->LookUpBackendIp( + local_backend_descriptor_.address.hostname, nullptr)); + SetBackendConfig(new_backend_config); + + if (metrics_ != NULL) { + /// TODO-MT: fix this (do we even need to report it?) + num_fragment_instances_metric_->set_value(current_membership_.size()); + } } SimpleScheduler::BackendConfigPtr SimpleScheduler::GetBackendConfig() const { @@ -565,7 +577,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment( const vector<TNetworkAddress>& host_list, bool exec_at_coord, const TQueryOptions& query_options, RuntimeProfile::Counter* timer, FragmentScanRangeAssignment* assignment) { - if (backend_config.NumBackends() == 0) { + if (backend_config.NumBackends() == 0 && !exec_at_coord) { return Status(TErrorCode::NO_REGISTERED_BACKENDS); } @@ -589,7 +601,8 @@ Status SimpleScheduler::ComputeScanRangeAssignment( // random rank. bool random_replica = query_options.schedule_random_replica || node_random_replica; - AssignmentCtx assignment_ctx(backend_config, total_assignments_, + AssignmentCtx assignment_ctx( + exec_at_coord ? 
coord_only_backend_config_ : backend_config, total_assignments_, total_local_assignments_); vector<const TScanRangeLocationList*> remote_scan_range_locations; @@ -601,6 +614,8 @@ Status SimpleScheduler::ComputeScanRangeAssignment( // Select backend host for the current scan range. if (exec_at_coord) { + DCHECK(assignment_ctx.backend_config().LookUpBackendIp( + local_backend_descriptor_.address.hostname, nullptr)); assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id, host_list, scan_range_locations, assignment); } else { @@ -672,6 +687,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment( // Assign remote scans to backends. for (const TScanRangeLocationList* scan_range_locations: remote_scan_range_locations) { + DCHECK(!exec_at_coord); const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost(); TBackendDescriptor backend; assignment_ctx.SelectBackendOnHost(*backend_ip, &backend); @@ -903,7 +919,7 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment( } IpAddr backend_ip; - backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip); + DCHECK(backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip)); DCHECK(!backend_ip.empty()); assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length, GetBackendRank(backend_ip)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/96d98abf/be/src/scheduling/simple-scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h index cc68432..d09ff70 100644 --- a/be/src/scheduling/simple-scheduler.h +++ b/be/src/scheduling/simple-scheduler.h @@ -256,6 +256,10 @@ class SimpleScheduler : public Scheduler { /// during scheduling. BackendConfigPtr backend_config_; + /// A backend configuration which only contains the local backend. It is used when + /// scheduling on the coordinator. 
+ BackendConfig coord_only_backend_config_; + /// Protect access to backend_config_ which might otherwise be updated asynchronously /// with respect to reads. mutable boost::mutex backend_config_lock_;
