IMPALA-6907: Close stale connections to removed cluster members

Previously, ImpalaServer::MembershipCallback() is used by each
Impala backend node to update cluster membership. It also removes
stale connections to nodes which are no longer members of the cluster.
However, the way it detects removed members is flawed as it relies
on query_locations_ to determine whether stale connections may
exist to the removed members. query_locations_ is a map of host
name to a set of queries running on that host. An entry for a remote
node only exists in query_locations_ if an Impalad node has acted
as coordinator of a query with fragment instances scheduled to run
on that remote node.

This change fixes this problem by closing connections to remote
hosts which are removed from the cluster regardless of whether
they can be found in query_locations_. A new test is added to
exercise this path by restarting Impalad backend nodes between
queries. Also change impala_cluster.py to use bin/start-impalad.sh
to start the Impala daemon instead of directly forking and exec'ing
Impalad. This is needed as start-impalad.sh sets up the proper
Java related environment variables.

Change-Id: I41b7297cf665bf291b09b23524d19b1d10ab281d
Reviewed-on: http://gerrit.cloudera.org:8080/10327
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f40dc5dd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f40dc5dd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f40dc5dd

Branch: refs/heads/2.x
Commit: f40dc5dd4d5e4b6e7c01f078940778fc23e33a8b
Parents: 60be81b
Author: Michael Ho <[email protected]>
Authored: Wed May 2 15:25:26 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue May 15 21:10:10 2018 +0000

----------------------------------------------------------------------
 be/src/service/impala-server.cc               | 10 ++--
 tests/common/impala_cluster.py                | 19 ++++---
 tests/custom_cluster/test_restart_services.py | 62 ++++++++++++++--------
 3 files changed, 56 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f40dc5dd/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 811abab..36224d4 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1538,10 +1538,15 @@ void ImpalaServer::MembershipCallback(
     // clear the saved mapping of known backends.
     if (!delta.is_delta) known_backends_.clear();
 
-    // Process membership additions.
+    // Process membership additions/deletions.
     for (const TTopicItem& item: delta.topic_entries) {
       if (item.deleted) {
-        known_backends_.erase(item.key);
+        auto entry = known_backends_.find(item.key);
+        // Remove stale connections to removed members.
+        if (entry != known_backends_.end()) {
+          
exec_env_->impalad_client_cache()->CloseConnections(entry->second.address);
+          known_backends_.erase(item.key);
+        }
         continue;
       }
       uint32_t len = item.value.size();
@@ -1599,7 +1604,6 @@ void ImpalaServer::MembershipCallback(
             vector<TNetworkAddress>& failed_hosts = 
queries_to_cancel[*query_id];
             failed_hosts.push_back(loc_entry->first);
           }
-          
exec_env_->impalad_client_cache()->CloseConnections(loc_entry->first);
           // We can remove the location wholesale once we know backend's 
failed. To do so
           // safely during iteration, we have to be careful not in invalidate 
the current
           // iterator, so copy the iterator to do the erase(..) and advance 
the original.

http://git-wip-us.apache.org/repos/asf/impala/blob/f40dc5dd/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 2d1ca58..9ff7318 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -38,6 +38,7 @@ LOG.setLevel(level=logging.DEBUG)
 
 IMPALA_HOME = os.environ['IMPALA_HOME']
 CATALOGD_PATH = os.path.join(IMPALA_HOME, 'bin/start-catalogd.sh')
+IMPALAD_PATH = os.path.join(IMPALA_HOME, 'bin/start-impalad.sh 
-build_type=latest')
 
 # Represents a set of Impala processes. Each Impala process must be created 
with
 # a basic set of command line options (beeswax_port, webserver_port, etc)
@@ -160,14 +161,11 @@ class Process(object):
 
   def start(self):
     LOG.info("Starting process: %s" % ' '.join(self.cmd))
-    self.process = exec_process_async(' '.join(self.cmd))
-
-  def wait(self):
-    """Wait until the current process has exited, and returns
-    (return code, stdout, stderr)"""
-    LOG.info("Waiting for process: %s" % ' '.join(self.cmd))
-    stdout, stderr = self.process.communicate()
-    return self.process.returncode, stdout, stderr
+    # Use os.system() to start 'cmd' in the background via a shell so its 
parent will be
+    # init after the shell exits. Otherwise, the parent of 'cmd' will be 
py.test and we
+    # cannot cleanly kill it until py.test exits. In theory, Popen(shell=True) 
should
+    # achieve the same thing but it doesn't work on some platforms for some 
reasons.
+    os.system(' '.join(self.cmd) + ' &')
 
   def kill(self, signal=SIGKILL):
     """
@@ -180,7 +178,6 @@ class Process(object):
       assert 0, "No processes %s found" % self.cmd
     LOG.info('Killing: %s (PID: %d) with signal %s'  % (' '.join(self.cmd), 
pid, signal))
     exec_process("kill -%d %d" % (signal, pid))
-
     return pid
 
   def restart(self):
@@ -233,7 +230,9 @@ class ImpaladProcess(BaseImpalaProcess):
 
   def start(self, wait_until_ready=True):
     """Starts the impalad and waits until the service is ready to accept 
connections."""
-    super(ImpaladProcess, self).start()
+    restart_cmd = [IMPALAD_PATH] + self.cmd[1:] + ['&']
+    LOG.info("Starting Impalad process: %s" % ' '.join(restart_cmd))
+    os.system(' '.join(restart_cmd))
     if wait_until_ready:
       self.service.wait_for_metric_value('impala-server.ready',
                                          expected_value=1, timeout=30)

http://git-wip-us.apache.org/repos/asf/impala/blob/f40dc5dd/tests/custom_cluster/test_restart_services.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_restart_services.py 
b/tests/custom_cluster/test_restart_services.py
index bcfe19d..f2bb7fb 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -35,25 +35,43 @@ class TestRestart(CustomClusterTestSuite):
     """ Regression test of IMPALA-6973. After the statestore restarts, the 
metadata should
         eventually recover after being cleared by the new statestore.
     """
-    try:
-      self.cluster.statestored.restart()
-      # We need to wait for the impalad to register to the new statestored and 
for a
-      # non-empty catalog update from the new statestored. It cannot be 
expressed with the
-      # existing metrics yet so we wait for some time here.
-      wait_time_s = specific_build_type_timeout(60, slow_build_timeout=100)
-      sleep(wait_time_s)
-      for retry in xrange(wait_time_s):
-        try:
-          cursor.execute("describe database functional")
-          return
-        except HiveServer2Error, e:
-          assert "AnalysisException: Database does not exist: functional" in 
e.message,\
-              "Unexpected exception: " + e.message
-          sleep(1)
-      assert False, "Coordinator never received non-empty metadata from the 
restarted " \
-          "statestore after {0} seconds".format(wait_time_s)
-    finally:
-      # Workaround for IMPALA-5695. Restarted process has to be manually 
killed or it will
-      # block start-impala-cluster.py from killing impala daemons.
-      self.cluster.statestored.kill()
-      self.cluster.statestored.wait()
+
+    self.cluster.statestored.restart()
+    # We need to wait for the impalad to register to the new statestored and 
for a
+    # non-empty catalog update from the new statestored. It cannot be 
expressed with the
+    # existing metrics yet so we wait for some time here.
+    wait_time_s = specific_build_type_timeout(60, slow_build_timeout=100)
+    sleep(wait_time_s)
+    for retry in xrange(wait_time_s):
+      try:
+        cursor.execute("describe database functional")
+        return
+      except HiveServer2Error, e:
+        assert "AnalysisException: Database does not exist: functional" in 
e.message,\
+               "Unexpected exception: " + e.message
+        sleep(1)
+    assert False, "Coordinator never received non-empty metadata from the 
restarted " \
+           "statestore after {0} seconds".format(wait_time_s)
+
+  @pytest.mark.execute_serially
+  def test_restart_impala(self):
+      """ This test aims to restart Impalad executor nodes between queries to 
exercise
+      the cluster membership callback which removes stale connections to the 
restarted
+      nodes."""
+
+      self._start_impala_cluster([], num_coordinators=1, cluster_size=3)
+      assert len(self.cluster.impalads) == 3
+
+      client = self.cluster.impalads[0].service.create_beeswax_client()
+      assert client is not None
+
+      for i in xrange(5):
+        self.execute_query_expect_success(client, "select * from 
functional.alltypes")
+        node_to_restart = 1 + (i % 2)
+        self.cluster.impalads[node_to_restart].restart()
+        # Sleep for a bit for the statestore change in membership to 
propagate. The min
+        # update frequency for statestore is 100ms but using a larger sleep 
time here
+        # as certain builds (e.g. ASAN) can be really slow.
+        sleep(3)
+
+      client.close()

Reply via email to