junaiddshaukat opened a new pull request, #38764:
URL: https://github.com/apache/beam/pull/38764
## Summary
Third sub-issue under the Kafka Streams runner GSoC 2026 project.
Adds the ExecutableStage translator and the SDK-harness bridge, so fused
stateless user code (ParDo etc.) actually runs in the SDK harness over
the Fn API and its outputs flow back into the topology.
This follows design doc §4.2 and the discussion with @je-ik on the issue.
## What's in this PR
- `KafkaStreamsExecutableStageContextFactory`, mirroring the Flink runner's pattern.
- `ExecutableStageProcessor` (the harness bridge) and
`ExecutableStageTranslator`.
- URN dispatch registers `ExecutableStage.URN`; `prepareForTranslation` now
runs `GreedyPipelineFuser`.
- `KStreamsPayload.toString` uses `MoreObjects.toStringHelper` (post-merge
tweak from #38689).
- Test deps: `testImplementation project(':sdks:java:harness')` for the
EMBEDDED environment.
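The URN dispatch described above can be pictured as a map from transform URN to translator, with unknown URNs rejected. The following is a minimal, stdlib-only sketch, not the runner's code: the `TransformTranslator` interface, the `UrnDispatch` class, the exception type and the list-of-strings "topology" are hypothetical stand-ins. The URN strings are assumed to match Beam's Impulse URN and `ExecutableStage.URN`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the runner's per-URN translator contract.
interface TransformTranslator {
  // Appends a description of the translated node to a stand-in "topology".
  void translate(String transformId, List<String> topology);
}

// Hypothetical dispatch table: URN -> translator; unknown URNs are rejected.
final class UrnDispatch {
  // Assumed URN values; real code would reference the Beam constants.
  static final String IMPULSE_URN = "beam:transform:impulse:v1";
  static final String EXECUTABLE_STAGE_URN = "beam:runner:executable_stage:v1";

  private final Map<String, TransformTranslator> translators = new HashMap<>();

  UrnDispatch() {
    translators.put(IMPULSE_URN, (id, topo) -> topo.add("impulse:" + id));
    // After GreedyPipelineFuser runs, stateless user code arrives as
    // executable stages rather than as individual ParDo transforms.
    translators.put(EXECUTABLE_STAGE_URN, (id, topo) -> topo.add("stage:" + id));
  }

  void translate(String urn, String transformId, List<String> topology) {
    TransformTranslator translator = translators.get(urn);
    if (translator == null) {
      throw new UnsupportedOperationException("Unsupported URN: " + urn);
    }
    translator.translate(transformId, topology);
  }
}
```

Keeping rejection inside `translate` itself is what lets a test exercise the rejection path directly, without building a whole pipeline first.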
## Tests
- `ExecutableStageTranslatorTest` builds `Impulse -> ParDo` via the Beam
Java SDK, drives the resulting topology under `TopologyTestDriver`
with the EMBEDDED environment, and asserts via a recorded side effect
that the DoFn ran in the SDK harness with the expected input. This
approach was discussed with @je-ik on #38743: because the ParDo's output
has no downstream consumer, the stage has no output PCollection, so per
Beam semantics the harness does not deliver the value back to the
runner. The bridge is therefore verified through the side effect instead.
- New `SharedTestCollector<T>` helper: instances are `Serializable`,
but their identity is a UUID and the actual storage lives in a static
registry keyed by that UUID. Recorded values therefore survive the
runner cloning the DoFn, whether EMBEDDED does so now or in the future.
- `KafkaStreamsPipelineTranslatorTest` updated so the Impulse case
builds via the Beam SDK (validator-compliant proto for the fuser)
and the URN-rejection case calls `translate` directly to keep the
dispatch-loop contract isolated.
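The `SharedTestCollector<T>` idea can be sketched roughly as follows. This is a stdlib-only illustration, not the helper's actual code; the `ConcurrentHashMap` registry, the `Class<T>` constructor argument and the `add`/`values` method names are assumptions. Because only the key (and element type) is serialized, a deserialized copy resolves to the same static storage as the original.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of a UUID-keyed collector: instances carry only a key (plus the
// element type); the collected values live in a static, JVM-wide registry.
final class SharedTestCollector<T> implements Serializable {
  private static final long serialVersionUID = 1L;

  // Static fields are not serialized, so every copy of an instance
  // (e.g. inside a cloned DoFn) looks up the same storage by key.
  private static final Map<String, List<Object>> REGISTRY = new ConcurrentHashMap<>();

  private final String key = UUID.randomUUID().toString();
  private final Class<T> type;

  SharedTestCollector(Class<T> type) {
    this.type = type;
    REGISTRY.put(key, new CopyOnWriteArrayList<>());
  }

  // Called from inside the DoFn; safe to call from multiple threads.
  void add(T value) {
    REGISTRY.get(key).add(value);
  }

  // Snapshot of everything recorded so far, using checked casts to T.
  List<T> values() {
    List<T> out = new ArrayList<>();
    for (Object o : REGISTRY.get(key)) {
      out.add(type.cast(o));
    }
    return out;
  }
}
```

Using `Class.cast` keeps `values()` free of unchecked casts, in line with avoiding `@SuppressWarnings` in the new code.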
## Validation
- `./gradlew :runners:kafka-streams:check` green locally (14 tests).
- No `@SuppressWarnings` of any flavor in the new code.
## Notes / deferred
- The watermark-driven bundle flush in `ExecutableStageProcessor` is
provisional. When the `WatermarkManager` lands, the output watermark
will be the `min()` across received watermarks, and the flush should
fire only when that minimum moves forward, not on every received
watermark. A comment in the processor flags this.
- The `KStreamsPayload` Coder and Kafka `Serde` for topic-boundary
serialization are still deferred to the first sub-issue that
introduces a topic boundary (GBK / repartition).
- ExecutableStages with state, timers, or side inputs are out of scope.
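The planned flush rule (flush only when the minimum across received watermarks advances) can be sketched as a small tracker. This is an illustrative stdlib-only sketch, not the upcoming `WatermarkManager`; the `MinWatermarkTracker` name, the `String` input keys, `long` watermarks and the assumption that the set of inputs is known up front are all hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker: the output watermark is the min() across the latest
// watermark seen from each input; a flush is signalled only when it advances.
final class MinWatermarkTracker {
  private final Map<String, Long> latestPerInput = new HashMap<>();
  private long outputWatermark = Long.MIN_VALUE;

  MinWatermarkTracker(Collection<String> inputs) {
    // Inputs that have not reported yet hold the minimum back.
    for (String input : inputs) {
      latestPerInput.put(input, Long.MIN_VALUE);
    }
  }

  /** Records a watermark; returns true iff the output watermark moved forward. */
  boolean onWatermark(String input, long watermark) {
    if (!latestPerInput.containsKey(input)) {
      throw new IllegalArgumentException("Unknown input: " + input);
    }
    // Watermarks should be monotonic per input; ignore any regression.
    latestPerInput.merge(input, watermark, Long::max);
    long min = Long.MAX_VALUE;
    for (long w : latestPerInput.values()) {
      min = Math.min(min, w);
    }
    if (min > outputWatermark) {
      outputWatermark = min;
      return true; // caller would flush the current bundle here
    }
    return false;
  }

  long outputWatermark() {
    return outputWatermark;
  }
}
```

With two inputs, a watermark from only one of them returns `false`, because the silent input still pins the minimum at `Long.MIN_VALUE`; that is exactly the "not on every received watermark" behaviour described above.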
Closes #38743
Refs #18479
cc @je-ik