This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a9cd0179508 Fix SystemError in _DeferredCall.get() under GC pressure (#38355)
a9cd0179508 is described below

commit a9cd0179508aac066f133e0bc21986f961f2f342
Author: Arran Cudbard-Bell <[email protected]>
AuthorDate: Fri May 8 14:13:18 2026 -0400

    Fix SystemError in _DeferredCall.get() under GC pressure (#38355)
    
    CPython builds the argument tuple incrementally via _PyTuple_Resize when
    unpacking a generator: f(*(gen)).  _PyTuple_Resize guards that
    Py_REFCNT(v) == 1 before resizing a non-empty tuple.  Under sustained
    allocation pressure a GC cycle can run between generator yields and
    temporarily increment the refcount on the partial tuple, causing
    _PyTuple_Resize to call PyErr_BadInternalCall().
    
    Change the generator expression to a list comprehension.  CPython builds
    the list first and passes it to CALL_FUNCTION_EX, which takes the
    PySequence_Fast list path and never calls _PyTuple_Resize.
    
    Co-authored-by: Arran Cudbard-Bell <[email protected]>
---
 .../apache_beam/runners/worker/sdk_worker.py       |  9 +++++-
 .../apache_beam/runners/worker/sdk_worker_test.py  | 32 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index e1f17296057..a79cb0e8de6 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -1448,7 +1448,14 @@ class _DeferredCall(_Future[T]):
 
   def get(self, timeout=None):
     # type: (Optional[float]) -> T
-    return self._func(*(arg.get(timeout) for arg in self._args))
+    # List comprehension, not generator: *(gen) causes CPython to build the
+    # argument tuple incrementally via _PyTuple_Resize, which asserts
+    # Py_REFCNT(v)==1. A GC cycle between yields can increment that refcount,
+    # raising SystemError (Objects/tupleobject.c:927). See
+    # https://github.com/python/cpython/issues/127058 (fixed in 3.14.0a3+:
+    # https://github.com/python/cpython/commit/5a23994). *[list] allocates the
+    # tuple once at its final size, avoiding the resize entirely.
+    return self._func(*[arg.get(timeout) for arg in self._args])
 
   def set(self, value):
     # type: (T) -> _Future[T]
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 7b53f274cac..76e428f0646 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -704,6 +704,38 @@ class ShortIdCacheTest(unittest.TestCase):
           % case.info)
 
 
+class DeferredCallTest(unittest.TestCase):
+  """Tests for _DeferredCall.get()."""
+  def test_get_single_arg(self):
+    f = sdk_worker._Future().set(42)
+    call = sdk_worker._DeferredCall(lambda x: x, f)
+    self.assertEqual(call.get(), 42)
+
+  def test_get_multiple_args(self):
+    futures = [sdk_worker._Future().set(i) for i in range(5)]
+    call = sdk_worker._DeferredCall(lambda *args: sum(args), *futures)
+    self.assertEqual(call.get(), sum(range(5)))
+
+  def test_get_non_future_args_are_wrapped(self):
+    # __init__ wraps non-Future values in _Future().set(v); get() must work.
+    call = sdk_worker._DeferredCall(lambda x, y: x * y, 3, 7)
+    self.assertEqual(call.get(), 21)
+
+  def test_get_mixed_future_and_value_args(self):
+    a = sdk_worker._Future().set(10)
+    call = sdk_worker._DeferredCall(lambda x, y: x + y, a, 5)
+    self.assertEqual(call.get(), 15)
+
+  def test_get_zero_args(self):
+    call = sdk_worker._DeferredCall(lambda: 99)
+    self.assertEqual(call.get(), 99)
+
+  def test_get_preserves_return_value_type(self):
+    f = sdk_worker._Future().set({'key': 'val'})
+    call = sdk_worker._DeferredCall(lambda d: d, f)
+    self.assertEqual(call.get(), {'key': 'val'})
+
+
 def monitoringInfoMetadata(info):
   return {
       descriptor.name: value

Reply via email to