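IMPALA-6793: Fix empty metadata after statestore restarts. IMPALA-5990 introduced a bug where restarting the statestore deterministically clears the metadata, which then never comes back. The cause of the bug is a wrong condition used by the catalog to detect a statestore restart. With this fix, the catalog treats a topic delta whose from_version and to_version are both 0 as a statestore restart, provided it has already sent a non-empty topic update, and then re-collects a full (non-delta) catalog update.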
A custom cluster regression test is added. The process restarting utility function in the custom cluster test is changed into using shell=True in popen. Change-Id: I332a60e172af84b93b3544373fe363cdced5e8d0 Reviewed-on: http://gerrit.cloudera.org:8080/9921 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Tianyi Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/740fc6b5 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/740fc6b5 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/740fc6b5 Branch: refs/heads/master Commit: 740fc6b57f074a448bac04ec2e8e05312f141f67 Parents: cfaffc2 Author: Tianyi Wang <[email protected]> Authored: Fri Apr 6 13:06:35 2018 -0700 Committer: Tianyi Wang <[email protected]> Committed: Wed Apr 18 18:26:24 2018 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog-server.cc | 13 ++--- be/src/catalog/catalog-server.h | 5 -- tests/common/impala_cluster.py | 2 +- tests/custom_cluster/test_restart_services.py | 59 ++++++++++++++++++++++ 4 files changed, 67 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/be/src/catalog/catalog-server.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 8a91c25..e645204 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -157,7 +157,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics) : thrift_iface_(new CatalogServiceThriftIf(this)), thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics), topic_updates_ready_(false), last_sent_catalog_version_(0L), - catalog_objects_min_version_(0L), catalog_objects_max_version_(0L) { + catalog_objects_max_version_(0L) { 
topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics, CATALOG_SERVER_TOPIC_PROCESSING_TIMES); } @@ -228,10 +228,12 @@ void CatalogServer::UpdateCatalogTopicCallback( const TTopicDelta& delta = topic->second; - // If not generating a delta update and 'pending_topic_updates_' doesn't already contain - // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread() - // to reload the full catalog. - if (delta.from_version == 0 && catalog_objects_min_version_ != 0) { + // If the statestore restarts, both from_version and to_version would be 0. If catalog + // has sent non-empty topic udpate, pending_topic_updates_ won't be from version 0 and + // it should be re-collected. + if (delta.from_version == 0 && delta.to_version == 0 && + last_sent_catalog_version_ != 0) { + LOG(INFO) << "Statestore restart detected. Collecting a non-delta catalog update."; last_sent_catalog_version_ = 0L; } else if (!pending_topic_updates_.empty()) { // Process the pending topic update. @@ -284,7 +286,6 @@ void CatalogServer::UpdateCatalogTopicCallback( if (!status.ok()) { LOG(ERROR) << status.GetDetail(); } else { - catalog_objects_min_version_ = last_sent_catalog_version_; catalog_objects_max_version_ = resp.max_catalog_version; } } http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/be/src/catalog/catalog-server.h ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h index 2fa8ce7..1df83a3 100644 --- a/be/src/catalog/catalog-server.h +++ b/be/src/catalog/catalog-server.h @@ -119,11 +119,6 @@ class CatalogServer { /// Set in UpdateCatalogTopicCallback() and protected by the catalog_lock_. int64_t last_sent_catalog_version_; - /// The minimum catalog object version in pending_topic_updates_. All items in - /// pending_topic_updates_ will be greater than this version. 
Set by the - /// catalog_update_gathering_thread_ and protected by catalog_lock_. - int64_t catalog_objects_min_version_; - /// The max catalog version in pending_topic_updates_. Set by the /// catalog_update_gathering_thread_ and protected by catalog_lock_. int64_t catalog_objects_max_version_; http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/tests/common/impala_cluster.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 3fbcacf..276c02b 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -29,7 +29,7 @@ from tests.common.impala_service import ( CatalogdService, ImpaladService, StateStoredService) -from tests.util.shell_util import exec_process_async, exec_process +from tests.util.shell_util import exec_process, exec_process_async logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s') LOG = logging.getLogger('impala_cluster') http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/tests/custom_cluster/test_restart_services.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py new file mode 100644 index 0000000..bcfe19d --- /dev/null +++ b/tests/custom_cluster/test_restart_services.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +from impala.error import HiveServer2Error +from tests.common.environ import specific_build_type_timeout +from time import sleep + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + + + +class TestRestart(CustomClusterTestSuite): + @classmethod + def get_workload(cls): + return 'functional-query' + + @pytest.mark.execute_serially + def test_restart_statestore(self, cursor): + """ Regression test of IMPALA-6973. After the statestore restarts, the metadata should + eventually recover after being cleared by the new statestore. + """ + try: + self.cluster.statestored.restart() + # We need to wait for the impalad to register to the new statestored and for a + # non-empty catalog update from the new statestored. It cannot be expressed with the + # existing metrics yet so we wait for some time here. + wait_time_s = specific_build_type_timeout(60, slow_build_timeout=100) + sleep(wait_time_s) + for retry in xrange(wait_time_s): + try: + cursor.execute("describe database functional") + return + except HiveServer2Error, e: + assert "AnalysisException: Database does not exist: functional" in e.message,\ + "Unexpected exception: " + e.message + sleep(1) + assert False, "Coordinator never received non-empty metadata from the restarted " \ + "statestore after {0} seconds".format(wait_time_s) + finally: + # Workaround for IMPALA-5695. Restarted process has to be manually killed or it will + # block start-impala-cluster.py from killing impala daemons. + self.cluster.statestored.kill() + self.cluster.statestored.wait()
