This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d6cdc0b4b6 Isolate tests in multi_process_shared with unique temp path. (#38498)
1d6cdc0b4b6 is described below

commit 1d6cdc0b4b6674ff00f072b2b00c5de97f0c9697
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 14 12:36:13 2026 -0400

    Isolate tests in multi_process_shared with unique temp path. (#38498)
---
 .../apache_beam/utils/multi_process_shared_test.py | 37 ++++++++++++----------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py
index 18ed49c6fa1..c597e459e1a 100644
--- a/sdks/python/apache_beam/utils/multi_process_shared_test.py
+++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py
@@ -25,9 +25,29 @@ import unittest
 from typing import Any
 from unittest.mock import patch
 
+import pytest
+
 from apache_beam.utils import multi_process_shared
 
 
+@pytest.fixture(autouse=True)
+def isolate_multi_process_shared_tests(tmp_path, monkeypatch):
+  """Isolates MultiProcessShared tests by using a unique temp directory per test.
+
+  This prevents tests running in parallel (e.g. with pytest-xdist) from
+  interfering with each other by writing to the same shared default temp directory.
+  """
+  orig_init = multi_process_shared.MultiProcessShared.__init__
+
+  def mock_init(self, constructor, tag, *args, **kwargs):
+    if 'path' not in kwargs:
+      kwargs['path'] = str(tmp_path)
+    return orig_init(self, constructor, tag, *args, **kwargs)
+
+  monkeypatch.setattr(
+      multi_process_shared.MultiProcessShared, '__init__', mock_init)
+
+
 class CallableCounter(object):
   def __init__(self, start=0):
     self.running = start
@@ -285,23 +305,6 @@ class MultiProcessSharedTest(unittest.TestCase):
 
 
 class MultiProcessSharedSpawnProcessTest(unittest.TestCase):
-  def setUp(self):
-    tempdir = tempfile.gettempdir()
-    for tag in ['basic',
-                'main',
-                'to_delete',
-                'to_keep',
-                'mix1',
-                'mix2',
-                'test_process_exit',
-                'thundering_herd_test',
-                'transient_test']:
-      for ext in ['', '.address', '.address.error']:
-        try:
-          os.remove(os.path.join(tempdir, tag + ext))
-        except OSError:
-          pass
-
   def tearDown(self):
     for p in multiprocessing.active_children():
       if p.is_alive():

Reply via email to