This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 24611f7dba77723a805518a09ca6e334c4794796
Author: Joe McDonnell <[email protected]>
AuthorDate: Tue Mar 3 10:48:35 2026 -0800

    IMPALA-14776 (part 3): Fix misc resource leaks

    This fixes a variety of misc resource leaks and other warnings.
    This is purely about reducing the noise, so these are not fixing
    issues that would impact build stability. There are a few different
    categories:
    1. Leaked open files
       - There are a couple of locations that open /dev/null manually to
         use with subprocess. This switches them to subprocess.DEVNULL
       - Otherwise, use a context manager
    2. Leaked thread pools
       - Manually call terminate at the end
    3. Leaked processes
       - Add a wait() call after killing them
    4. Warnings about deprecated pyparsing APIs
       - switched to the new API in db_connection.py
    5. Warnings about invalid escapes
       - used raw strings for regex
    6. Warnings from pytest failing to collect tests from
       TestLastDdlTimeUpdate's TestHelper
       - Renamed TestHelper to Helper so pytest doesn't try to find
         tests in it
    7. 
A few minor socket leaks Testing: - Ran a core job Change-Id: Iee2c7733f477eadac7cde020b9aa47077985eb0d Reviewed-on: http://gerrit.cloudera.org:8080/24066 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Csaba Ringhofer <[email protected]> Reviewed-by: Michael Smith <[email protected]> --- tests/common/impala_test_suite.py | 2 +- tests/comparison/db_connection.py | 6 +++--- tests/custom_cluster/test_concurrent_ddls.py | 1 + tests/custom_cluster/test_local_catalog.py | 2 ++ tests/custom_cluster/test_shell_interactive.py | 1 + tests/custom_cluster/test_topic_update_frequency.py | 5 +++++ tests/metadata/test_compute_stats.py | 3 ++- tests/metadata/test_ddl.py | 1 + tests/metadata/test_last_ddl_time_update.py | 4 ++-- tests/performance/query_executor.py | 2 +- tests/shell/test_shell_commandline.py | 4 +++- tests/shell/test_shell_interactive.py | 3 +++ tests/stress/stress_util.py | 4 ++-- tests/util/auto_scaler.py | 4 ++-- tests/util/compute_table_stats.py | 1 + 15 files changed, 30 insertions(+), 13 deletions(-) diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 5232e8e29..c72be679e 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -1493,7 +1493,7 @@ class ImpalaTestSuite(BaseTestSuite): # is specified; explicitly make sure there's nothing to # read to avoid hanging, especially when running interactively # with py.test. 
- stdin=open("/dev/null"), + stdin=subprocess.DEVNULL, universal_newlines=True, env=env) (stdout, stderr) = call.communicate() diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py index 0c2b60b8d..25d6e4f28 100644 --- a/tests/comparison/db_connection.py +++ b/tests/comparison/db_connection.py @@ -36,7 +36,7 @@ from logging import getLogger from os import symlink, unlink from pyparsing import ( alphanums, - delimitedList, + DelimitedList, Forward, Group, Literal, @@ -346,11 +346,11 @@ class DbCursor(object): struct_field_name = Word(alphanums + '_') struct_field_pair = Group(struct_field_name + COLON + any_type) - t_struct << Group(Literal('struct') + LPAR + delimitedList(struct_field_pair) + RPAR) + t_struct << Group(Literal('struct') + LPAR + DelimitedList(struct_field_pair) + RPAR) t_array << Group(Literal('array') + LPAR + any_type + RPAR) t_map << Group(Literal('map') + LPAR + any_type + COMMA + any_type + RPAR) - return any_type.parseString(data_type)[0] + return any_type.parse_string(data_type)[0] def create_column(self, col_name, col_type): ''' Takes the output from parse_col_desc and creates the right column type. 
This diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py index 8051d6e6a..4ab61a329 100644 --- a/tests/custom_cluster/test_concurrent_ddls.py +++ b/tests/custom_cluster/test_concurrent_ddls.py @@ -195,6 +195,7 @@ class TestConcurrentDdls(CustomClusterTestSuite): dump_server_stacktraces() assert False, "Timeout in thread run_ddls(%d)" % i stop = True + pool.terminate() @classmethod def is_transient_error(cls, err): diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py index 63bac1eed..7f9dea5e9 100644 --- a/tests/custom_cluster/test_local_catalog.py +++ b/tests/custom_cluster/test_local_catalog.py @@ -496,6 +496,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): NUM_ITERS = 100 for i in t.imap_unordered(do_table, range(NUM_ITERS)): pass + t.terminate() @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true " @@ -531,6 +532,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): pass # Refresh to invalidate the partition in local catalog cache self.execute_query("refresh {0}.tbl partition(p=0)".format(unique_database)) + t.terminate() class TestLocalCatalogObservability(CustomClusterTestSuite): diff --git a/tests/custom_cluster/test_shell_interactive.py b/tests/custom_cluster/test_shell_interactive.py index 7d9d45896..826462cb3 100644 --- a/tests/custom_cluster/test_shell_interactive.py +++ b/tests/custom_cluster/test_shell_interactive.py @@ -152,4 +152,5 @@ class TestShellInteractive(CustomClusterTestSuite): wait_for_query_state(vector, query, "RUNNING") self.cluster.impalads[ randint(1, ImpalaTestSuite.get_impalad_cluster_size() - 1)].kill() + pool.terminate() return proc diff --git a/tests/custom_cluster/test_topic_update_frequency.py b/tests/custom_cluster/test_topic_update_frequency.py index 18428cb60..4f0ef21c4 100644 --- a/tests/custom_cluster/test_topic_update_frequency.py +++ b/tests/custom_cluster/test_topic_update_frequency.py 
@@ -133,6 +133,9 @@ class TestTopicUpdateFrequency(CustomClusterTestSuite): assert slow_query_future.get() > blocking_query_min_time, \ "{0} query took less time than {1} msec".format(slow_blocking_query, blocking_query_min_time) + # Shutdown the thread pools + pool.terminate() + slow_query_pool.terminate() # we wait for some time here to make sure that the topic updates from the last # query have been propagated so that next run of this method starts from a clean # state. @@ -177,6 +180,7 @@ class TestTopicUpdateFrequency(CustomClusterTestSuite): for i in range(len(durations)): assert durations[i] < timeout, "Query {0} iteration {1} did " \ "not complete within {2}.".format(q, i, timeout) + slow_query_pool.terminate() def loop_exec(self, query, query_options, iterations=3, impalad=0): durations = [] @@ -254,3 +258,4 @@ class TestTopicUpdateFrequency(CustomClusterTestSuite): self.execute_query_expect_success(self.client, sync_ddl_query_2, {"sync_ddl": "false"}) slow_query_future.get() + slow_query_pool.terminate() diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py index 41763e5fa..ff056f4f5 100644 --- a/tests/metadata/test_compute_stats.py +++ b/tests/metadata/test_compute_stats.py @@ -439,7 +439,7 @@ class TestInvalidStatsFromHms(ImpalaTestSuite): tbl = unique_database + ".tbl" self.execute_query("create table {} as select 1 as id, 'aaa' as name".format(tbl)) # Add invalid stats in HMS - hms_client, _ = ImpalaTestSuite.create_hive_client(9083) + hms_client, hms_transport = ImpalaTestSuite.create_hive_client(9083) cs = ColumnStatistics() cs.engine = "impala" isTblLevel = True @@ -460,3 +460,4 @@ class TestInvalidStatsFromHms(ImpalaTestSuite): assert res.data == [ 'id\tTINYINT\t-1\t-1\t1\t1\t-1\t-1', 'name\tSTRING\t-1\t-1\t-1\t-1\t-1\t-1'] + hms_transport.close() diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index 0c56dd3f4..dc0748ecd 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py 
@@ -582,6 +582,7 @@ class TestDdlStatements(TestDdlBase): except TimeoutError: dump_server_stacktraces() assert False, "Timeout in thread run_ddls(%d)" % i + pool.terminate() @SkipIfFS.hbase @UniqueDatabase.parametrize(sync_ddl=True) diff --git a/tests/metadata/test_last_ddl_time_update.py b/tests/metadata/test_last_ddl_time_update.py index a5e48c7fd..32bd7e11c 100644 --- a/tests/metadata/test_last_ddl_time_update.py +++ b/tests/metadata/test_last_ddl_time_update.py @@ -48,7 +48,7 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite): # Convenience class to make calls to TestLastDdlTimeUpdate.run_test() shorter by # storing common arguments as members and substituting table name and HDFS warehouse # path to the query string. - class TestHelper: + class Helper: class TimeState: CHANGED = "Changed" UNCHANGED = "Unchanged" @@ -187,7 +187,7 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite): % fq_tbl_name) def _create_and_init_test_helper(self, unique_database, tbl_name, table_format): - helper = TestLastDdlTimeUpdate.TestHelper(self, unique_database, tbl_name) + helper = TestLastDdlTimeUpdate.Helper(self, unique_database, tbl_name) self._create_table(helper.fq_tbl_name, table_format) # compute statistics to fill table property impala.lastComputeStatsTime diff --git a/tests/performance/query_executor.py b/tests/performance/query_executor.py index bb47a40cb..5e5992fd9 100644 --- a/tests/performance/query_executor.py +++ b/tests/performance/query_executor.py @@ -41,7 +41,7 @@ LOG = logging.getLogger('query_executor') LOG.setLevel(level=logging.INFO) # Globals. -hive_result_regex = 'Time taken: (\d*).(\d*) seconds' +hive_result_regex = r'Time taken: (\d*).(\d*) seconds' # Match any CRUD statement that can follow EXPLAIN. 
# The statement may begin with SQL line comments starting with -- COMMENT_LINES_REGEX = r'(?:\s*--.*\n)*' diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index 1443a9aa6..ff9fc3362 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -1107,6 +1107,7 @@ class TestImpalaShell(ImpalaTestSuite): finally: if impala_shell.poll() is None: impala_shell.kill() + impala_shell.wait() if connection is not None: connection.close() @@ -1642,7 +1643,8 @@ class TestImpalaShell(ImpalaTestSuite): result = run_impala_shell_cmd(vector, args) stdout_data = result.stdout.strip() - rpc_file_data = open(tmp_file, "r").read().strip() + with open(tmp_file, "r") as f: + rpc_file_data = f.read().strip() # compare the rpc details from stdout and file to ensure they match # stdout contains additional output such as query results, remove all non-rpc details diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py index b1c7bea31..d262491ce 100755 --- a/tests/shell/test_shell_interactive.py +++ b/tests/shell/test_shell_interactive.py @@ -148,6 +148,9 @@ def shutdown_server(server): server.httpd.shutdown() if server.http_server_thread is not None: server.http_server_thread.join() + # Shutdown stopped the thread's main loop, so now free the socket (which is not + # done by shutdown()). 
+ server.httpd.server_close() @pytest.fixture diff --git a/tests/stress/stress_util.py b/tests/stress/stress_util.py index 6bd8faa1c..4dcda498d 100644 --- a/tests/stress/stress_util.py +++ b/tests/stress/stress_util.py @@ -42,6 +42,6 @@ class Task: def run_tasks(tasks, timeout_seconds=600): """Runs a list of Tasks in parallel in a thread pool.""" start = time.time() - pool = ThreadPool(processes=len(tasks)) - pool.map_async(Task.run, tasks).get(timeout_seconds) + with ThreadPool(processes=len(tasks)) as pool: + pool.map_async(Task.run, tasks).get(timeout_seconds) return time.time() - start diff --git a/tests/util/auto_scaler.py b/tests/util/auto_scaler.py index d7ee34f6e..bec6b1955 100755 --- a/tests/util/auto_scaler.py +++ b/tests/util/auto_scaler.py @@ -23,7 +23,7 @@ import time import logging import os import shlex -from subprocess import check_call +from subprocess import check_call, DEVNULL from tests.common.impala_cluster import ImpalaCluster from threading import Event, Thread @@ -262,7 +262,7 @@ class AutoScaler(object): log_debug = logging.getLogger().getEffectiveLevel() == logging.DEBUG log_file = None if not log_debug: - log_file = open("/dev/null", "w") + log_file = DEVNULL check_call(cmd + options, close_fds=True, stdout=log_file, stderr=log_file) diff --git a/tests/util/compute_table_stats.py b/tests/util/compute_table_stats.py index 206a70479..cee2c70ee 100755 --- a/tests/util/compute_table_stats.py +++ b/tests/util/compute_table_stats.py @@ -115,6 +115,7 @@ def compute_stats(client_factory, db_names=None, table_names=None, log_completion(completed, total_tables, e) raise e log_completion(completed, total_tables) + pool.terminate() if __name__ == "__main__":
