This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 0adb07753 IMPALA-14784: Upgrade to python-xdist==3.5.0 and use
--dist=worksteal
0adb07753 is described below
commit 0adb0775368dca39aa253c0e11583df315354e15
Author: Joe McDonnell <[email protected]>
AuthorDate: Tue Feb 24 19:01:51 2026 -0800
IMPALA-14784: Upgrade to python-xdist==3.5.0 and use --dist=worksteal
On exhaustive jobs, the end-to-end parallel tests show enormous
skew. The last 1-2% of tests take hours, and logs indicate that
the last 1257 tests execute on a single worker.
pytest-xdist introduced a 'worksteal' algorithm in 3.2.0 that
can rebalance the work. Exhaustive end-to-end parallel tests
take about 5:20, while the same tests run in about 2:40 with
the worksteal policy. The improvement on core exhaustive
tests is much smaller, because they don't suffer the same
level of skew.
pytest-xdist changed the way it assigns tests to workers,
and it exposed an issue with TestAcid::test_lock_timings().
The test sets the query option lock_max_wait_time_s on the
session, but it never unsets it. When multiple copies of
the test run on a single worker, the test case for a timeout
of 300 seconds with lock_max_wait_time_s unset is actually
using a value of lock_max_wait_time_s=5. This reworks the
test to set lock_max_wait_time_s via execute_query()'s
query_options argument rather than on the session itself.
Testing:
- Ran end-to-end exhaustive tests
- Ran a core job
- Verified that TestAcid::test_lock_timings() can run multiple
times with a single worker without failing
Change-Id: I6916bbef94b380a516356763dfabb3777c682637
Reviewed-on: http://gerrit.cloudera.org:8080/24035
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
---
infra/python/deps/requirements.txt | 2 +-
tests/query_test/test_acid.py | 28 ++++++++++++++++------------
tests/run-tests.py | 2 +-
3 files changed, 18 insertions(+), 14 deletions(-)
diff --git a/infra/python/deps/requirements.txt
b/infra/python/deps/requirements.txt
index 6df6437da..e75d7d2aa 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -53,7 +53,7 @@ pytest == 6.2.5
pytest-forked == 1.6.0
pytest-reportlog == 0.4.0
pytest-timeout == 2.2.0
- pytest-xdist == 2.4.0
+ pytest-xdist == 3.5.0
execnet == 2.1.2
hatchling == 1.28.0
pathspec == 1.0.4
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index a0b5783ff..7ac06bbf6 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -18,6 +18,7 @@
# Functional tests for ACID integration with Hive.
from __future__ import absolute_import, division, print_function
+from copy import deepcopy
import os
from subprocess import check_call
import time
@@ -240,9 +241,12 @@ class TestAcid(ImpalaTestSuite):
@SkipIfFS.hive
def test_lock_timings(self, vector, unique_database):
- def elapsed_time_for_query(query):
+ def elapsed_time_for_query(query, lock_max_wait_time=None):
t_start = time.time()
- self.execute_query_expect_failure(self.client, query)
+ query_options = deepcopy(vector.get_exec_option_dict())
+ if lock_max_wait_time is not None:
+ query_options['lock_max_wait_time_s'] = lock_max_wait_time
+ self.execute_query_expect_failure(self.client, query,
query_options=query_options)
return time.time() - t_start
tbl_name = "test_lock"
@@ -256,24 +260,24 @@ class TestAcid(ImpalaTestSuite):
if self.exploration_strategy() == 'exhaustive':
elapsed = elapsed_time_for_query("insert into {} values
(1)".format(tbl))
assert elapsed > 300 and elapsed < 310
- self.execute_query("set lock_max_wait_time_s=20")
- elapsed = elapsed_time_for_query("insert into {} values (1)".format(tbl))
+ elapsed = elapsed_time_for_query(
+ "insert into {} values (1)".format(tbl), lock_max_wait_time=20)
assert elapsed > 20 and elapsed < 28
- self.execute_query("set lock_max_wait_time_s=0")
- elapsed = elapsed_time_for_query("insert into {} values (1)".format(tbl))
+ elapsed = elapsed_time_for_query(
+ "insert into {} values (1)".format(tbl), lock_max_wait_time=0)
assert elapsed < 8
- self.execute_query("set lock_max_wait_time_s=10")
- elapsed = elapsed_time_for_query("insert into {} values (1)".format(tbl))
+ elapsed = elapsed_time_for_query(
+ "insert into {} values (1)".format(tbl), lock_max_wait_time=10)
assert elapsed > 10 and elapsed < 18
- self.execute_query("set lock_max_wait_time_s=2")
- elapsed = elapsed_time_for_query("truncate table {}".format(tbl))
+ elapsed = elapsed_time_for_query(
+ "truncate table {}".format(tbl), lock_max_wait_time=2)
assert elapsed > 2 and elapsed < 10
- self.execute_query("set lock_max_wait_time_s=5")
- elapsed = elapsed_time_for_query("drop table {}".format(tbl))
+ elapsed = elapsed_time_for_query(
+ "drop table {}".format(tbl), lock_max_wait_time=5)
assert elapsed > 5 and elapsed < 13
finally:
acid_util.unlock(lock_resp.lockid)
diff --git a/tests/run-tests.py b/tests/run-tests.py
index a5608617b..92bd2cf9e 100755
--- a/tests/run-tests.py
+++ b/tests/run-tests.py
@@ -344,7 +344,7 @@ if __name__ == "__main__":
# Run the remaining query tests in parallel
if not skip_parallel:
base_args = conf_args + ['-m', 'not execute_serially and not stress',
- '-n', NUM_CONCURRENT_TESTS]
+ '-n', NUM_CONCURRENT_TESTS, '--dist=worksteal']
run(base_args + build_test_args("parallel{0}".format(shard_identifier)))
# The total number of tests executed at this point is expected to be >0