junaiddshaukat opened a new pull request, #38843:
URL: https://github.com/apache/beam/pull/38843

   ## Summary
   Direct follow-up to #38764, per Slack discussion with @je-ik:
   
   1. Implement `Redistribute.arbitrarily()` as a runner-native passthrough so 
the fuser terminates stages at the Redistribute boundary, enabling 
chained-stage E2E tests without dragging GBK in.
   2. Drop the `byte[]` type bound on `ExecutableStageProcessor` / 
`KStreamsPayload` at the processor edge so non-`byte[]` outputs (e.g. `Integer` 
from `MapElements`) flow stage-to-stage without an unchecked cast 
misrepresenting the runtime type.
   3. Add a chained `Impulse -> MapElements<Integer> -> Redistribute -> ParDo` 
E2E test under the EMBEDDED Java SDK harness, asserting the Integer survives 
the runner round-trip via `SharedTestCollector`.
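
To illustrate item 2, here is a minimal toy sketch of why a fixed `byte[]` bound is a problem once a stage emits an `Integer`. It uses only the JDK. `PayloadSketch`, `Payload`, and `Stage` are hypothetical names for illustration and are not the runner's `KStreamsPayload` or `ExecutableStageProcessor`. The assumption is that the old code relied on an unchecked generic cast, which succeeds at the cast site because of erasure and only fails later when the value is read.

```java
import java.util.function.Function;

// Toy model only: hypothetical types, not the runner's actual classes.
final class PayloadSketch {

  /** Payload whose element type is a parameter rather than a fixed byte[]. */
  static final class Payload<T> {
    private final T value;

    Payload(T value) {
      this.value = value;
    }

    T value() {
      return value;
    }
  }

  /** A stage that turns a payload of InT into a payload of OutT. */
  static final class Stage<InT, OutT> {
    private final Function<InT, OutT> fn;

    Stage(Function<InT, OutT> fn) {
      this.fn = fn;
    }

    Payload<OutT> process(Payload<InT> in) {
      return new Payload<>(fn.apply(in.value()));
    }
  }

  /** Impulse-like byte[] -> Integer -> passthrough -> consumer, with honest types. */
  static String runChain() {
    Stage<byte[], Integer> map = new Stage<>(bytes -> bytes.length + 42);
    Stage<Integer, Integer> passthrough = new Stage<>(i -> i);
    Stage<Integer, String> consumer = new Stage<>(i -> "got " + i);
    Payload<byte[]> impulse = new Payload<>(new byte[0]);
    return consumer.process(passthrough.process(map.process(impulse))).value();
  }

  /** With a byte[] bound, the unchecked cast succeeds and the failure surfaces on read. */
  @SuppressWarnings("unchecked")
  static boolean uncheckedCastFailsOnRead() {
    Payload<?> actual = new Payload<>(42);
    Payload<byte[]> mislabeled = (Payload<byte[]>) actual; // no error here (erasure)
    try {
      byte[] bytes = mislabeled.value(); // ClassCastException is thrown here
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(runChain());
    System.out.println(uncheckedCastFailsOnRead());
  }
}
```

Parameterizing the element type keeps the compile-time type in step with what actually flows between stages, which is the property the chained E2E test in item 3 checks.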
   
   ## Wiring details
   - `knownUrns()` exposed; `TrivialNativeTransformExpander.forKnownUrns(...)` 
runs before `GreedyPipelineFuser.fuse(...)` so the Redistribute composite's 
sub-transforms (which include GBK, not yet supported) are stripped before 
fusion.
   - `@AutoService(NativeTransforms.IsNativeTransform.class)` registers the 
Redistribute URN so `QueryablePipeline` treats the stripped composite as a 
primitive producer of its outputs. Mirrors Flink's `IsFlinkNativeTransform`.
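
The strip-before-fuse ordering can be pictured with a small JDK-only toy model. This is a sketch under stated assumptions, not Beam's `TrivialNativeTransformExpander`: `StripBeforeFuseSketch`, `Node`, the `toy:` URN strings, and the `redistribute/pair` child are all hypothetical. The idea shown is that a composite with a known URN loses its sub-transforms, so a later fusion step sees it as a leaf and never encounters the unsupported GBK.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical names and placeholder URNs, not Beam APIs.
final class StripBeforeFuseSketch {
  static final String REDISTRIBUTE = "toy:redistribute_arbitrarily";

  /** A transform with a URN and the ids of its sub-transforms. */
  static final class Node {
    final String urn;
    final List<String> children;

    Node(String urn, String... children) {
      this.urn = urn;
      this.children = Arrays.asList(children);
    }
  }

  /** Returns a copy in which known-URN composites are leaves and their descendants are gone. */
  static Map<String, Node> strip(Map<String, Node> transforms, Set<String> knownUrns) {
    Set<String> removed = new HashSet<>();
    for (Node node : transforms.values()) {
      if (knownUrns.contains(node.urn)) {
        collectDescendants(transforms, node, removed);
      }
    }
    Map<String, Node> result = new LinkedHashMap<>();
    for (Map.Entry<String, Node> e : transforms.entrySet()) {
      if (removed.contains(e.getKey())) {
        continue; // descendant of a native composite: dropped
      }
      Node node = e.getValue();
      result.put(e.getKey(), knownUrns.contains(node.urn) ? new Node(node.urn) : node);
    }
    return result;
  }

  private static void collectDescendants(
      Map<String, Node> transforms, Node node, Set<String> out) {
    for (String childId : node.children) {
      out.add(childId);
      Node child = transforms.get(childId);
      if (child != null) {
        collectDescendants(transforms, child, out);
      }
    }
  }

  /** Hypothetical pipeline: impulse -> map -> redistribute (composite) -> consume. */
  static Map<String, Node> sample() {
    Map<String, Node> t = new LinkedHashMap<>();
    t.put("impulse", new Node("toy:impulse"));
    t.put("map", new Node("toy:pardo"));
    t.put("redistribute", new Node(REDISTRIBUTE, "redistribute/pair", "redistribute/gbk"));
    t.put("redistribute/pair", new Node("toy:pardo"));
    t.put("redistribute/gbk", new Node("toy:group_by_key"));
    t.put("consume", new Node("toy:pardo"));
    return t;
  }

  public static void main(String[] args) {
    Map<String, Node> stripped = strip(sample(), Collections.singleton(REDISTRIBUTE));
    System.out.println(stripped.keySet());
  }
}
```

In this model the stripping has to happen first: if fusion ran on the unstripped graph, it would walk into the composite and hit the group-by-key node.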
   
   ## Out of scope
   - `Redistribute.byKey()` URN — rehashing belongs with the GroupByKey 
sub-issue.
   - WatermarkManager — separate sub-issue.
   
   ## Validation
   `./gradlew :runners:kafka-streams:check` is green locally; 15 tests pass 
(including the new `ChainedExecutableStageTest`).
   
   Closes #38840
   Refs #18479
   cc @je-ik

