This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1d6cdc0b4b6 Isolate tests in multi_process_shared with unique temp path. (#38498)
1d6cdc0b4b6 is described below
commit 1d6cdc0b4b6674ff00f072b2b00c5de97f0c9697
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 14 12:36:13 2026 -0400
Isolate tests in multi_process_shared with unique temp path. (#38498)
---
.../apache_beam/utils/multi_process_shared_test.py | 37 ++++++++++++----------
1 file changed, 20 insertions(+), 17 deletions(-)
diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py
index 18ed49c6fa1..c597e459e1a 100644
--- a/sdks/python/apache_beam/utils/multi_process_shared_test.py
+++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py
@@ -25,9 +25,29 @@ import unittest
from typing import Any
from unittest.mock import patch
+import pytest
+
from apache_beam.utils import multi_process_shared
+@pytest.fixture(autouse=True)
+def isolate_multi_process_shared_tests(tmp_path, monkeypatch):
+ """Isolates MultiProcessShared tests by using a unique temp directory per test.
+
+ This prevents tests running in parallel (e.g. with pytest-xdist) from
+ interfering with each other by writing to the same shared default temp directory.
+ """
+ orig_init = multi_process_shared.MultiProcessShared.__init__
+
+ def mock_init(self, constructor, tag, *args, **kwargs):
+ if 'path' not in kwargs:
+ kwargs['path'] = str(tmp_path)
+ return orig_init(self, constructor, tag, *args, **kwargs)
+
+ monkeypatch.setattr(
+ multi_process_shared.MultiProcessShared, '__init__', mock_init)
+
+
class CallableCounter(object):
def __init__(self, start=0):
self.running = start
@@ -285,23 +305,6 @@ class MultiProcessSharedTest(unittest.TestCase):
class MultiProcessSharedSpawnProcessTest(unittest.TestCase):
- def setUp(self):
- tempdir = tempfile.gettempdir()
- for tag in ['basic',
- 'main',
- 'to_delete',
- 'to_keep',
- 'mix1',
- 'mix2',
- 'test_process_exit',
- 'thundering_herd_test',
- 'transient_test']:
- for ext in ['', '.address', '.address.error']:
- try:
- os.remove(os.path.join(tempdir, tag + ext))
- except OSError:
- pass
-
def tearDown(self):
for p in multiprocessing.active_children():
if p.is_alive():