This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 79e2ab998 IMPALA-14776 (part 2): Fix miscellaneous socket leaks
79e2ab998 is described below
commit 79e2ab998c9c12259f5b671050e7dd610c0f9e16
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed Feb 25 21:29:06 2026 -0800
IMPALA-14776 (part 2): Fix miscellaneous socket leaks
There are various socket leaks that are not associated
with Impala clients:
- Users of urlopen don't auto-close the socket
- Direct uses of sockets don't clean up the socket
- Various tests that spin up test servers (e.g. statestore tests)
don't close clients or server sockets.
This fixes those various locations either with manual clean up
or with-as constructs. For urlopen, this simplifies the import to
always use the Python 3 package location, dropping support for
Python 2.
Testing:
- Ran tests locally and verified that they stopped leaking
sockets
Change-Id: I8fd54fad18d6dbfb0f24ebd910f45cfed76c340a
Reviewed-on: http://gerrit.cloudera.org:8080/24061
Reviewed-by: Michael Smith <[email protected]>
Reviewed-by: Yida Wu <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
tests/benchmark/report_benchmark_results.py | 12 ++++--------
tests/common/iceberg_rest_server.py | 18 ++++++++----------
tests/common/network.py | 12 ++++++------
tests/custom_cluster/test_custom_statestore.py | 4 ++++
tests/custom_cluster/test_hs2.py | 1 +
tests/custom_cluster/test_krpc_options.py | 10 ++++------
tests/custom_cluster/test_metastore_service.py | 10 ++++++++++
tests/custom_cluster/test_statestored_ha.py | 2 ++
tests/hs2/test_hs2.py | 14 ++++++--------
tests/hs2/test_json_endpoints.py | 12 ++++--------
tests/statestore/test_statestore.py | 19 +++++++++++--------
11 files changed, 60 insertions(+), 54 deletions(-)
diff --git a/tests/benchmark/report_benchmark_results.py b/tests/benchmark/report_benchmark_results.py
index 9df147b64..3daa76f6e 100755
--- a/tests/benchmark/report_benchmark_results.py
+++ b/tests/benchmark/report_benchmark_results.py
@@ -40,6 +40,7 @@ import re
from collections import defaultdict
from datetime import date
from optparse import OptionParser
+from urllib.request import Request, urlopen
from tests.util.calculation_util import (
calculate_tval, calculate_avg, calculate_stddev, calculate_geomean,
calculate_mwu)
@@ -239,17 +240,12 @@ def all_query_results(grouped):
def get_commit_date(commit_sha):
- try:
- from urllib.request import Request, urlopen
- except ImportError:
- from urllib2 import Request, urlopen
-
url = 'https://api.github.com/repos/apache/impala/commits/' + commit_sha
try:
request = Request(url)
- response = urlopen(request).read()
- data = json.loads(response.decode('utf8'))
- return data['commit']['committer']['date'][:10]
+        with urlopen(request) as response:
+            data = json.loads(response.read().decode('utf8'))
+            return data['commit']['committer']['date'][:10]
except Exception:
return ''
diff --git a/tests/common/iceberg_rest_server.py b/tests/common/iceberg_rest_server.py
index 84bbad290..fa8e8412d 100644
--- a/tests/common/iceberg_rest_server.py
+++ b/tests/common/iceberg_rest_server.py
@@ -75,11 +75,10 @@ class IcebergRestServer(object):
sleep_interval_s = 0.5
start_time = time.time()
while time.time() - start_time < timeout_s:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if s.connect_ex(('localhost', self.port)) == 0:
- LOG.info("Iceberg REST server is available.")
- return
- s.close()
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ if s.connect_ex(('localhost', self.port)) == 0:
+ LOG.info("Iceberg REST server is available.")
+ return
time.sleep(sleep_interval_s)
raise Exception("Webserver did not become available within {} "
"seconds.".format(timeout_s))
@@ -88,11 +87,10 @@ class IcebergRestServer(object):
sleep_interval_s = 0.5
start_time = time.time()
while time.time() - start_time < timeout_s:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if s.connect_ex(('localhost', self.port)) != 0:
- LOG.info("Iceberg REST server has stopped.")
- return
- s.close()
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ if s.connect_ex(('localhost', self.port)) != 0:
+ LOG.info("Iceberg REST server has stopped.")
+ return
time.sleep(sleep_interval_s)
# Let's not throw an exception as this is typically invoked during cleanup, and we
# want the rest of the cleanup code to be executed.
diff --git a/tests/common/network.py b/tests/common/network.py
index 924307cf0..c58baa9b4 100644
--- a/tests/common/network.py
+++ b/tests/common/network.py
@@ -26,12 +26,12 @@ from tests.common.environ import IS_REDHAT_DERIVATIVE
# Retrieves the host external IP rather than localhost/127.0.0.1 so we have an IP that
# Impala will consider distinct from storage backends to force remote scheduling.
def get_external_ip():
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.settimeout(0)
-  # This address is used to get the networking stack to identify a return IP address.
-  # Timeout=0 means it doesn't need to resolve.
- s.connect(('10.254.254.254', 1))
- return s.getsockname()[0]
+ with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+ s.settimeout(0)
+    # This address is used to get the networking stack to identify a return IP address.
+    # Timeout=0 means it doesn't need to resolve.
+ s.connect(('10.254.254.254', 1))
+ return s.getsockname()[0]
def split_host_port(host_port):
diff --git a/tests/custom_cluster/test_custom_statestore.py b/tests/custom_cluster/test_custom_statestore.py
index 4a701d8cd..1276324ac 100644
--- a/tests/custom_cluster/test_custom_statestore.py
+++ b/tests/custom_cluster/test_custom_statestore.py
@@ -71,6 +71,8 @@ class TestCustomStatestore(CustomClusterTestSuite):
# supported. Exception "Invalid method name: 'GetProtocolVersion'" is thrown
# by Thrift client.
assert False, str(e)
+ finally:
+ client_transport.close()
def __register_subscriber(self,
ss_protocol=Subscriber.StatestoreServiceVersion.V2,
in_v2_format=True, expect_exception=False):
@@ -98,6 +100,8 @@ class TestCustomStatestore(CustomClusterTestSuite):
return response
except Exception as e:
assert expect_exception, str(e)
+ finally:
+ client_transport.close()
@CustomClusterTestSuite.with_args(statestored_args="-statestore_max_subscribers=3")
def test_statestore_max_subscribers(self):
diff --git a/tests/custom_cluster/test_hs2.py b/tests/custom_cluster/test_hs2.py
index e7d24a6d3..c5de5f7b4 100644
--- a/tests/custom_cluster/test_hs2.py
+++ b/tests/custom_cluster/test_hs2.py
@@ -66,6 +66,7 @@ class TestHS2(CustomClusterTestSuite):
# Run another query, which should fail since the session is closed.
conn.execute_statement("select 2", expected_error_prefix="Invalid session id",
expected_status_code=TCLIService.TStatusCode.ERROR_STATUS)
+ conn.teardown()
# Check that the query was cancelled correctly.
query_id = operation_id_to_query_id(execute_resp.operationHandle.operationId)
diff --git a/tests/custom_cluster/test_krpc_options.py b/tests/custom_cluster/test_krpc_options.py
index edbdec53b..6cd9597cf 100644
--- a/tests/custom_cluster/test_krpc_options.py
+++ b/tests/custom_cluster/test_krpc_options.py
@@ -39,9 +39,7 @@ class TestKrpcOptions(CustomClusterTestSuite):
self.client.execute("select min(int_col) from functional_parquet.alltypes")
# Check that we can connect on multiple interfaces.
- sock = socket.socket()
- sock.connect(("127.0.0.1", DEFAULT_KRPC_PORT))
- sock.close()
- sock = socket.socket()
- sock.connect((socket.gethostname(), DEFAULT_KRPC_PORT))
- sock.close()
+ with socket.socket() as sock:
+ sock.connect(("127.0.0.1", DEFAULT_KRPC_PORT))
+ with socket.socket() as sock:
+ sock.connect((socket.gethostname(), DEFAULT_KRPC_PORT))
diff --git a/tests/custom_cluster/test_metastore_service.py b/tests/custom_cluster/test_metastore_service.py
index 1128bc7b3..2ad54ff30 100644
--- a/tests/custom_cluster/test_metastore_service.py
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -131,6 +131,7 @@ class TestMetastoreService(CustomClusterTestSuite):
if catalog_hms_client is not None:
catalog_hms_client.drop_database(db_name, True, True)
catalog_hms_client.shutdown()
+ hive_transport.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -229,6 +230,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
@@ -378,6 +380,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
if self.__get_database_no_throw(new_db_name) is not None:
@@ -403,6 +406,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -430,6 +434,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_client is not None:
catalog_client.shutdown()
+ hive_transport.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -688,6 +693,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
@@ -786,6 +792,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
@@ -913,6 +920,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
@@ -1002,6 +1010,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
@@ -1061,6 +1070,7 @@ class TestMetastoreService(CustomClusterTestSuite):
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
+ hive_transport.close()
def __create_test_tbls_from_hive(self, db_name):
"""Util method to create test tables from hive in the given database. It
creates
diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py
index 9f5f747c1..f10f55673 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -58,6 +58,8 @@ class TestStatestoredHA(CustomClusterTestSuite):
return client.SetStatestoreDebugAction(request)
except Exception as e:
assert False, str(e)
+ finally:
+ client_transport.close()
# Return port of the active catalogd of statestore
def __get_active_catalogd_port(self, statestore_service):
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 2ca89e39f..b42da858d 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -25,6 +25,7 @@ import logging
import random
import threading
import time
+from urllib.request import urlopen
import uuid
from builtins import range
@@ -44,11 +45,6 @@ from tests.hs2.hs2_test_suite import (
operation_id_to_query_id,
)
-try:
- from urllib.request import urlopen
-except ImportError:
- from urllib2 import urlopen
-
LOG = logging.getLogger('test_hs2')
SQLSTATE_GENERAL_ERROR = "HY000"
@@ -185,9 +181,9 @@ class TestHS2(HS2TestSuite):
with ScopedSession(self.hs2_client) as session:
TestHS2.check_response(session)
http_addr = session.configuration['http_addr']
- resp = urlopen("http://%s/queries?json" % http_addr)
- assert resp.msg == 'OK'
- queries_json = json.loads(resp.read())
+ with urlopen("http://%s/queries?json" % http_addr) as resp:
+ assert resp.msg == 'OK'
+ queries_json = json.loads(resp.read())
assert 'completed_queries' in queries_json
assert 'in_flight_queries' in queries_json
@@ -1356,6 +1352,8 @@ class TestHS2(HS2TestSuite):
rows = cursor.fetchall()
assert rows == [(1,)]
profile = cursor.get_profile()
+ cursor.close()
+ impyla_conn.close()
assert profile is not None
assert "Http Origin: value1" in profile
assert "Http Origin: value2" not in profile
diff --git a/tests/hs2/test_json_endpoints.py b/tests/hs2/test_json_endpoints.py
index 30c0c1fff..f611f2d0b 100644
--- a/tests/hs2/test_json_endpoints.py
+++ b/tests/hs2/test_json_endpoints.py
@@ -20,6 +20,7 @@
from __future__ import absolute_import, division, print_function
import json
from time import time
+from urllib.request import urlopen
import pytest
@@ -28,11 +29,6 @@ from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER
from tests.common.impala_cluster import ImpalaCluster
from tests.hs2.hs2_test_suite import HS2TestSuite
-try:
- from urllib.request import urlopen
-except ImportError:
- from urllib2 import urlopen
-
class TestJsonEndpoints(HS2TestSuite):
def _get_json_queries(self, http_addr):
@@ -42,9 +38,9 @@ class TestJsonEndpoints(HS2TestSuite):
cluster = ImpalaCluster.get_e2e_test_cluster()
return cluster.impalads[0].service.get_debug_webpage_json("/queries")
else:
- resp = urlopen("http://%s/queries?json" % http_addr)
- assert resp.msg == 'OK'
- return json.loads(resp.read())
+ with urlopen("http://%s/queries?json" % http_addr) as resp:
+ assert resp.msg == 'OK'
+ return json.loads(resp.read())
@pytest.mark.execute_serially
def test_waiting_in_flight_queries(self):
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 3661d09d1..8ea3b93c5 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -24,6 +24,7 @@ import sys
import threading
import time
import traceback
+from urllib.request import urlopen
import uuid
import pytest
@@ -46,10 +47,6 @@ from tests.common.base_test_suite import BaseTestSuite
from tests.common.environ import build_flavor_timeout
from tests.common.skip import SkipIfDockerizedCluster
-try:
- from urllib.request import urlopen
-except ImportError:
- from urllib2 import urlopen
LOG = logging.getLogger('test_statestore')
@@ -71,9 +68,9 @@ LOG = logging.getLogger('test_statestore')
def get_statestore_subscribers(host='localhost', port=25010):
- response = urlopen("http://{0}:{1}/subscribers?json".format(host, port))
- page = response.read()
- return json.loads(page)
+  with urlopen("http://{0}:{1}/subscribers?json".format(host, port)) as response:
+ page = response.read()
+ return json.loads(page)
STATUS_OK = TStatus(TErrorCode.OK)
@@ -135,6 +132,8 @@ class KillableThreadedServer(TServer):
return
except Exception:
if i == num_tries - 1: raise
+ finally:
+ cnxn.close()
time.sleep(0.5)
def wait_until_down(self, num_tries=10):
@@ -145,6 +144,8 @@ class KillableThreadedServer(TServer):
except Exception:
LOG.info('Server localhost:{} is down'.format(cnxn.port))
return
+ finally:
+ cnxn.close()
time.sleep(0.5)
raise Exception("Server localhost:{} did not stop".format(cnxn.port))
@@ -154,7 +155,9 @@ class KillableThreadedServer(TServer):
client = self.serverTransport.accept()
# Since accept() can take a while, check again if the server is shutdown to avoid
# starting an unnecessary thread.
- if self.is_shutdown: return
+ if self.is_shutdown:
+ client.close()
+ return
t = None
if sys.version_info.major < 3:
t = threading.Thread(target=self.handle, args=(client,))