arr2036 opened a new pull request, #38355:
URL: https://github.com/apache/beam/pull/38355

   ## Summary
   
   `_DeferredCall.get()` in `sdk_worker.py` used a generator expression in 
argument-unpacking position:
   
   ```python
   return self._func(*(arg.get(timeout) for arg in self._args))
   ```
   
   Under sustained load this raises `SystemError: Objects/tupleobject.c:927: 
bad argument to internal function`.
   
   ## Root cause
   
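   CPython builds the argument tuple incrementally when unpacking a generator: 
`PySequence_Tuple` allocates a tuple from a length hint, calls 
`_PyTuple_Resize` to grow it whenever it fills up, and calls it once more to 
trim the tuple when the generator is exhausted. `_PyTuple_Resize` guards: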
   
   ```c
   // cpython/Objects/tupleobject.c (Python 3.11)
   // https://github.com/python/cpython/blob/3.11/Objects/tupleobject.c#L232
   if (v == NULL || !PyTuple_Check(v) ||
       (Py_SIZE(v) != 0 && Py_REFCNT(v) != 1)) {
       *pv = 0;
       Py_XDECREF(v);
       PyErr_BadInternalCall();
       return -1;
   }
   ```
   
   Between generator yields, a GC cycle can run (triggered by allocations 
inside `arg.get()`). The GC can increment the refcount on the partially built 
tuple. When `_PyTuple_Resize` next fires, `Py_REFCNT(v) != 1` is true, so 
`PyErr_BadInternalCall()` runs and the call fails with `SystemError`.
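
The guard can be restated at the Python level as a rough illustration. The sketch below is CPython-specific and only shows how a second owner changes the reference count of a tuple; `resize_allowed` is a hypothetical helper that mirrors the condition in the C snippet above, not a CPython API.

```python
import sys

# Build the tuple at runtime so it is an ordinary, non-constant object.
partial = tuple([object(), object()])
baseline = sys.getrefcount(partial)

# Any second owner (here a list) raises the reference count by one.
extra_owner = [partial]
shared = sys.getrefcount(partial)


def resize_allowed(size, refcount):
    """Toy restatement of the C guard (hypothetical helper).

    A non-empty tuple may only be resized while exactly one reference
    to it exists; an empty tuple is exempt from the refcount check.
    """
    return size == 0 or refcount == 1


print(shared - baseline)     # difference caused by the extra owner
print(resize_allowed(2, 1))  # sole owner
print(resize_allowed(2, 2))  # shared tuple: the case that fails in C
```

Absolute `sys.getrefcount` values vary between interpreter versions, which is why the sketch compares a difference rather than a fixed number.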
   
   The call site is `bundle_processor.py:637` (`state.commit()` -> 
`to_await.get()` -> `_DeferredCall.get()`), which is on the hot path for every 
bundle that commits state. High-throughput streaming pipelines execute this 
path continuously, so even a rare timing window is hit in practice.
   
   ## Observed stack trace (Python 3.11, Beam 2.73.0, production worker)
   
   ```
   File ".../sdk_worker.py", line 316, in _execute
       response = task()
   File ".../sdk_worker.py", line 390, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
   File ".../sdk_worker.py", line 707, in process_bundle
       bundle_processor.process_bundle(instruction_id))
   File ".../bundle_processor.py", line 1310, in process_bundle
       op.finish()
   File ".../bundle_processor.py", line 1028, in commit
       state.commit()
   File ".../bundle_processor.py", line 637, in commit
       to_await.get()
   File ".../sdk_worker.py", line 1457, in get
       return self._func(*(arg.get(timeout) for arg in self._args))
   SystemError: Objects/tupleobject.c:927: bad argument to internal function
   ```
   
   ## Fix
   
   Change the generator expression to a list comprehension:
   
   ```python
   return self._func(*[arg.get(timeout) for arg in self._args])
   ```
   
   CPython materialises the full list before the call. `CALL_FUNCTION_EX` 
receives a list, and `PySequence_Tuple` converts an exact `list` through 
`PyList_AsTuple`, which allocates the argument tuple once at its final size. 
`_PyTuple_Resize` is never called.
   
   `_DeferredCall.__init__` already wraps non-`_Future` args eagerly, so 
materialising the list before the call preserves existing semantics.
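
For readers without the Beam source at hand, here is a minimal sketch of the pattern. `SketchFuture` and `SketchDeferredCall` are hypothetical stand-ins written for this illustration; they are assumed to resemble the shape of the worker classes but are not Beam's `_Future` or `_DeferredCall`.

```python
import threading


class SketchFuture:
    """Hypothetical blocking future; a stand-in, not Beam's `_Future`."""

    def __init__(self):
        self._event = threading.Event()
        self._value = None

    def set(self, value):
        self._value = value
        self._event.set()
        return self

    def get(self, timeout=None):
        if not self._event.wait(timeout):
            raise LookupError("value not available before timeout")
        return self._value


class SketchDeferredCall(SketchFuture):
    """Hypothetical deferred call that mirrors the fixed `get()`."""

    def __init__(self, func, *args):
        self._func = func
        # Wrap plain values eagerly so every argument exposes `.get()`.
        self._args = [
            arg if isinstance(arg, SketchFuture) else SketchFuture().set(arg)
            for arg in args
        ]

    def get(self, timeout=None):
        # The list is fully built before the call, so no argument tuple is
        # being grown while `arg.get()` is still running.
        return self._func(*[arg.get(timeout) for arg in self._args])


pending = SketchFuture()
call = SketchDeferredCall(lambda a, b: a + b, pending, 2)
pending.set(40)
result = call.get(timeout=1)
print(result)
```

The mixed case (one future, one plain value) is used here because it exercises both branches of the eager wrapping in `__init__`.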
   
   ## Why `_DeferredCall.wait()` is unaffected
   
   `wait()` also uses a generator expression (`all(arg.wait(timeout) for arg in 
self._args)`) but passes it to `all()`, which is a C built-in that consumes the 
generator without building a resizable tuple. The crash path is specific to 
`*(gen)` unpacking in a function call.
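
The lazy, item-by-item consumption by `all()` can be observed from pure Python. In this sketch `fake_wait` is a hypothetical stand-in for `arg.wait(timeout)` that records each call:

```python
calls = []


def fake_wait(name, ready):
    """Hypothetical stand-in for `arg.wait(timeout)`; records each call."""
    calls.append(name)
    return ready


states = [("a", True), ("b", False), ("c", True)]

# all() pulls one item at a time and stops at the first falsy result,
# so the wait for "c" is never attempted.
all_ready = all(fake_wait(name, ready) for name, ready in states)

print(all_ready)
print(calls)
```

The generator is consumed one item at a time and reduced to a single boolean, so there is no argument tuple to resize on this path.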
   
   ## Tests
   
   Added `DeferredCallTest` in `sdk_worker_test.py` covering single-arg, 
multi-arg, non-Future arg wrapping, mixed Future+value, zero-arg, and return 
value type preservation.
   
   The crash itself requires specific CPython internal timing (a GC cycle 
running between `PySequence_Tuple`'s calls to the generator's `__next__` while 
the C-level partial tuple exists) and cannot be triggered deterministically 
from pure Python without a C extension or a CPython debug build. The argument 
for the fix rests on the CPython source and the observed production stack 
trace above.

