hjtran opened a new pull request, #38268:
URL: https://github.com/apache/beam/pull/38268
## Summary
Adds `PCollection.with_side_outputs(**kwargs)` to the Python SDK so
transforms can return a single, chainable `PCollection` that *also* carries
named side-output PCollections, accessible via `result.side_outputs.<tag>`.
```python
class MyFilter(beam.PTransform):
  def expand(self, pcoll):
    results = pcoll | beam.ParDo(FilterDoFn()).with_outputs(
        'dropped', main='main')
    return results['main'].with_side_outputs(dropped=results['dropped'])

# Most users: just chain
inputs | MyFilter() | NextTransform()

# When you need the side output
result = inputs | MyFilter()
result.side_outputs.dropped
```
Today, transform authors face a tradeoff between returning a `PCollection`
(chainable, but loses side outputs) or a `dict`/`PCollectionTuple` (preserves
side outputs, but breaks chaining). This change lets them have both.
## Design
- New `_SideOutputsContainer` exposes attribute (`container.dropped`) and
item (`container["dropped"]`) access plus iteration / `len` / `in`.
- `PCollection.with_side_outputs(**kwargs)` returns a `copy.copy(self)` with
the side-output mapping attached; the original is unchanged. Validation: each
value must be a `PCollection` (else `TypeError`) belonging to the same pipeline
(else `ValueError`).
- `Pipeline._apply_internal` and `Pipeline._replace` both call a shared
helper that registers the side outputs on the wrapping `AppliedPTransform` —
*only* for the top-level returned `PCollection`. This makes side outputs
first-class in the pipeline graph (visible to runners, the proto,
visualization, and YAML's `Transform.tag` resolution).
- The helper enforces two apply-time invariants:
- **Provenance:** each side output's producer must be the wrapping
transform itself or one of its descendants. Foreign PCollections are rejected
with `ValueError`.
- **Tag collision:** if a side-output tag already exists in the wrapping
transform's outputs, the value must be the same `PCollection` object
(idempotent); otherwise `ValueError`.
- `__copy__` is added to `PCollection` so `copy.copy(self)` doesn't dispatch
to the existing `__reduce_ex__` anti-pickling hook.
## Chaining behavior
`pc | A | B` is unchanged. `B` receives `A`'s main output as a normal
PCollection. `A`'s side outputs are not carried forward — capture the
intermediate result if you need them. This matches the behavior described in
the proposal and requires no special code.
## Caveats
- `**kwargs` syntax restricts tags to valid Python identifiers. Tags with
hyphens/spaces are not supported here; users with arbitrary string tags should
keep using the existing dict / `DoOutputsTuple` patterns.
- `.side_outputs` is a construction-time convenience, not a durable graph
property. After `from_runner_api`, the reconstructed PCollection won't have
`_side_outputs` populated, but the side outputs themselves still exist as named
outputs on the producer's `AppliedPTransform`.
- Side outputs do not participate in composite-boundary type checking
(matches the existing behavior of side outputs accessed via `DoOutputsTuple`).
## Follow-ups (not in this PR)
- `ParDo.with_side_outputs(...)` convenience method.
- Beam YAML dot-modifier integration (the existing `get_pcollection` already
resolves `Transform.tag` by walking the producer's outputs dict, so this should
mostly "just work" once verified).
## Tests
`pvalue_test.py` covers: copy semantics, attribute / index access,
missing-tag errors, type/pipeline validation, the empty container, and
replacement semantics when `with_side_outputs` is called twice.
`pipeline_test.py` covers: end-to-end materialization with `assert_that`,
the canonical `MyFilter`-style composite wrapping `with_outputs`,
foreign-pcollection rejection, tag-collision rejection, idempotent same-object
collision, replacement-path registration via `Pipeline.replace_all`, runner API
round-trip, and the nested-return non-flattening case.
## Test plan
- [x] `cd sdks/python && python -m pytest apache_beam/pvalue_test.py
apache_beam/pipeline_test.py` — 84 passed, 1 skipped
- [x] `ruff check --config=sdks/python/ruff.toml` on the four changed files
— clean
- [x] `yapf --diff` on the four changed files — clean
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]