This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 79e2ab998 IMPALA-14776 (part 2): Fix miscellaneous socket leaks
79e2ab998 is described below

commit 79e2ab998c9c12259f5b671050e7dd610c0f9e16
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed Feb 25 21:29:06 2026 -0800

    IMPALA-14776 (part 2): Fix miscellaneous socket leaks
    
    There are various socket leaks that are not associated
    with Impala clients:
     - Callers of urlopen don't close the response, so its socket leaks
     - Code that uses sockets directly doesn't close them
     - Various tests that spin up test servers (e.g. statestore tests)
       don't close client or server sockets.
    This fixes those locations either with manual cleanup or with
    `with ... as` constructs. For urlopen, this simplifies the import to
    always use the Python 3 package location, dropping support for
    Python 2.
    
    Testing:
     - Ran tests locally and verified that they stopped leaking
       sockets
    
    Change-Id: I8fd54fad18d6dbfb0f24ebd910f45cfed76c340a
    Reviewed-on: http://gerrit.cloudera.org:8080/24061
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Yida Wu <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/benchmark/report_benchmark_results.py    | 12 ++++--------
 tests/common/iceberg_rest_server.py            | 18 ++++++++----------
 tests/common/network.py                        | 12 ++++++------
 tests/custom_cluster/test_custom_statestore.py |  4 ++++
 tests/custom_cluster/test_hs2.py               |  1 +
 tests/custom_cluster/test_krpc_options.py      | 10 ++++------
 tests/custom_cluster/test_metastore_service.py | 10 ++++++++++
 tests/custom_cluster/test_statestored_ha.py    |  2 ++
 tests/hs2/test_hs2.py                          | 14 ++++++--------
 tests/hs2/test_json_endpoints.py               | 12 ++++--------
 tests/statestore/test_statestore.py            | 19 +++++++++++--------
 11 files changed, 60 insertions(+), 54 deletions(-)

diff --git a/tests/benchmark/report_benchmark_results.py 
b/tests/benchmark/report_benchmark_results.py
index 9df147b64..3daa76f6e 100755
--- a/tests/benchmark/report_benchmark_results.py
+++ b/tests/benchmark/report_benchmark_results.py
@@ -40,6 +40,7 @@ import re
 from collections import defaultdict
 from datetime import date
 from optparse import OptionParser
+from urllib.request import Request, urlopen
 from tests.util.calculation_util import (
     calculate_tval, calculate_avg, calculate_stddev, calculate_geomean, 
calculate_mwu)
 
@@ -239,17 +240,12 @@ def all_query_results(grouped):
 
 
 def get_commit_date(commit_sha):
-  try:
-    from urllib.request import Request, urlopen
-  except ImportError:
-    from urllib2 import Request, urlopen
-
   url = 'https://api.github.com/repos/apache/impala/commits/' + commit_sha
   try:
     request = Request(url)
-    response = urlopen(request).read()
-    data = json.loads(response.decode('utf8'))
-    return data['commit']['committer']['date'][:10]
+    with urlopen(request) as response:
+      data = json.loads(response.read().decode('utf8'))
+      return data['commit']['committer']['date'][:10]
   except Exception:
     return ''
 
diff --git a/tests/common/iceberg_rest_server.py 
b/tests/common/iceberg_rest_server.py
index 84bbad290..fa8e8412d 100644
--- a/tests/common/iceberg_rest_server.py
+++ b/tests/common/iceberg_rest_server.py
@@ -75,11 +75,10 @@ class IcebergRestServer(object):
     sleep_interval_s = 0.5
     start_time = time.time()
     while time.time() - start_time < timeout_s:
-      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-      if s.connect_ex(('localhost', self.port)) == 0:
-        LOG.info("Iceberg REST server is available.")
-        return
-      s.close()
+      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        if s.connect_ex(('localhost', self.port)) == 0:
+          LOG.info("Iceberg REST server is available.")
+          return
       time.sleep(sleep_interval_s)
     raise Exception("Webserver did not become available within {} "
         "seconds.".format(timeout_s))
@@ -88,11 +87,10 @@ class IcebergRestServer(object):
     sleep_interval_s = 0.5
     start_time = time.time()
     while time.time() - start_time < timeout_s:
-      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-      if s.connect_ex(('localhost', self.port)) != 0:
-        LOG.info("Iceberg REST server has stopped.")
-        return
-      s.close()
+      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        if s.connect_ex(('localhost', self.port)) != 0:
+          LOG.info("Iceberg REST server has stopped.")
+          return
       time.sleep(sleep_interval_s)
     # Let's not throw an exception as this is typically invoked during 
cleanup, and we
     # want the rest of the cleanup code to be executed.
diff --git a/tests/common/network.py b/tests/common/network.py
index 924307cf0..c58baa9b4 100644
--- a/tests/common/network.py
+++ b/tests/common/network.py
@@ -26,12 +26,12 @@ from tests.common.environ import IS_REDHAT_DERIVATIVE
 # Retrieves the host external IP rather than localhost/127.0.0.1 so we have an 
IP that
 # Impala will consider distinct from storage backends to force remote 
scheduling.
 def get_external_ip():
-  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-  s.settimeout(0)
-  # This address is used to get the networking stack to identify a return IP address.
-  # Timeout=0 means it doesn't need to resolve.
-  s.connect(('10.254.254.254', 1))
-  return s.getsockname()[0]
+  with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+    s.settimeout(0)
+    # This address is used to get the networking stack to identify a return IP address.
+    # Timeout=0 means it doesn't need to resolve.
+    s.connect(('10.254.254.254', 1))
+    return s.getsockname()[0]
 
 
 def split_host_port(host_port):
diff --git a/tests/custom_cluster/test_custom_statestore.py 
b/tests/custom_cluster/test_custom_statestore.py
index 4a701d8cd..1276324ac 100644
--- a/tests/custom_cluster/test_custom_statestore.py
+++ b/tests/custom_cluster/test_custom_statestore.py
@@ -71,6 +71,8 @@ class TestCustomStatestore(CustomClusterTestSuite):
       # supported. Exception "Invalid method name: 'GetProtocolVersion'" is 
thrown
       # by Thrift client.
       assert False, str(e)
+    finally:
+      client_transport.close()
 
   def __register_subscriber(self, 
ss_protocol=Subscriber.StatestoreServiceVersion.V2,
                             in_v2_format=True, expect_exception=False):
@@ -98,6 +100,8 @@ class TestCustomStatestore(CustomClusterTestSuite):
       return response
     except Exception as e:
       assert expect_exception, str(e)
+    finally:
+      client_transport.close()
 
   
@CustomClusterTestSuite.with_args(statestored_args="-statestore_max_subscribers=3")
   def test_statestore_max_subscribers(self):
diff --git a/tests/custom_cluster/test_hs2.py b/tests/custom_cluster/test_hs2.py
index e7d24a6d3..c5de5f7b4 100644
--- a/tests/custom_cluster/test_hs2.py
+++ b/tests/custom_cluster/test_hs2.py
@@ -66,6 +66,7 @@ class TestHS2(CustomClusterTestSuite):
     # Run another query, which should fail since the session is closed.
     conn.execute_statement("select 2", expected_error_prefix="Invalid session 
id",
         expected_status_code=TCLIService.TStatusCode.ERROR_STATUS)
+    conn.teardown()
 
     # Check that the query was cancelled correctly.
     query_id = 
operation_id_to_query_id(execute_resp.operationHandle.operationId)
diff --git a/tests/custom_cluster/test_krpc_options.py 
b/tests/custom_cluster/test_krpc_options.py
index edbdec53b..6cd9597cf 100644
--- a/tests/custom_cluster/test_krpc_options.py
+++ b/tests/custom_cluster/test_krpc_options.py
@@ -39,9 +39,7 @@ class TestKrpcOptions(CustomClusterTestSuite):
     self.client.execute("select min(int_col) from functional_parquet.alltypes")
 
     # Check that we can connect on multiple interfaces.
-    sock = socket.socket()
-    sock.connect(("127.0.0.1", DEFAULT_KRPC_PORT))
-    sock.close()
-    sock = socket.socket()
-    sock.connect((socket.gethostname(), DEFAULT_KRPC_PORT))
-    sock.close()
+    with socket.socket() as sock:
+      sock.connect(("127.0.0.1", DEFAULT_KRPC_PORT))
+    with socket.socket() as sock:
+      sock.connect((socket.gethostname(), DEFAULT_KRPC_PORT))
diff --git a/tests/custom_cluster/test_metastore_service.py 
b/tests/custom_cluster/test_metastore_service.py
index 1128bc7b3..2ad54ff30 100644
--- a/tests/custom_cluster/test_metastore_service.py
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -131,6 +131,7 @@ class TestMetastoreService(CustomClusterTestSuite):
             if catalog_hms_client is not None:
                 catalog_hms_client.drop_database(db_name, True, True)
                 catalog_hms_client.shutdown()
+                hive_transport.close()
 
     @pytest.mark.execute_serially
     @CustomClusterTestSuite.with_args(
@@ -229,6 +230,7 @@ class TestMetastoreService(CustomClusterTestSuite):
       finally:
         if catalog_hms_client is not None:
             catalog_hms_client.shutdown()
+            hive_transport.close()
         if self.__get_database_no_throw(db_name) is not None:
           self.hive_client.drop_database(db_name, True, True)
 
@@ -378,6 +380,7 @@ class TestMetastoreService(CustomClusterTestSuite):
       finally:
         if catalog_hms_client is not None:
           catalog_hms_client.shutdown()
+          hive_transport.close()
         if self.__get_database_no_throw(db_name) is not None:
           self.hive_client.drop_database(db_name, True, True)
         if self.__get_database_no_throw(new_db_name) is not None:
@@ -403,6 +406,7 @@ class TestMetastoreService(CustomClusterTestSuite):
       finally:
         if catalog_hms_client is not None:
           catalog_hms_client.shutdown()
+          hive_transport.close()
 
     @pytest.mark.execute_serially
     @CustomClusterTestSuite.with_args(
@@ -430,6 +434,7 @@ class TestMetastoreService(CustomClusterTestSuite):
       finally:
         if catalog_client is not None:
           catalog_client.shutdown()
+          hive_transport.close()
 
     @pytest.mark.execute_serially
     @CustomClusterTestSuite.with_args(
@@ -688,6 +693,7 @@ class TestMetastoreService(CustomClusterTestSuite):
         finally:
             if catalog_hms_client is not None:
                 catalog_hms_client.shutdown()
+                hive_transport.close()
             if self.__get_database_no_throw(db_name) is not None:
                 self.hive_client.drop_database(db_name, True, True)
 
@@ -786,6 +792,7 @@ class TestMetastoreService(CustomClusterTestSuite):
         finally:
             if catalog_hms_client is not None:
                 catalog_hms_client.shutdown()
+                hive_transport.close()
             if self.__get_database_no_throw(db_name) is not None:
                 self.hive_client.drop_database(db_name, True, True)
 
@@ -913,6 +920,7 @@ class TestMetastoreService(CustomClusterTestSuite):
         finally:
             if catalog_hms_client is not None:
                 catalog_hms_client.shutdown()
+                hive_transport.close()
             if self.__get_database_no_throw(db_name) is not None:
                 self.hive_client.drop_database(db_name, True, True)
 
@@ -1002,6 +1010,7 @@ class TestMetastoreService(CustomClusterTestSuite):
         finally:
             if catalog_hms_client is not None:
                 catalog_hms_client.shutdown()
+                hive_transport.close()
             if self.__get_database_no_throw(db_name) is not None:
                 self.hive_client.drop_database(db_name, True, True)
 
@@ -1061,6 +1070,7 @@ class TestMetastoreService(CustomClusterTestSuite):
         finally:
             if catalog_hms_client is not None:
                 catalog_hms_client.shutdown()
+                hive_transport.close()
 
     def __create_test_tbls_from_hive(self, db_name):
       """Util method to create test tables from hive in the given database. It 
creates
diff --git a/tests/custom_cluster/test_statestored_ha.py 
b/tests/custom_cluster/test_statestored_ha.py
index 9f5f747c1..f10f55673 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -58,6 +58,8 @@ class TestStatestoredHA(CustomClusterTestSuite):
       return client.SetStatestoreDebugAction(request)
     except Exception as e:
       assert False, str(e)
+    finally:
+      client_transport.close()
 
   # Return port of the active catalogd of statestore
   def __get_active_catalogd_port(self, statestore_service):
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 2ca89e39f..b42da858d 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -25,6 +25,7 @@ import logging
 import random
 import threading
 import time
+from urllib.request import urlopen
 import uuid
 
 from builtins import range
@@ -44,11 +45,6 @@ from tests.hs2.hs2_test_suite import (
     operation_id_to_query_id,
 )
 
-try:
-  from urllib.request import urlopen
-except ImportError:
-  from urllib2 import urlopen
-
 LOG = logging.getLogger('test_hs2')
 
 SQLSTATE_GENERAL_ERROR = "HY000"
@@ -185,9 +181,9 @@ class TestHS2(HS2TestSuite):
     with ScopedSession(self.hs2_client) as session:
       TestHS2.check_response(session)
       http_addr = session.configuration['http_addr']
-      resp = urlopen("http://%s/queries?json" % http_addr)
-      assert resp.msg == 'OK'
-      queries_json = json.loads(resp.read())
+      with urlopen("http://%s/queries?json" % http_addr) as resp:
+        assert resp.msg == 'OK'
+        queries_json = json.loads(resp.read())
       assert 'completed_queries' in queries_json
       assert 'in_flight_queries' in queries_json
 
@@ -1356,6 +1352,8 @@ class TestHS2(HS2TestSuite):
     rows = cursor.fetchall()
     assert rows == [(1,)]
     profile = cursor.get_profile()
+    cursor.close()
+    impyla_conn.close()
     assert profile is not None
     assert "Http Origin: value1" in profile
     assert "Http Origin: value2" not in profile
diff --git a/tests/hs2/test_json_endpoints.py b/tests/hs2/test_json_endpoints.py
index 30c0c1fff..f611f2d0b 100644
--- a/tests/hs2/test_json_endpoints.py
+++ b/tests/hs2/test_json_endpoints.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import, division, print_function
 import json
 from time import time
+from urllib.request import urlopen
 
 import pytest
 
@@ -28,11 +29,6 @@ from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER
 from tests.common.impala_cluster import ImpalaCluster
 from tests.hs2.hs2_test_suite import HS2TestSuite
 
-try:
-  from urllib.request import urlopen
-except ImportError:
-  from urllib2 import urlopen
-
 
 class TestJsonEndpoints(HS2TestSuite):
   def _get_json_queries(self, http_addr):
@@ -42,9 +38,9 @@ class TestJsonEndpoints(HS2TestSuite):
       cluster = ImpalaCluster.get_e2e_test_cluster()
       return cluster.impalads[0].service.get_debug_webpage_json("/queries")
     else:
-      resp = urlopen("http://%s/queries?json" % http_addr)
-      assert resp.msg == 'OK'
-      return json.loads(resp.read())
+      with urlopen("http://%s/queries?json" % http_addr) as resp:
+        assert resp.msg == 'OK'
+        return json.loads(resp.read())
 
   @pytest.mark.execute_serially
   def test_waiting_in_flight_queries(self):
diff --git a/tests/statestore/test_statestore.py 
b/tests/statestore/test_statestore.py
index 3661d09d1..8ea3b93c5 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -24,6 +24,7 @@ import sys
 import threading
 import time
 import traceback
+from urllib.request import urlopen
 import uuid
 
 import pytest
@@ -46,10 +47,6 @@ from tests.common.base_test_suite import BaseTestSuite
 from tests.common.environ import build_flavor_timeout
 from tests.common.skip import SkipIfDockerizedCluster
 
-try:
-  from urllib.request import urlopen
-except ImportError:
-  from urllib2 import urlopen
 
 LOG = logging.getLogger('test_statestore')
 
@@ -71,9 +68,9 @@ LOG = logging.getLogger('test_statestore')
 
 
 def get_statestore_subscribers(host='localhost', port=25010):
-  response = urlopen("http://{0}:{1}/subscribers?json".format(host, port))
-  page = response.read()
-  return json.loads(page)
+  with urlopen("http://{0}:{1}/subscribers?json".format(host, port)) as response:
+    page = response.read()
+    return json.loads(page)
 
 
 STATUS_OK = TStatus(TErrorCode.OK)
@@ -135,6 +132,8 @@ class KillableThreadedServer(TServer):
         return
       except Exception:
         if i == num_tries - 1: raise
+      finally:
+        cnxn.close()
       time.sleep(0.5)
 
   def wait_until_down(self, num_tries=10):
@@ -145,6 +144,8 @@ class KillableThreadedServer(TServer):
       except Exception:
         LOG.info('Server localhost:{} is down'.format(cnxn.port))
         return
+      finally:
+        cnxn.close()
       time.sleep(0.5)
     raise Exception("Server localhost:{} did not stop".format(cnxn.port))
 
@@ -154,7 +155,9 @@ class KillableThreadedServer(TServer):
       client = self.serverTransport.accept()
       # Since accept() can take a while, check again if the server is shutdown 
to avoid
       # starting an unnecessary thread.
-      if self.is_shutdown: return
+      if self.is_shutdown:
+        client.close()
+        return
       t = None
       if sys.version_info.major < 3:
         t = threading.Thread(target=self.handle, args=(client,))

Reply via email to