This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 24611f7dba77723a805518a09ca6e334c4794796
Author: Joe McDonnell <[email protected]>
AuthorDate: Tue Mar 3 10:48:35 2026 -0800

    IMPALA-14776 (part 3): Fix misc resource leaks
    
    This fixes a variety of miscellaneous resource leaks and other warnings.
    This is purely about reducing noise; none of these changes fix
    issues that would impact build stability.
    
    There are a few different categories:
    1. Leaked open files
     - There are a couple of locations that open /dev/null manually
       to use with subprocess. This switches them to subprocess.DEVNULL.
     - Otherwise, use a context manager.
    2. Leaked thread pools - Manually call terminate at the end
    3. Leaked processes - Add a wait() call after killing them
    4. Warnings about deprecated pyparsing APIs - Switch to the
       new API in db_connection.py
    5. Warnings about invalid escapes - Use raw strings for regexes
    6. Warnings from pytest failing to collect tests from
       TestLastDdlTimeUpdate's TestHelper
     - Rename TestHelper to Helper so pytest doesn't try to find
       tests in it
    7. A few minor socket leaks
    
    Testing:
     - Ran a core job
    
    Change-Id: Iee2c7733f477eadac7cde020b9aa47077985eb0d
    Reviewed-on: http://gerrit.cloudera.org:8080/24066
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 tests/common/impala_test_suite.py                   | 2 +-
 tests/comparison/db_connection.py                   | 6 +++---
 tests/custom_cluster/test_concurrent_ddls.py        | 1 +
 tests/custom_cluster/test_local_catalog.py          | 2 ++
 tests/custom_cluster/test_shell_interactive.py      | 1 +
 tests/custom_cluster/test_topic_update_frequency.py | 5 +++++
 tests/metadata/test_compute_stats.py                | 3 ++-
 tests/metadata/test_ddl.py                          | 1 +
 tests/metadata/test_last_ddl_time_update.py         | 4 ++--
 tests/performance/query_executor.py                 | 2 +-
 tests/shell/test_shell_commandline.py               | 4 +++-
 tests/shell/test_shell_interactive.py               | 3 +++
 tests/stress/stress_util.py                         | 4 ++--
 tests/util/auto_scaler.py                           | 4 ++--
 tests/util/compute_table_stats.py                   | 1 +
 15 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 5232e8e29..c72be679e 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1493,7 +1493,7 @@ class ImpalaTestSuite(BaseTestSuite):
         # is specified; explicitly make sure there's nothing to
         # read to avoid hanging, especially when running interactively
         # with py.test.
-        stdin=open("/dev/null"),
+        stdin=subprocess.DEVNULL,
         universal_newlines=True,
         env=env)
     (stdout, stderr) = call.communicate()
diff --git a/tests/comparison/db_connection.py 
b/tests/comparison/db_connection.py
index 0c2b60b8d..25d6e4f28 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -36,7 +36,7 @@ from logging import getLogger
 from os import symlink, unlink
 from pyparsing import (
     alphanums,
-    delimitedList,
+    DelimitedList,
     Forward,
     Group,
     Literal,
@@ -346,11 +346,11 @@ class DbCursor(object):
     struct_field_name = Word(alphanums + '_')
     struct_field_pair = Group(struct_field_name + COLON + any_type)
 
-    t_struct << Group(Literal('struct') + LPAR + 
delimitedList(struct_field_pair) + RPAR)
+    t_struct << Group(Literal('struct') + LPAR + 
DelimitedList(struct_field_pair) + RPAR)
     t_array << Group(Literal('array') + LPAR + any_type + RPAR)
     t_map << Group(Literal('map') + LPAR + any_type + COMMA + any_type + RPAR)
 
-    return any_type.parseString(data_type)[0]
+    return any_type.parse_string(data_type)[0]
 
   def create_column(self, col_name, col_type):
     ''' Takes the output from parse_col_desc and creates the right column 
type. This
diff --git a/tests/custom_cluster/test_concurrent_ddls.py 
b/tests/custom_cluster/test_concurrent_ddls.py
index 8051d6e6a..4ab61a329 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -195,6 +195,7 @@ class TestConcurrentDdls(CustomClusterTestSuite):
         dump_server_stacktraces()
         assert False, "Timeout in thread run_ddls(%d)" % i
     stop = True
+    pool.terminate()
 
   @classmethod
   def is_transient_error(cls, err):
diff --git a/tests/custom_cluster/test_local_catalog.py 
b/tests/custom_cluster/test_local_catalog.py
index 63bac1eed..7f9dea5e9 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -496,6 +496,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
     NUM_ITERS = 100
     for i in t.imap_unordered(do_table, range(NUM_ITERS)):
       pass
+    t.terminate()
 
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true "
@@ -531,6 +532,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
         pass
       # Refresh to invalidate the partition in local catalog cache
       self.execute_query("refresh {0}.tbl 
partition(p=0)".format(unique_database))
+    t.terminate()
 
 
 class TestLocalCatalogObservability(CustomClusterTestSuite):
diff --git a/tests/custom_cluster/test_shell_interactive.py 
b/tests/custom_cluster/test_shell_interactive.py
index 7d9d45896..826462cb3 100644
--- a/tests/custom_cluster/test_shell_interactive.py
+++ b/tests/custom_cluster/test_shell_interactive.py
@@ -152,4 +152,5 @@ class TestShellInteractive(CustomClusterTestSuite):
     wait_for_query_state(vector, query, "RUNNING")
     self.cluster.impalads[
         randint(1, ImpalaTestSuite.get_impalad_cluster_size() - 1)].kill()
+    pool.terminate()
     return proc
diff --git a/tests/custom_cluster/test_topic_update_frequency.py 
b/tests/custom_cluster/test_topic_update_frequency.py
index 18428cb60..4f0ef21c4 100644
--- a/tests/custom_cluster/test_topic_update_frequency.py
+++ b/tests/custom_cluster/test_topic_update_frequency.py
@@ -133,6 +133,9 @@ class TestTopicUpdateFrequency(CustomClusterTestSuite):
     assert slow_query_future.get() > blocking_query_min_time, \
       "{0} query took less time than {1} msec".format(slow_blocking_query,
         blocking_query_min_time)
+    # Shutdown the thread pools
+    pool.terminate()
+    slow_query_pool.terminate()
     # we wait for some time here to make sure that the topic updates from the 
last
     # query have been propagated so that next run of this method starts from a 
clean
     # state.
@@ -177,6 +180,7 @@ class TestTopicUpdateFrequency(CustomClusterTestSuite):
       for i in range(len(durations)):
         assert durations[i] < timeout, "Query {0} iteration {1} did " \
                                        "not complete within {2}.".format(q, i, 
timeout)
+    slow_query_pool.terminate()
 
   def loop_exec(self, query, query_options, iterations=3, impalad=0):
     durations = []
@@ -254,3 +258,4 @@ class TestTopicUpdateFrequency(CustomClusterTestSuite):
     self.execute_query_expect_success(self.client, sync_ddl_query_2,
       {"sync_ddl": "false"})
     slow_query_future.get()
+    slow_query_pool.terminate()
diff --git a/tests/metadata/test_compute_stats.py 
b/tests/metadata/test_compute_stats.py
index 41763e5fa..ff056f4f5 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -439,7 +439,7 @@ class TestInvalidStatsFromHms(ImpalaTestSuite):
     tbl = unique_database + ".tbl"
     self.execute_query("create table {} as select 1 as id, 'aaa' as 
name".format(tbl))
     # Add invalid stats in HMS
-    hms_client, _ = ImpalaTestSuite.create_hive_client(9083)
+    hms_client, hms_transport = ImpalaTestSuite.create_hive_client(9083)
     cs = ColumnStatistics()
     cs.engine = "impala"
     isTblLevel = True
@@ -460,3 +460,4 @@ class TestInvalidStatsFromHms(ImpalaTestSuite):
     assert res.data == [
       'id\tTINYINT\t-1\t-1\t1\t1\t-1\t-1',
       'name\tSTRING\t-1\t-1\t-1\t-1\t-1\t-1']
+    hms_transport.close()
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 0c56dd3f4..dc0748ecd 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -582,6 +582,7 @@ class TestDdlStatements(TestDdlBase):
       except TimeoutError:
         dump_server_stacktraces()
         assert False, "Timeout in thread run_ddls(%d)" % i
+    pool.terminate()
 
   @SkipIfFS.hbase
   @UniqueDatabase.parametrize(sync_ddl=True)
diff --git a/tests/metadata/test_last_ddl_time_update.py 
b/tests/metadata/test_last_ddl_time_update.py
index a5e48c7fd..32bd7e11c 100644
--- a/tests/metadata/test_last_ddl_time_update.py
+++ b/tests/metadata/test_last_ddl_time_update.py
@@ -48,7 +48,7 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
   # Convenience class to make calls to TestLastDdlTimeUpdate.run_test() 
shorter by
   # storing common arguments as members and substituting table name and HDFS 
warehouse
   # path to the query string.
-  class TestHelper:
+  class Helper:
     class TimeState:
       CHANGED = "Changed"
       UNCHANGED = "Unchanged"
@@ -187,7 +187,7 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
           % fq_tbl_name)
 
   def _create_and_init_test_helper(self, unique_database, tbl_name, 
table_format):
-    helper = TestLastDdlTimeUpdate.TestHelper(self, unique_database, tbl_name)
+    helper = TestLastDdlTimeUpdate.Helper(self, unique_database, tbl_name)
     self._create_table(helper.fq_tbl_name, table_format)
 
     # compute statistics to fill table property impala.lastComputeStatsTime
diff --git a/tests/performance/query_executor.py 
b/tests/performance/query_executor.py
index bb47a40cb..5e5992fd9 100644
--- a/tests/performance/query_executor.py
+++ b/tests/performance/query_executor.py
@@ -41,7 +41,7 @@ LOG = logging.getLogger('query_executor')
 LOG.setLevel(level=logging.INFO)
 
 # Globals.
-hive_result_regex = 'Time taken: (\d*).(\d*) seconds'
+hive_result_regex = r'Time taken: (\d*).(\d*) seconds'
 # Match any CRUD statement that can follow EXPLAIN.
 # The statement may begin with SQL line comments starting with --
 COMMENT_LINES_REGEX = r'(?:\s*--.*\n)*'
diff --git a/tests/shell/test_shell_commandline.py 
b/tests/shell/test_shell_commandline.py
index 1443a9aa6..ff9fc3362 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -1107,6 +1107,7 @@ class TestImpalaShell(ImpalaTestSuite):
       finally:
         if impala_shell.poll() is None:
           impala_shell.kill()
+          impala_shell.wait()
         if connection is not None:
           connection.close()
 
@@ -1642,7 +1643,8 @@ class TestImpalaShell(ImpalaTestSuite):
     result = run_impala_shell_cmd(vector, args)
 
     stdout_data = result.stdout.strip()
-    rpc_file_data = open(tmp_file, "r").read().strip()
+    with open(tmp_file, "r") as f:
+      rpc_file_data = f.read().strip()
 
     # compare the rpc details from stdout and file to ensure they match
     # stdout contains additional output such as query results, remove all 
non-rpc details
diff --git a/tests/shell/test_shell_interactive.py 
b/tests/shell/test_shell_interactive.py
index b1c7bea31..d262491ce 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -148,6 +148,9 @@ def shutdown_server(server):
     server.httpd.shutdown()
   if server.http_server_thread is not None:
     server.http_server_thread.join()
+  # Shutdown stopped the thread's main loop, so now free the socket (which is 
not
+  # done by shutdown()).
+  server.httpd.server_close()
 
 
 @pytest.fixture
diff --git a/tests/stress/stress_util.py b/tests/stress/stress_util.py
index 6bd8faa1c..4dcda498d 100644
--- a/tests/stress/stress_util.py
+++ b/tests/stress/stress_util.py
@@ -42,6 +42,6 @@ class Task:
 def run_tasks(tasks, timeout_seconds=600):
   """Runs a list of Tasks in parallel in a thread pool."""
   start = time.time()
-  pool = ThreadPool(processes=len(tasks))
-  pool.map_async(Task.run, tasks).get(timeout_seconds)
+  with ThreadPool(processes=len(tasks)) as pool:
+    pool.map_async(Task.run, tasks).get(timeout_seconds)
   return time.time() - start
diff --git a/tests/util/auto_scaler.py b/tests/util/auto_scaler.py
index d7ee34f6e..bec6b1955 100755
--- a/tests/util/auto_scaler.py
+++ b/tests/util/auto_scaler.py
@@ -23,7 +23,7 @@ import time
 import logging
 import os
 import shlex
-from subprocess import check_call
+from subprocess import check_call, DEVNULL
 from tests.common.impala_cluster import ImpalaCluster
 from threading import Event, Thread
 
@@ -262,7 +262,7 @@ class AutoScaler(object):
     log_debug = logging.getLogger().getEffectiveLevel() == logging.DEBUG
     log_file = None
     if not log_debug:
-      log_file = open("/dev/null", "w")
+      log_file = DEVNULL
 
     check_call(cmd + options, close_fds=True, stdout=log_file, stderr=log_file)
 
diff --git a/tests/util/compute_table_stats.py 
b/tests/util/compute_table_stats.py
index 206a70479..cee2c70ee 100755
--- a/tests/util/compute_table_stats.py
+++ b/tests/util/compute_table_stats.py
@@ -115,6 +115,7 @@ def compute_stats(client_factory, db_names=None, 
table_names=None,
           log_completion(completed, total_tables, e)
           raise e
     log_completion(completed, total_tables)
+    pool.terminate()
 
 
 if __name__ == "__main__":

Reply via email to