This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a9cd0179508 Fix SystemError in _DeferredCall.get() under GC pressure (#38355)
a9cd0179508 is described below
commit a9cd0179508aac066f133e0bc21986f961f2f342
Author: Arran Cudbard-Bell <[email protected]>
AuthorDate: Fri May 8 14:13:18 2026 -0400
Fix SystemError in _DeferredCall.get() under GC pressure (#38355)
CPython builds the argument tuple incrementally via _PyTuple_Resize when
unpacking a generator: f(*(gen)). _PyTuple_Resize requires that
Py_REFCNT(v) == 1 before resizing a non-empty tuple. Under sustained
allocation pressure a GC cycle can run between generator yields and
temporarily increment the refcount on the partial tuple, causing
_PyTuple_Resize to call PyErr_BadInternalCall(), which raises the
SystemError seen in _DeferredCall.get().
Change the generator expression to a list comprehension. CPython builds
the list first and passes it to CALL_FUNCTION_EX, which takes the
PySequence_Fast list path and never calls _PyTuple_Resize.
Co-authored-by: Arran Cudbard-Bell <[email protected]>
---
.../apache_beam/runners/worker/sdk_worker.py | 9 +++++-
.../apache_beam/runners/worker/sdk_worker_test.py | 32 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index e1f17296057..a79cb0e8de6 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -1448,7 +1448,14 @@ class _DeferredCall(_Future[T]):
def get(self, timeout=None):
# type: (Optional[float]) -> T
- return self._func(*(arg.get(timeout) for arg in self._args))
+ # List comprehension, not generator: *(gen) causes CPython to build the
+ # argument tuple incrementally via _PyTuple_Resize, which asserts
+ # Py_REFCNT(v)==1. A GC cycle between yields can increment that refcount,
+ # raising SystemError (Objects/tupleobject.c:927). See
+ # https://github.com/python/cpython/issues/127058 (fixed in 3.14.0a3+:
+ # https://github.com/python/cpython/commit/5a23994). *[list] allocates the
+ # tuple once at its final size, avoiding the resize entirely.
+ return self._func(*[arg.get(timeout) for arg in self._args])
def set(self, value):
# type: (T) -> _Future[T]
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 7b53f274cac..76e428f0646 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -704,6 +704,38 @@ class ShortIdCacheTest(unittest.TestCase):
% case.info)
+class DeferredCallTest(unittest.TestCase):
+ """Tests for _DeferredCall.get()."""
+ def test_get_single_arg(self):
+ f = sdk_worker._Future().set(42)
+ call = sdk_worker._DeferredCall(lambda x: x, f)
+ self.assertEqual(call.get(), 42)
+
+ def test_get_multiple_args(self):
+ futures = [sdk_worker._Future().set(i) for i in range(5)]
+ call = sdk_worker._DeferredCall(lambda *args: sum(args), *futures)
+ self.assertEqual(call.get(), sum(range(5)))
+
+ def test_get_non_future_args_are_wrapped(self):
+ # __init__ wraps non-Future values in _Future().set(v); get() must work.
+ call = sdk_worker._DeferredCall(lambda x, y: x * y, 3, 7)
+ self.assertEqual(call.get(), 21)
+
+ def test_get_mixed_future_and_value_args(self):
+ a = sdk_worker._Future().set(10)
+ call = sdk_worker._DeferredCall(lambda x, y: x + y, a, 5)
+ self.assertEqual(call.get(), 15)
+
+ def test_get_zero_args(self):
+ call = sdk_worker._DeferredCall(lambda: 99)
+ self.assertEqual(call.get(), 99)
+
+ def test_get_preserves_return_value_type(self):
+ f = sdk_worker._Future().set({'key': 'val'})
+ call = sdk_worker._DeferredCall(lambda d: d, f)
+ self.assertEqual(call.get(), {'key': 'val'})
+
+
def monitoringInfoMetadata(info):
return {
descriptor.name: value