junaiddshaukat opened a new pull request, #38764:
URL: https://github.com/apache/beam/pull/38764

   ## Summary
   Third sub-issue under the Kafka Streams runner GSoC 2026 project.
   Adds the ExecutableStage translator and SDK-harness bridge so that
   fused stateless user code (ParDo etc.) runs in the SDK harness over
   the Fn API and its outputs flow back into the topology.
   
   Per design doc §4.2 and the live discussion with @je-ik on the issue.
   
   ## What's in this PR
   - `KafkaStreamsExecutableStageContextFactory` mirroring Flink's pattern.
   - `ExecutableStageProcessor` (the harness bridge) + `ExecutableStageTranslator`.
   - URN dispatch registers `ExecutableStage.URN`; `prepareForTranslation` now runs `GreedyPipelineFuser`.
   - `KStreamsPayload.toString` uses `MoreObjects.toStringHelper` (post-merge tweak from #38689).
   - Test deps: `testImplementation project(':sdks:java:harness')` for the EMBEDDED environment.
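
As a rough illustration of the URN dispatch contract mentioned in the list above, the sketch below maps transform URNs to translators and rejects anything unregistered. It is not the runner's code: `UrnDispatchSketch`, `register`, and `translate(urn, transformId)` are hypothetical names, the URN string constants are written out only for illustration, and the real translator works on pipeline protos rather than plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of URN-keyed dispatch; not the classes added in this PR. */
final class UrnDispatchSketch {
  // Illustrative constants; the runner references these via Beam's own constants.
  static final String IMPULSE_URN = "beam:transform:impulse:v1";
  static final String EXECUTABLE_STAGE_URN = "beam:runner:executable_stage:v1";

  // One translator per URN. A translator here just receives the transform id.
  private final Map<String, Consumer<String>> translators = new HashMap<>();

  void register(String urn, Consumer<String> translator) {
    translators.put(urn, translator);
  }

  /** Dispatches one transform; an unregistered URN is rejected, not skipped. */
  void translate(String urn, String transformId) {
    Consumer<String> translator = translators.get(urn);
    if (translator == null) {
      throw new UnsupportedOperationException("No translator registered for URN " + urn);
    }
    translator.accept(transformId);
  }
}
```

Keeping rejection inside `translate` is what lets a test exercise the unknown-URN path directly, without first running the fuser over a full pipeline.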
   
   ## Tests
   - `ExecutableStageTranslatorTest` builds `Impulse -> ParDo` via the Beam
     Java SDK, drives the resulting topology under `TopologyTestDriver`
     with the EMBEDDED environment, and asserts via a recorded side effect
     that the DoFn ran in the SDK harness with the expected input. Because
     the ParDo's output has no downstream consumer, the stage has no output
     PCollection and the harness does not deliver the value back to the
     runner (per Beam semantics), so the bridge is verified by the side
     effect instead. Approach discussed with @je-ik on #38743.
   - New `SharedTestCollector<T>` helper: instances are `Serializable`,
     but each one carries only a UUID as its identity; the actual storage
     lives in a static registry keyed by that UUID. Recorded values
     therefore survive the runner cloning the DoFn (current or future
     EMBEDDED behaviour).
   - `KafkaStreamsPipelineTranslatorTest` updated so the Impulse case
     builds via the Beam SDK (validator-compliant proto for the fuser)
     and the URN-rejection case calls `translate` directly to keep the
     dispatch-loop contract isolated.
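
To make the UUID-keyed collector idea concrete, here is a minimal sketch of the pattern, assuming plain Java serialization is how the function object gets cloned. `SharedCollectorSketch` and `RecordingFnSketch` are hypothetical stand-ins, not the `SharedTestCollector<T>` or DoFn from this PR, and the real helper may differ in API and cleanup behaviour.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch: a collector whose only serialized state is its id. */
final class SharedCollectorSketch<T> implements Serializable {
  private static final long serialVersionUID = 1L;

  // Static fields are not serialized, so every copy with the same id
  // reads and writes the same list within one JVM.
  private static final Map<String, List<Object>> REGISTRY = new ConcurrentHashMap<>();

  private final String id = UUID.randomUUID().toString();

  void add(T value) {
    REGISTRY.computeIfAbsent(id, k -> new CopyOnWriteArrayList<>()).add(value);
  }

  /** Returns a snapshot of everything recorded under this collector's id. */
  List<Object> values() {
    return new ArrayList<>(REGISTRY.getOrDefault(id, Collections.emptyList()));
  }
}

/** Hypothetical stand-in for a DoFn that a runner may serialize and deserialize. */
final class RecordingFnSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  private final SharedCollectorSketch<String> collector;

  RecordingFnSketch(SharedCollectorSketch<String> collector) {
    this.collector = collector;
  }

  void process(String element) {
    collector.add(element);
  }

  /** Round-trips this object through Java serialization, yielding a distinct copy. */
  RecordingFnSketch cloneViaSerialization() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (RecordingFnSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("clone failed", e);
    }
  }
}
```

The test keeps a reference to the original collector, lets the clone record a value, and reads it back through the original. This only works when the harness runs in the same JVM as the test, which is the case the EMBEDDED environment is used for here.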
   
   ## Validation
   - `./gradlew :runners:kafka-streams:check` green locally (14 tests).
   - No `@SuppressWarnings` of any flavor in the new code.
   
   ## Notes / deferred
   - The watermark-driven bundle flush in `ExecutableStageProcessor` is
     provisional. When the `WatermarkManager` lands, the output watermark
     will be `min()` across received watermarks, and the flush should fire
     only when that minimum moves forward, not on every received
     watermark. A comment in the processor flags this.
   - A `KStreamsPayload` Coder and Kafka `Serde` for topic-boundary
     serialization are still deferred to the first sub-issue that
     introduces a topic boundary (GBK / repartition).
   - ExecutableStage support for state, timers, and side inputs is out of scope.
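
The intended flush rule from the first note can be sketched as follows. This is a guess at the shape only, since the `WatermarkManager` has not landed: `MinWatermarkSketch`, the string input ids, and the fixed `expectedInputs` count are all assumptions made for the example, as is the choice to treat each input's watermark as non-decreasing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: flush only when the minimum input watermark advances. */
final class MinWatermarkSketch {
  private final Map<String, Long> perInput = new HashMap<>();
  private final int expectedInputs;
  private long outputWatermark = Long.MIN_VALUE;

  MinWatermarkSketch(int expectedInputs) {
    this.expectedInputs = expectedInputs;
  }

  /**
   * Records a watermark from one input. Returns true only when the minimum
   * across all expected inputs moved forward, i.e. when a flush would be due.
   */
  boolean observe(String inputId, long watermarkMillis) {
    // Assumption: a late, smaller watermark never moves an input backwards.
    perInput.merge(inputId, watermarkMillis, Long::max);
    if (perInput.size() < expectedInputs) {
      return false; // at least one input has not reported yet
    }
    long min = Collections.min(perInput.values());
    if (min > outputWatermark) {
      outputWatermark = min;
      return true;
    }
    return false;
  }

  long outputWatermark() {
    return outputWatermark;
  }
}
```

With two inputs, a watermark advancing on only the faster input returns `false`, so no bundle flush fires until the slower input catches up.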
   
   Closes #38743
   Refs #18479
   cc @je-ik

