IMPALA-2642: Fix a potential deadlock in statestore

The statestored can deadlock if the number of subscribers has reached STATESTORE_MAX_SUBSCRIBERS, because the DoSubscriberUpdate() method calls OfferUpdate() while holding subscribers_lock_, and in this situation OfferUpdate() also tries to acquire that same lock.
Fix the issue by removing the acquisition of subscribers_lock_ from OfferUpdate() and relying on the callers to hold it. We also make the maximum number of statestore subscribers tunable at start-up time (via the new statestore_max_subscribers flag), to allow us to test the limit more easily.

Testing: The problem is easily reproduced by lowering the value of STATESTORE_MAX_SUBSCRIBERS to 3, and then launching a mini cluster with 3 impalads. Without the fix, the statestored becomes completely deadlocked. A new EE test has been added to exercise this scenario. The test verifies that statestored correctly rejects new subscription requests when the limit is reached.

Change-Id: I5d49dede221ce1f50ec299643b5532c61f93f0c6 Reviewed-on: http://gerrit.cloudera.org:8080/9038 Reviewed-by: Sailesh Mukil <sail...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ca01c9b7 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ca01c9b7 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ca01c9b7 Branch: refs/heads/2.x Commit: ca01c9b70f5d55b1f4c990c5f2897d110268bfff Parents: 4e5f039 Author: Zoram Thanga <zo...@cloudera.com> Authored: Tue Jan 16 12:01:09 2018 -0800 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Fri Feb 2 01:10:15 2018 +0000 ---------------------------------------------------------------------- be/src/statestore/statestore.cc | 33 ++++---- be/src/statestore/statestore.h | 2 +- tests/custom_cluster/test_custom_statestore.py | 88 +++++++++++++++++++++ 3 files changed, 108 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index 0f72e58..8f4ddbf 100644 --- 
a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -62,6 +62,9 @@ DEFINE_int32(statestore_heartbeat_tcp_timeout_seconds, 3, "(Advanced) The time a "badly hung machines that are not able to respond to the heartbeat RPC in short " "order"); +DEFINE_int32(statestore_max_subscribers, 10000, "Used to control the maximum size " + "of the pending topic-update queue. There is at most one entry per subscriber."); + // If this value is set too low, it's possible that UpdateState() might timeout during a // working invocation, and only a restart of the statestore with a change in value would // allow progress to be made. If set too high, a hung subscriber will waste an update @@ -93,10 +96,6 @@ const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations"; // an item with the initial version. const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0; -// Used to control the maximum size of the pending topic-update queue, in which there is -// at most one entry per subscriber. -const int32_t STATESTORE_MAX_SUBSCRIBERS = 10000; - // Updates or heartbeats that miss their deadline by this much are logged. 
const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000; @@ -216,12 +215,12 @@ Statestore::Statestore(MetricGroup* metrics) subscriber_topic_update_threadpool_("statestore-update", "subscriber-update-worker", FLAGS_statestore_num_update_threads, - STATESTORE_MAX_SUBSCRIBERS, + FLAGS_statestore_max_subscribers, bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, _2)), subscriber_heartbeat_threadpool_("statestore-heartbeat", "subscriber-heartbeat-worker", FLAGS_statestore_num_heartbeat_threads, - STATESTORE_MAX_SUBSCRIBERS, + FLAGS_statestore_max_subscribers, bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)), update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0, FLAGS_statestore_update_tcp_timeout_seconds * 1000, @@ -349,11 +348,17 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args, Status Statestore::OfferUpdate(const ScheduledSubscriberUpdate& update, ThreadPool<ScheduledSubscriberUpdate>* threadpool) { - if (threadpool->GetQueueSize() >= STATESTORE_MAX_SUBSCRIBERS + // Somewhat confusingly, we're checking the number of entries in a particular + // threadpool's work queue to decide whether or not we have too many + // subscribers. The number of subscribers registered can be actually more + // than statestore_max_subscribers. This is because RegisterSubscriber() adds + // the new subscriber to subscribers_ first before scheduling its updates. + // Should we be stricter in enforcing this limit on subscribers_.size() itself? 
+ if (threadpool->GetQueueSize() >= FLAGS_statestore_max_subscribers || !threadpool->Offer(update)) { stringstream ss; - ss << "Maximum subscriber limit reached: " << STATESTORE_MAX_SUBSCRIBERS; - lock_guard<mutex> l(subscribers_lock_); + ss << "Maximum subscriber limit reached: " << FLAGS_statestore_max_subscribers; + ss << ", subscribers_ size: " << subscribers_.size(); SubscriberMap::iterator subscriber_it = subscribers_.find(update.subscriber_id); DCHECK(subscriber_it != subscribers_.end()); subscribers_.erase(subscriber_it); @@ -400,12 +405,12 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id, PrintId(current_registration->registration_id()), true); num_subscribers_metric_->SetValue(subscribers_.size()); subscriber_set_metric_->Add(subscriber_id); - } - // Add the subscriber to the update queue, with an immediate schedule. - ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id); - RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_)); - RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_)); + // Add the subscriber to the update queue, with an immediate schedule. 
+ ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id); + RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_)); + RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_)); + } LOG(INFO) << "Subscriber '" << subscriber_id << "' registered (registration id: " << PrintId(*registration_id) << ")"; http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 38b8361..deeb5aa 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -461,7 +461,7 @@ class Statestore : public CacheLineAligned { StatsMetric<double>* heartbeat_duration_metric_; /// Utility method to add an update to the given thread pool, and to fail if the thread - /// pool is already at capacity. + /// pool is already at capacity. Assumes that subscribers_lock_ is held by the caller. Status OfferUpdate(const ScheduledSubscriberUpdate& update, ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT; http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/tests/custom_cluster/test_custom_statestore.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_custom_statestore.py b/tests/custom_cluster/test_custom_statestore.py new file mode 100644 index 0000000..ee810ed --- /dev/null +++ b/tests/custom_cluster/test_custom_statestore.py @@ -0,0 +1,88 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests statestore with non-default startup options + +import logging +import os +import pytest +import re +import sys +import uuid +import socket + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_test_suite import ImpalaTestSuite + +from Types.ttypes import TNetworkAddress +from thrift.protocol import TBinaryProtocol +from thrift.transport import TSocket, TTransport + +import StatestoreService.StatestoreSubscriber as Subscriber +import StatestoreService.StatestoreService as Statestore +from ErrorCodes.ttypes import TErrorCode + +LOG = logging.getLogger('custom_statestore_test') +STATESTORE_SERVICE_PORT = 24000 + +# A simple wrapper class to launch a cluster where we can tune various +# startup parameters of the statestored to test correct boundary-value +# behavior. +class TestCustomStatestore(CustomClusterTestSuite): + # Grab a port the statestore subscribers will use to connect. + # Note that all subscribers we create below use this port to connect, + # with different subscriber IDs. 
+ handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + handle.bind(('localhost', 0)) + _, port = handle.getsockname() + + @classmethod + def get_workload(self): + return 'functional-query' + + def __register_subscriber(self): + subscriber_id = "python-test-client-%s" % uuid.uuid4() + topics = [] + request = Subscriber.TRegisterSubscriberRequest(topic_registrations=topics, + subscriber_location=TNetworkAddress("localhost", self.port), + subscriber_id=subscriber_id) + client_transport = \ + TTransport.TBufferedTransport(TSocket.TSocket('localhost', STATESTORE_SERVICE_PORT)) + protocol = TBinaryProtocol.TBinaryProtocol(client_transport) + client = Statestore.Client(protocol) + client_transport.open() + return client.RegisterSubscriber(request) + + @CustomClusterTestSuite.with_args(statestored_args="-statestore_max_subscribers=3") + def test_statestore_max_subscribers(self): + """Test that the statestored correctly handles the condition where the number + of subscribers exceeds FLAGS_statestore_max_subscribers + (see be/src/statestore/statestore.cc). The expected behavior is for the + statestored to reject the subscription request once the threshold is + exceeded.""" + # With a statestore_max_subscribers of 3, we should hit the registration error + # pretty quick. + for x in xrange(20): + response = self.__register_subscriber() + if response.status.status_code == TErrorCode.OK: + self.registration_id = response.registration_id + LOG.log(logging.INFO, "Registration id %s, x=%d" % (response.registration_id, x)) + else: + assert 'Maximum subscriber limit reached:' in ''.join(response.status.error_msgs) + return