arr2036 opened a new pull request, #38355: URL: https://github.com/apache/beam/pull/38355
## Summary

`_DeferredCall.get()` in `sdk_worker.py` used a generator expression in the argument unpack position:

```python
return self._func(*(arg.get(timeout) for arg in self._args))
```

Under sustained load this raises `SystemError: Objects/tupleobject.c:927: bad argument to internal function`.

## Root cause

CPython builds the argument tuple incrementally when unpacking a generator: `PySequence_Tuple` allocates a tuple from a length hint, fills it as the generator yields, and calls `_PyTuple_Resize` to grow it when it fills up and to trim it to its final length once the generator is exhausted. `_PyTuple_Resize` guards:

```c
// cpython/Objects/tupleobject.c (Python 3.11)
// https://github.com/python/cpython/blob/3.11/Objects/tupleobject.c#L232
if (v == NULL || !PyTuple_Check(v) ||
    (Py_SIZE(v) != 0 && Py_REFCNT(v) != 1)) {
    *pv = 0;
    Py_XDECREF(v);
    PyErr_BadInternalCall();
    return -1;
}
```

Between generator yields, a GC cycle can run (triggered by allocations inside `arg.get()`). The GC can increment the refcount on the partially-built tuple. When `_PyTuple_Resize` next fires, `Py_REFCNT(v) != 1` is true and it raises `SystemError`.

The call site is `bundle_processor.py:637` (`state.commit()` -> `to_await.get()` -> `_DeferredCall.get()`), which is on the hot path for every bundle that commits state. High-throughput streaming pipelines hit this continuously.
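As a rough way to inspect how the interpreter handles a generator in the unpack position, the call can be disassembled. This is only an illustrative sketch: `call_with_generator` is a hypothetical helper, not Beam code, and opcode names differ between CPython versions, so the output is printed rather than assumed.

```python
import dis


def call_with_generator(func, args):
    # Same shape as the original _DeferredCall.get(): a generator
    # expression in the star-unpack position of a call.
    return func(*(a for a in args))


# Collect the opcode names the compiler emitted for the helper. The
# star-unpack call appears as a dedicated call opcode whose exact name
# depends on the CPython version in use.
opnames = [ins.opname for ins in dis.get_instructions(call_with_generator)]
print(opnames)
```

The disassembly only shows that the generator object itself is handed to the call opcode; the conversion to a tuple happens inside the interpreter, which is where the resize described above takes place.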
## Observed stack trace (Python 3.11, Beam 2.73.0, production worker)

```
  File ".../sdk_worker.py", line 316, in _execute
    response = task()
  File ".../sdk_worker.py", line 390, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File ".../sdk_worker.py", line 707, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File ".../bundle_processor.py", line 1310, in process_bundle
    op.finish()
  File ".../bundle_processor.py", line 1028, in commit
    state.commit()
  File ".../bundle_processor.py", line 637, in commit
    to_await.get()
  File ".../sdk_worker.py", line 1457, in get
    return self._func(*(arg.get(timeout) for arg in self._args))
SystemError: Objects/tupleobject.c:927: bad argument to internal function
```

## Fix

Change the generator expression to a list comprehension:

```python
return self._func(*[arg.get(timeout) for arg in self._args])
```

CPython materialises the full list before the call. `CALL_FUNCTION_EX` receives a list, which `PySequence_Tuple` converts through `PyList_AsTuple`. The argument tuple is allocated once at its final size, and `_PyTuple_Resize` is never called.

`_DeferredCall.__init__` already wraps non-`_Future` args eagerly, so materialising the list before the call preserves existing semantics.

## Why `_DeferredCall.wait()` is unaffected

`wait()` also uses a generator expression (`all(arg.wait(timeout) for arg in self._args)`) but passes it to `all()`, a C built-in that consumes the generator without building a resizable tuple. The crash path is specific to `*(gen)` unpacking in a function call.

## Tests

Added `DeferredCallTest` in `sdk_worker_test.py` covering single-arg, multi-arg, non-Future arg wrapping, mixed Future+value, zero-arg, and return value type preservation.
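For readers without the Beam source at hand, the cases listed above can be sketched roughly as follows. This is not the test code from the PR: `FakeFuture`, `DeferredCallSketch`, and `DeferredCallSketchTest` are hypothetical stand-ins that only mirror the shape of `_Future` and `_DeferredCall` as described in this message.

```python
import unittest


class FakeFuture:
    """Hypothetical stand-in for Beam's _Future; only get() is modelled."""

    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        return self._value


class DeferredCallSketch:
    """Hypothetical stand-in with the same shape as _DeferredCall."""

    def __init__(self, func, *args):
        self._func = func
        # Wrap plain values eagerly so every argument exposes get().
        self._args = [
            a if isinstance(a, FakeFuture) else FakeFuture(a) for a in args
        ]

    def get(self, timeout=None):
        # List comprehension: every argument is resolved before the call.
        return self._func(*[arg.get(timeout) for arg in self._args])


class DeferredCallSketchTest(unittest.TestCase):
    def test_single_arg(self):
        call = DeferredCallSketch(lambda x: x * 2, FakeFuture(21))
        self.assertEqual(call.get(), 42)

    def test_mixed_future_and_value(self):
        call = DeferredCallSketch(lambda a, b: a + b, FakeFuture(1), 2)
        self.assertEqual(call.get(), 3)

    def test_zero_arg(self):
        self.assertEqual(DeferredCallSketch(lambda: "done").get(), "done")

    def test_return_type_preserved(self):
        call = DeferredCallSketch(lambda k, v: {k: v}, "key", FakeFuture(1))
        self.assertIsInstance(call.get(), dict)
```

Saved as a module, the sketch can be run with `python -m unittest`. It checks only that the list-comprehension form resolves and passes arguments as expected, not the crash itself.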
The crash itself requires specific CPython internal timing (a GC cycle between `PySequence_Tuple`'s calls to the generator's `__next__`, against the C-level partial tuple) and cannot be triggered deterministically from pure Python without a C extension or a CPython debug build. The argument for the fix rests on the CPython source and the observed production stack trace above.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
