hjtran opened a new pull request, #30743:
URL: https://github.com/apache/beam/pull/30743
This is a proof-of-concept draft for fixing/implementing deferred side
inputs with Combiners. I'd appreciate feedback on how to proceed, and I'll
clean it up afterwards.
**Issue:**
Currently, if you try to use a side input with a combine function, you get
a traceback during the pipeline translation step:
```
  File "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py", line 1367, in lift_combiners
    expansion = lifted_stages if can_lift(transform) else unlifted_stages
                                 ^^^^^^^^^^^^^^^^^^^
  File "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py", line 1239, in can_lift
    context.components.pcollections[only_element(
                                    ^^^^^^^^^^^^^
  File "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py", line 2082, in only_element
    element, = iterable
    ^^^^^^^^
ValueError: too many values to unpack (expected 1)
```
**Fix?:**
The code that decides whether or not to lift a combiner assumes the combiner
has a single input pcoll, which is not true when the combiner has a deferred
side input.
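To make the failure mode concrete, here is a minimal plain-Python sketch (not Beam code). `only_element` uses the same unpacking idiom as the last frame of the traceback; the two input maps and their tags and ids are hypothetical stand-ins for a transform's inputs with and without a side input.

```python
def only_element(iterable):
    # Same single-element unpacking idiom as the frame in the traceback.
    element, = iterable
    return element


# Hypothetical input maps (local tag -> PCollection id), for illustration only.
inputs_without_side = {"0": "pcoll_main"}
inputs_with_side = {"0": "pcoll_main", "side0": "pcoll_side"}

# One input: unpacking succeeds.
main_id = only_element(inputs_without_side.values())

# Two inputs (main + side input): unpacking raises ValueError.
try:
    only_element(inputs_with_side.values())
    failed = False
except ValueError:
    failed = True
```

With only the main input the helper returns its id; once a second input is present, the same call raises `ValueError`, which matches the error shown above.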
I started fixing this and then realized that we also don't have plumbing
for deferred side inputs implemented at the `operations.py` layer. It seemed
like a lot of work to duplicate all the side input plumbing that ParDo has
when the phases of the combiners can be expressed as ParDos anyway.
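As a rough illustration of "combiner phases expressed as ParDos", the sketch below splits a combine into a per-bundle partial-combine step, a grouping step, and a merge-and-extract step, each a plain function that receives the side input as an extra argument. This is plain Python, not Beam; `ScaledSum`, the phase function names, and the `scale` side input are all hypothetical.

```python
from collections import defaultdict


class ScaledSum:
    """Hypothetical CombineFn-like class; `scale` stands in for a side input."""

    def create_accumulator(self, scale):
        return 0

    def add_input(self, acc, element, scale):
        return acc + element * scale

    def merge_accumulators(self, accs, scale):
        return sum(accs)

    def extract_output(self, acc, scale):
        return acc


def partial_combine(fn, bundle, scale):
    """Pre-shuffle phase: fold one bundle of (key, value) pairs per key."""
    accs = {}
    for key, value in bundle:
        if key not in accs:
            accs[key] = fn.create_accumulator(scale)
        accs[key] = fn.add_input(accs[key], value, scale)
    return list(accs.items())


def group_by_key(pairs):
    """Stand-in for the shuffle: collect all accumulators for each key."""
    grouped = defaultdict(list)
    for key, acc in pairs:
        grouped[key].append(acc)
    return list(grouped.items())


def merge_and_extract(fn, grouped, scale):
    """Post-shuffle phase: merge per-key accumulators and extract outputs."""
    return [
        (key, fn.extract_output(fn.merge_accumulators(accs, scale), scale))
        for key, accs in grouped
    ]


fn = ScaledSum()
scale = 10  # value of the hypothetical side input
bundles = [[("a", 1), ("b", 2)], [("a", 3)]]

partials = [kv for bundle in bundles for kv in partial_combine(fn, bundle, scale)]
result = dict(merge_and_extract(fn, group_by_key(partials), scale))
```

Because each phase is just a function over elements plus extra arguments, a side input can ride along the same way it does for any other per-element function.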
Inside `direct/helper_transforms.py` we already define a ParDo-based version
of a lifted combiner. I tried adding an override to use it only when a
combiner has side inputs, but by that point the side inputs have already been
replaced with `ArgumentPlaceholder`s. Instead, I updated it and return it from
`CombinePerKey.__new__`. I couldn't put it in `CombinePerKey.expand`, since
`CombinePerKey` has the special urn that would've resulted in the
subtransforms getting ignored.
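The `__new__` approach relies on ordinary Python behavior: if `__new__` returns an object that is not an instance of the class, `__init__` is skipped and the caller gets the substitute object. A minimal sketch, assuming hypothetical stand-in classes `CombineLike` and `ParDoBasedCombine` (these are not the Beam classes, and the exact condition used in this PR may differ):

```python
class ParDoBasedCombine:
    """Hypothetical stand-in for a ParDo-based combine implementation."""

    def __init__(self, fn, side_inputs):
        self.fn = fn
        self.side_inputs = side_inputs


class CombineLike:
    """Hypothetical stand-in for a transform that carries a special urn."""

    def __new__(cls, fn, *side_inputs):
        if side_inputs:
            # The returned object is not an instance of cls, so Python
            # does not call CombineLike.__init__ on it.
            return ParDoBasedCombine(fn, side_inputs)
        return super().__new__(cls)

    def __init__(self, fn, *side_inputs):
        self.fn = fn
        self.side_inputs = side_inputs


plain = CombineLike(sum)
with_side = CombineLike(sum, "side")
```

Callers keep writing `CombineLike(...)`, but any construction with side inputs yields the substitute object, so the special-cased class is never involved in that path.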