hjtran opened a new pull request, #38268:
URL: https://github.com/apache/beam/pull/38268

   ## Summary
   
   Adds `PCollection.with_side_outputs(**kwargs)` to the Python SDK so 
transforms can return a single, chainable `PCollection` that *also* carries 
named side-output PCollections, accessible via `result.side_outputs.<tag>`.
   
   ```python
   class MyFilter(beam.PTransform):
       def expand(self, pcoll):
           results = pcoll | beam.ParDo(FilterDoFn()).with_outputs(
               'dropped', main='main')
           return results['main'].with_side_outputs(dropped=results['dropped'])
   
   # Most users — just chain
   inputs | MyFilter() | NextTransform()
   
   # When you need the side output
   result = inputs | MyFilter()
   result.side_outputs.dropped
   ```
   
   Today, transform authors face a tradeoff: return a `PCollection` (chainable, 
but loses side outputs) or a `dict`/`PCollectionTuple` (preserves side outputs, 
but breaks chaining). This change lets them have both.
   
   ## Design
   
   - New `_SideOutputsContainer` exposes attribute access (`container.dropped`), 
item access (`container["dropped"]`), and iteration / `len` / `in`.
   - `PCollection.with_side_outputs(**kwargs)` returns a `copy.copy(self)` with 
the side-output mapping attached; the original is unchanged. Validation: each 
value must be a `PCollection` (otherwise `TypeError`) belonging to the same 
pipeline (otherwise `ValueError`).
   - `Pipeline._apply_internal` and `Pipeline._replace` both call a shared 
helper that registers the side outputs on the wrapping `AppliedPTransform` — 
*only* for the top-level returned `PCollection`. This makes side outputs 
first-class in the pipeline graph (visible to runners, the proto, 
visualization, and YAML's `Transform.tag` resolution).
   - The helper enforces two apply-time invariants:
     - **Provenance:** each side output's producer must be the wrapping 
transform itself or one of its descendants. Foreign PCollections are rejected 
with `ValueError`.
     - **Tag collision:** if a side-output tag already exists in the wrapping 
transform's outputs, the value must be the same `PCollection` object 
(idempotent); otherwise `ValueError`.
   - `__copy__` is added to `PCollection` so `copy.copy(self)` doesn't dispatch 
to the existing `__reduce_ex__` anti-pickling hook.
   
   ## Chaining behavior
   
   `pc | A | B` is unchanged. `B` receives `A`'s main output as a normal 
PCollection. `A`'s side outputs are not carried forward — capture the 
intermediate result if you need them. This matches the behavior described in 
the proposal and requires no special code.
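As a toy, Beam-free illustration of that rule (every name below is invented for the sketch, not part of the SDK): piping forward hands the downstream transform only the main value, so side outputs survive solely on the captured intermediate result.

```python
import copy

class FakePColl:
    """Invented stand-in for PCollection, for illustration only."""
    def __init__(self, value):
        self.value = value
        self.side_outputs = {}

    def with_side_outputs(self, **tags):
        # Return a copy carrying the mapping; the original is unchanged.
        new = copy.copy(self)
        new.side_outputs = dict(tags)
        return new

    def __or__(self, fn):
        # Downstream sees only the main value; the result starts with
        # no side outputs -- they are not carried forward.
        return FakePColl(fn(self.value))

def double(v):
    return v * 2

intermediate = FakePColl(3).with_side_outputs(dropped=FakePColl(99))
final = intermediate | double           # plain result, no side outputs
dropped = intermediate.side_outputs['dropped']  # still reachable here
```

Capturing `intermediate` is the only way to reach `dropped`; chaining straight through (`FakePColl(3) | double`) never exposes it, which mirrors the "capture the intermediate result if you need them" guidance above.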
   
   ## Caveats
   
   - `**kwargs` syntax restricts tags to valid Python identifiers. Tags with 
hyphens/spaces are not supported here; users with arbitrary string tags should 
keep using the existing dict / `DoOutputsTuple` patterns.
   - `.side_outputs` is a construction-time convenience, not a durable graph 
property. After `from_runner_api`, the reconstructed PCollection won't have 
`_side_outputs` populated, but the side outputs themselves still exist as named 
outputs on the producer's `AppliedPTransform`.
   - Side outputs do not participate in composite-boundary type checking 
(matches the existing behavior of side outputs accessed via `DoOutputsTuple`).
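The first caveat is purely syntactic and visible without Beam at all: Python's parser rejects a hyphenated tag written as a literal keyword argument before any library code runs. In this sketch, `result` and `dropped` are hypothetical names; `compile` is used only to show where the rejection happens.

```python
# A hyphenated tag cannot be spelled as a keyword argument: `my-tag=...`
# parses as an assignment inside a call, which is a SyntaxError.
source = "result.with_side_outputs(my-tag=dropped)"
try:
    compile(source, "<example>", "eval")
except SyntaxError as err:
    print("rejected at parse time:", err.msg)

# The underscore spelling is a valid identifier and parses fine.
compile("result.with_side_outputs(my_tag=dropped)", "<example>", "eval")
```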
   
   ## Follow-ups (not in this PR)
   
   - `ParDo.with_side_outputs(...)` convenience method.
   - Beam YAML dot-modifier integration (the existing `get_pcollection` already 
resolves `Transform.tag` by walking the producer's outputs dict, so this should 
mostly "just work" once verified).
   
   ## Tests
   
   `pvalue_test.py` covers: copy semantics, attribute / index access, 
missing-tag errors, type / pipeline validation, the empty container, and 
replacement of the mapping when `with_side_outputs` is called twice.
   
   `pipeline_test.py` covers: end-to-end materialization with `assert_that`, 
the canonical `MyFilter`-style composite wrapping `with_outputs`, 
foreign-pcollection rejection, tag-collision rejection, idempotent same-object 
collision, replacement-path registration via `Pipeline.replace_all`, runner API 
round-trip, and the nested-return non-flattening case.
   
   ## Test plan
   
   - [x] `cd sdks/python && python -m pytest apache_beam/pvalue_test.py 
apache_beam/pipeline_test.py` — 84 passed, 1 skipped
   - [x] `ruff check --config=sdks/python/ruff.toml` on the four changed files 
— clean
   - [x] `yapf --diff` on the four changed files — clean

