This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 72aaa6dc27bf32c973055a782aeaa2270c66c038
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Sat Aug 10 00:21:19 2024 +0200

    IMPALA-11729: Speed up start-impala-cluster.py
    
    The change reduces cluster startup time by 1-2 seconds. This also
    makes custom cluster tests a bit quicker.
    
    Most of the improvement comes from removing an unneeded sleep from
    wait_for_catalog(): it also slept after a successful connection, even
    though once the first coordinator is up, all the others are likely up
    as well, which added 3*0.5s of extra sleep in the dev cluster.
    
    Other changes:
    - wait_for_catalog is cleaned up and renamed to
      wait_for_coordinator_services
    - also wait for the hs2_http port to be open
    - decreased some sleep intervals
    - removed some non-informative logging
    - wait for the hs2/beeswax/webui ports to be open before actually
      trying to connect to them, to avoid extra logging from failed
      Thrift/HTTP connections
    - reordered startup to first wait for the coordinators to be up and
      then wait for num_known_live_backends in each impalad; this better
      reflects what the cluster actually waits for (the first catalog
      update before coordinator services start)
    
    Change-Id: Ic4dd8c2bc7056443373ceb256a03ce562fea38a0
    Reviewed-on: http://gerrit.cloudera.org:8080/21656
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Laszlo Gaal <[email protected]>
---
 bin/start-impala-cluster.py    |  2 +-
 tests/common/impala_cluster.py | 97 +++++++++++++++++++++++++++---------------
 tests/common/impala_service.py | 30 +++++++++++--
 3 files changed, 89 insertions(+), 40 deletions(-)

diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index ad0e41d66..917c17971 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -1159,7 +1159,7 @@ if __name__ == "__main__":
       cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
                                  options.use_exclusive_coordinators)
     # Sleep briefly to reduce log spam: the cluster takes some time to start up.
-    sleep(3)
+    sleep(2)
 
     impala_cluster = cluster_ops.get_cluster()
     expected_catalog_delays = 0
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 1465ab31b..707765182 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -104,7 +104,7 @@ class ImpalaCluster(object):
     the environment."""
     return ImpalaCluster(docker_network=tests.common.environ.docker_network)
 
-  def refresh(self):
+  def refresh(self, silent=False):
     """ Re-loads the impalad/statestored/catalogd processes if they exist.
 
     Helpful to confirm that processes have been killed.
@@ -119,9 +119,10 @@ class ImpalaCluster(object):
     if self.use_admission_service:
       admissiond_str = "/%d admissiond" % (1 if self.__admissiond else 0)
 
-    LOG.debug("Found %d impalad/%d statestored/%d catalogd%s process(es)" %
-        (len(self.__impalads), len(self.__statestoreds), len(self.__catalogds),
-         admissiond_str))
+    if not silent:
+      LOG.debug("Found %d impalad/%d statestored/%d catalogd%s process(es)" %
+          (len(self.__impalads), len(self.__statestoreds), len(self.__catalogds),
+           admissiond_str))
 
   @property
   def statestored(self):
@@ -201,8 +202,10 @@ class ImpalaCluster(object):
           one impalad is up).
         - expected_num_ready_impalads backends are registered with the statestore.
           expected_num_ready_impalads defaults to expected_num_impalads.
+        - Each impalad's debug webserver is ready.
+        - Each coordinator impalad's hs2/beeswax port is open (this happens after catalog
+          cache is ready).
         - All impalads knows about all other ready impalads.
-        - Each coordinator impalad's catalog cache is ready.
       This information is retrieved by querying the statestore debug webpage
       and each individual impalad's metrics webpage.
     """
@@ -215,20 +218,35 @@ class ImpalaCluster(object):
     def check_processes_still_running():
       """Check that the processes we waited for above (i.e. impalads, statestored,
       catalogd) are still running. Throw an exception otherwise."""
-      self.refresh()
+      self.refresh(silent=True)
       # The number of impalad processes may temporarily increase if breakpad forked a
       # process to write a minidump.
       assert len(self.impalads) >= expected_num_impalads
       assert self.statestored is not None
       assert self.catalogd is not None
 
+    sleep_interval = 0.5
+    # Wait for each webserver to be ready.
+    for impalad in self.impalads:
+      impalad.wait_for_webserver(sleep_interval, check_processes_still_running)
+
+    # Wait for coordinators to start.
+    for impalad in self.impalads:
+      if impalad._get_arg_value("is_coordinator", default="true") != "true": continue
+      if impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) != 0: continue
+      impalad.wait_for_coordinator_services(sleep_interval, check_processes_still_running)
+      # Decrease sleep_interval after first coordinator ready as the others are also
+      # likely to be (nearly) ready.
+      sleep_interval = 0.2
+
+    # Restore sleep interval to avoid potential log spew. At this point it is unlikely
+    # that any more sleeps are actually needed.
+    sleep_interval = 0.5
+    # Wait till all impalads consider all backends ready.
     for impalad in self.impalads:
       impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads,
-          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2,
+          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=sleep_interval,
           early_abort_fn=check_processes_still_running)
-      if (impalad._get_arg_value("is_coordinator", default="true") == "true"
-         and impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0):
-        impalad.wait_for_catalog()
 
   def wait_for_num_impalads(self, num_impalads, retries=10):
     """Checks that at least 'num_impalads' impalad processes are running, along with
@@ -589,32 +607,41 @@ class ImpaladProcess(BaseImpalaProcess):
       self.service.wait_for_metric_value('impala-server.ready',
                                          expected_value=1, timeout=timeout)
 
-  def wait_for_catalog(self):
-    """Waits for a catalog copy to be received by the impalad. When its received,
-       additionally waits for client ports to be opened."""
+  def wait_for_webserver(self, sleep_interval, early_abort_fn):
     start_time = time.time()
-    beeswax_port_is_open = False
-    hs2_port_is_open = False
-    num_dbs = 0
-    num_tbls = 0
-    while ((time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
-        and not (beeswax_port_is_open and hs2_port_is_open)):
-      try:
-        num_dbs, num_tbls = self.service.get_metric_values(
-            ["catalog.num-databases", "catalog.num-tables"])
-        beeswax_port_is_open = self.service.beeswax_port_is_open()
-        hs2_port_is_open = self.service.hs2_port_is_open()
-      except Exception:
-        LOG.exception(("Client services not ready. Waiting for catalog cache: "
-            "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
-                num_dbs=num_dbs,
-                num_tbls=num_tbls))
-      sleep(0.5)
-
-    if not hs2_port_is_open or not beeswax_port_is_open:
-      raise RuntimeError(
-          "Unable to open client ports within {num_seconds} seconds.".format(
-              num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS))
+    while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS:
+      LOG.info("Waiting for Impalad webserver port %s", self.service.webserver_port)
+      if self.service.webserver_port_is_open(): return
+      early_abort_fn()
+      sleep(sleep_interval)
+
+  def wait_for_coordinator_services(self, sleep_interval, early_abort_fn):
+    """Waits for client ports to be opened. Assumes that the webservice ports are open."""
+    start_time = time.time()
+    LOG.info(
+        "Waiting for coordinator client services " +
+        "- hs2 port: %d hs2-http port: %d beeswax port: %d",
+        self.service.hs2_port, self.service.hs2_http_port, self.service.beeswax_port)
+    while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS:
+      beeswax_port_is_open = self.service.beeswax_port_is_open()
+      hs2_port_is_open = self.service.hs2_port_is_open()
+      hs2_http_port_is_open = self.service.hs2_http_port_is_open()
+      if beeswax_port_is_open and hs2_port_is_open and hs2_http_port_is_open:
+        return
+      early_abort_fn()
+      # The coordinator is likely to wait for the catalog update. Fetch the number
+      # of catalog objects.
+      num_dbs, num_tbls = self.service.get_metric_values(
+          ["catalog.num-databases", "catalog.num-tables"])
+      LOG.info(("Client services not ready. Waiting for catalog cache: "
+          "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
+              num_dbs=num_dbs,
+              num_tbls=num_tbls))
+      sleep(sleep_interval)
+
+    raise RuntimeError(
+        "Unable to open client ports within {num_seconds} seconds.".format(
+            num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS))
 
   def set_jvm_log_level(self, class_name, level):
     """Helper method to set JVM log level for certain class name."""
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index d46f67256..fdee26536 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -26,6 +26,7 @@ import logging
 import os
 import re
 import requests
+import socket
 import subprocess
 from datetime import datetime
 from time import sleep, time
@@ -370,7 +371,7 @@ class ImpaladService(BaseImpalaService):
       else:
         LOG.info("Waiting for num_known_live_backends=%s. Current value: %s" %
             (expected_value, value))
-      sleep(1)
+      sleep(interval)
     assert 0, 'num_known_live_backends did not reach expected value in time'
 
   def read_query_profile_page(self, query_id, timeout=10, interval=1):
@@ -425,6 +426,17 @@ class ImpaladService(BaseImpalaService):
       sleep(interval)
     return False
 
+  def is_port_open(self, port):
+    try:
+      sock = socket.create_connection((self.hostname, port), timeout=1)
+      sock.close()
+      return True
+    except Exception:
+      return False
+
+  def webserver_port_is_open(self):
+    return self.is_port_open(self.webserver_port)
+
   def create_beeswax_client(self, use_kerberos=False):
     """Creates a new beeswax client connection to the impalad"""
     client = create_connection('%s:%d' % (self.hostname, self.beeswax_port),
@@ -434,12 +446,16 @@ class ImpaladService(BaseImpalaService):
 
   def beeswax_port_is_open(self):
     """Test if the beeswax port is open. Does not need to authenticate."""
+    # Check if the port is open first to avoid chatty logging of Thrift connection.
+    if not self.is_port_open(self.beeswax_port): return False
+
     try:
       # The beeswax client will connect successfully even if not authenticated.
       client = self.create_beeswax_client()
       client.close()
       return True
-    except Exception:
+    except Exception as e:
+      LOG.info(e)
       return False
 
   def create_ldap_beeswax_client(self, user, password, use_ssl=False):
@@ -456,11 +472,14 @@ class ImpaladService(BaseImpalaService):
 
   def hs2_port_is_open(self):
     """Test if the HS2 port is open. Does not need to authenticate."""
+    # Check if the port is open first to avoid chatty logging of Thrift connection.
+    if not self.is_port_open(self.hs2_port): return False
+
     # Impyla will try to authenticate as part of connecting, so preserve previous logic
     # that uses the HS2 thrift code directly.
     try:
-      socket = TSocket(self.hostname, self.hs2_port)
-      transport = TBufferedTransport(socket)
+      sock = TSocket(self.hostname, self.hs2_port)
+      transport = TBufferedTransport(sock)
       transport.open()
       transport.close()
       return True
@@ -468,6 +487,9 @@ class ImpaladService(BaseImpalaService):
       LOG.info(e)
       return False
 
+  def hs2_http_port_is_open(self):
+    # Only check if the port is open, do not create Thrift transport.
+    return self.is_port_open(self.hs2_http_port)
 
 # Allows for interacting with the StateStore service to perform operations such as
 # accessing the debug webpage.

Reply via email to