IMPALA-6907: Close stale connections to removed cluster members ImpalaServer::MembershipCallback() is used by each Impala backend node to update cluster membership. It also removes stale connections to nodes which are no longer members of the cluster. However, the way it previously detected removed members was flawed, as it relied on query_locations_ to determine whether stale connections may exist to the removed members. query_locations_ is a map of host name to a set of queries running on that host. An entry for a remote node only exists in query_locations_ if an Impalad node has acted as coordinator of a query with fragment instances scheduled to run on that remote node. As a result, stale connections to a removed node were not closed on any Impalad node which had no query_locations_ entry for it.
This change fixes this problem by closing connections to remote hosts which are removed from the cluster regardless of whether they can be found in query_locations_. A new test is added to exercise this path by restarting Impalad backend nodes between queries. Also change impala_cluster.py to use bin/start-impalad.sh to start the Impala daemon instead of directly forking and exec'ing Impalad. This is needed as start-impalad.sh sets up the proper Java related environment variables. Change-Id: I41b7297cf665bf291b09b23524d19b1d10ab281d Reviewed-on: http://gerrit.cloudera.org:8080/10327 Reviewed-by: Michael Ho <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f40dc5dd Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f40dc5dd Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f40dc5dd Branch: refs/heads/2.x Commit: f40dc5dd4d5e4b6e7c01f078940778fc23e33a8b Parents: 60be81b Author: Michael Ho <[email protected]> Authored: Wed May 2 15:25:26 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue May 15 21:10:10 2018 +0000 ---------------------------------------------------------------------- be/src/service/impala-server.cc | 10 ++-- tests/common/impala_cluster.py | 19 ++++--- tests/custom_cluster/test_restart_services.py | 62 ++++++++++++++-------- 3 files changed, 56 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/f40dc5dd/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 811abab..36224d4 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1538,10 +1538,15 @@ void ImpalaServer::MembershipCallback( // clear the saved mapping of 
known backends. if (!delta.is_delta) known_backends_.clear(); - // Process membership additions. + // Process membership additions/deletions. for (const TTopicItem& item: delta.topic_entries) { if (item.deleted) { - known_backends_.erase(item.key); + auto entry = known_backends_.find(item.key); + // Remove stale connections to removed members. + if (entry != known_backends_.end()) { + exec_env_->impalad_client_cache()->CloseConnections(entry->second.address); + known_backends_.erase(item.key); + } continue; } uint32_t len = item.value.size(); @@ -1599,7 +1604,6 @@ void ImpalaServer::MembershipCallback( vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id]; failed_hosts.push_back(loc_entry->first); } - exec_env_->impalad_client_cache()->CloseConnections(loc_entry->first); // We can remove the location wholesale once we know backend's failed. To do so // safely during iteration, we have to be careful not in invalidate the current // iterator, so copy the iterator to do the erase(..) and advance the original. http://git-wip-us.apache.org/repos/asf/impala/blob/f40dc5dd/tests/common/impala_cluster.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 2d1ca58..9ff7318 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -38,6 +38,7 @@ LOG.setLevel(level=logging.DEBUG) IMPALA_HOME = os.environ['IMPALA_HOME'] CATALOGD_PATH = os.path.join(IMPALA_HOME, 'bin/start-catalogd.sh') +IMPALAD_PATH = os.path.join(IMPALA_HOME, 'bin/start-impalad.sh -build_type=latest') # Represents a set of Impala processes. 
Each Impala process must be created with # a basic set of command line options (beeswax_port, webserver_port, etc) @@ -160,14 +161,11 @@ class Process(object): def start(self): LOG.info("Starting process: %s" % ' '.join(self.cmd)) - self.process = exec_process_async(' '.join(self.cmd)) - - def wait(self): - """Wait until the current process has exited, and returns - (return code, stdout, stderr)""" - LOG.info("Waiting for process: %s" % ' '.join(self.cmd)) - stdout, stderr = self.process.communicate() - return self.process.returncode, stdout, stderr + # Use os.system() to start 'cmd' in the background via a shell so its parent will be + # init after the shell exits. Otherwise, the parent of 'cmd' will be py.test and we + # cannot cleanly kill it until py.test exits. In theory, Popen(shell=True) should + # achieve the same thing but it doesn't work on some platforms for some reasons. + os.system(' '.join(self.cmd) + ' &') def kill(self, signal=SIGKILL): """ @@ -180,7 +178,6 @@ class Process(object): assert 0, "No processes %s found" % self.cmd LOG.info('Killing: %s (PID: %d) with signal %s' % (' '.join(self.cmd), pid, signal)) exec_process("kill -%d %d" % (signal, pid)) - return pid def restart(self): @@ -233,7 +230,9 @@ class ImpaladProcess(BaseImpalaProcess): def start(self, wait_until_ready=True): """Starts the impalad and waits until the service is ready to accept connections.""" - super(ImpaladProcess, self).start() + restart_cmd = [IMPALAD_PATH] + self.cmd[1:] + ['&'] + LOG.info("Starting Impalad process: %s" % ' '.join(restart_cmd)) + os.system(' '.join(restart_cmd)) if wait_until_ready: self.service.wait_for_metric_value('impala-server.ready', expected_value=1, timeout=30) http://git-wip-us.apache.org/repos/asf/impala/blob/f40dc5dd/tests/custom_cluster/test_restart_services.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py index 
bcfe19d..f2bb7fb 100644 --- a/tests/custom_cluster/test_restart_services.py +++ b/tests/custom_cluster/test_restart_services.py @@ -35,25 +35,43 @@ class TestRestart(CustomClusterTestSuite): """ Regression test of IMPALA-6973. After the statestore restarts, the metadata should eventually recover after being cleared by the new statestore. """ - try: - self.cluster.statestored.restart() - # We need to wait for the impalad to register to the new statestored and for a - # non-empty catalog update from the new statestored. It cannot be expressed with the - # existing metrics yet so we wait for some time here. - wait_time_s = specific_build_type_timeout(60, slow_build_timeout=100) - sleep(wait_time_s) - for retry in xrange(wait_time_s): - try: - cursor.execute("describe database functional") - return - except HiveServer2Error, e: - assert "AnalysisException: Database does not exist: functional" in e.message,\ - "Unexpected exception: " + e.message - sleep(1) - assert False, "Coordinator never received non-empty metadata from the restarted " \ - "statestore after {0} seconds".format(wait_time_s) - finally: - # Workaround for IMPALA-5695. Restarted process has to be manually killed or it will - # block start-impala-cluster.py from killing impala daemons. - self.cluster.statestored.kill() - self.cluster.statestored.wait() + + self.cluster.statestored.restart() + # We need to wait for the impalad to register to the new statestored and for a + # non-empty catalog update from the new statestored. It cannot be expressed with the + # existing metrics yet so we wait for some time here. 
+ wait_time_s = specific_build_type_timeout(60, slow_build_timeout=100) + sleep(wait_time_s) + for retry in xrange(wait_time_s): + try: + cursor.execute("describe database functional") + return + except HiveServer2Error, e: + assert "AnalysisException: Database does not exist: functional" in e.message,\ + "Unexpected exception: " + e.message + sleep(1) + assert False, "Coordinator never received non-empty metadata from the restarted " \ + "statestore after {0} seconds".format(wait_time_s) + + @pytest.mark.execute_serially + def test_restart_impala(self): + """ This test aims to restart Impalad executor nodes between queries to exercise + the cluster membership callback which removes stale connections to the restarted + nodes.""" + + self._start_impala_cluster([], num_coordinators=1, cluster_size=3) + assert len(self.cluster.impalads) == 3 + + client = self.cluster.impalads[0].service.create_beeswax_client() + assert client is not None + + for i in xrange(5): + self.execute_query_expect_success(client, "select * from functional.alltypes") + node_to_restart = 1 + (i % 2) + self.cluster.impalads[node_to_restart].restart() + # Sleep for a bit for the statestore change in membership to propagate. The min + # update frequency for statestore is 100ms but using a larger sleep time here + # as certain builds (e.g. ASAN) can be really slow. + sleep(3) + + client.close()
