This is an automated email from the ASF dual-hosted git repository.
je-ik pushed a commit to branch feat/18479-kafka-streams-runner-skeleton
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to
refs/heads/feat/18479-kafka-streams-runner-skeleton by this push:
new 4c02a7789a2 [GSoC 2026] Kafka Streams runner — Redistribute translator
+ ExecutableStage type-agnostic edge (#38843)
4c02a7789a2 is described below
commit 4c02a7789a233aaf84c06caf19bd7e4d86a1308c
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Wed Jun 10 14:24:27 2026 +0500
[GSoC 2026] Kafka Streams runner — Redistribute translator +
ExecutableStage type-agnostic edge (#38843)
* Add Redistribute (arbitrarily) translator + type-agnostic ExecutableStage
edge
* Drain pendingOutputs via poll() loop instead of forEach + clear
* Assert ChainedExecutableStageTest pipeline actually has two
ExecutableStages
---
.../translation/ExecutableStageProcessor.java | 43 +++---
.../KafkaStreamsPipelineTranslator.java | 53 ++++++-
.../translation/RedistributeTranslator.java | 54 +++++++
.../translation/ChainedExecutableStageTest.java | 163 +++++++++++++++++++++
4 files changed, 288 insertions(+), 25 deletions(-)
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
index 7417088bb67..b8eb5413b27 100644
---
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
* receivers.
*/
class ExecutableStageProcessor
- implements Processor<byte[], KStreamsPayload<byte[]>, byte[],
KStreamsPayload<byte[]>> {
+ implements Processor<byte[], KStreamsPayload<?>, byte[],
KStreamsPayload<?>> {
private static final Logger LOG =
LoggerFactory.getLogger(ExecutableStageProcessor.class);
@@ -68,9 +68,13 @@ class ExecutableStageProcessor
// pendingOutputs is enqueued by SDK harness threads (inside the
OutputReceiverFactory callback)
// and drained by the Kafka Streams processing thread on bundle close; needs
to be thread-safe.
- private final Queue<WindowedValue<byte[]>> pendingOutputs = new
ConcurrentLinkedQueue<>();
+ // The element type is intentionally wildcarded: the runner does not need to
know the runtime
+ // value type — the bundle factory handles all coder application at the
Fn-API boundary using
+ // the PCollection coders from the ExecutableStagePayload. Pretending the
type was byte[] was
+ // only safe because the Impulse output coder happens to be ByteArrayCoder.
+ private final Queue<WindowedValue<?>> pendingOutputs = new
ConcurrentLinkedQueue<>();
- private @Nullable ProcessorContext<byte[], KStreamsPayload<byte[]>> context;
+ private @Nullable ProcessorContext<byte[], KStreamsPayload<?>> context;
private @Nullable ExecutableStageContext stageContext;
private @Nullable StageBundleFactory stageBundleFactory;
private @Nullable RemoteBundle currentBundle;
@@ -81,7 +85,7 @@ class ExecutableStageProcessor
}
@Override
- public void init(ProcessorContext<byte[], KStreamsPayload<byte[]>> context) {
+ public void init(ProcessorContext<byte[], KStreamsPayload<?>> context) {
this.context = context;
ExecutableStage executableStage =
ExecutableStage.fromPayload(stagePayload);
this.stageContext =
KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo);
@@ -89,8 +93,8 @@ class ExecutableStageProcessor
}
@Override
- public void process(Record<byte[], KStreamsPayload<byte[]>> record) {
- KStreamsPayload<byte[]> payload = record.value();
+ public void process(Record<byte[], KStreamsPayload<?>> record) {
+ KStreamsPayload<?> payload = record.value();
if (payload.isWatermark()) {
// NOTE: flushing the bundle on every received watermark is provisional.
Once the
// WatermarkManager lands, a stage will receive watermarks from multiple
parent instances and
@@ -122,7 +126,7 @@ class ExecutableStageProcessor
// after the bundle closes.
return receivedElement -> {
if (receivedElement != null) {
- pendingOutputs.add((WindowedValue<byte[]>) receivedElement);
+ pendingOutputs.add((WindowedValue<?>) receivedElement);
}
};
}
@@ -143,7 +147,7 @@ class ExecutableStageProcessor
return receiver;
}
- private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>>
record) {
+ private void closeBundleAndFlush(Record<byte[], KStreamsPayload<?>> record) {
RemoteBundle bundle = currentBundle;
if (bundle == null) {
return;
@@ -157,26 +161,25 @@ class ExecutableStageProcessor
} finally {
currentBundle = null;
}
- ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx =
checkInitialized(context);
+ ProcessorContext<byte[], KStreamsPayload<?>> ctx =
checkInitialized(context);
// The harness has finished the bundle (close() returned) so no further
enqueues happen.
- // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to
drain via forEach.
- pendingOutputs.forEach(
- output ->
- ctx.forward(
- new Record<byte[], KStreamsPayload<byte[]>>(
- record.key(), KStreamsPayload.data(output),
record.timestamp())));
- pendingOutputs.clear();
+ // Drain via poll() so each element is removed as it is forwarded.
+ WindowedValue<?> output;
+ while ((output = pendingOutputs.poll()) != null) {
+ ctx.forward(
+ new Record<byte[], KStreamsPayload<?>>(
+ record.key(), KStreamsPayload.data(output), record.timestamp()));
+ }
}
- private void forwardWatermark(
- Record<byte[], KStreamsPayload<byte[]>> record, long watermarkMillis) {
+ private void forwardWatermark(Record<byte[], KStreamsPayload<?>> record,
long watermarkMillis) {
// TODO(#38743 / WatermarkManager): a watermark must reach every parallel
instance of every
// downstream processor, but ctx.forward routes to one downstream
partition per Kafka Streams'
// partitioning. The simplest correct approach is to fan the watermark out
to all downstream
// partitions; that wiring lands with the WatermarkManager sub-issue (per
Jan on PR #38764).
- ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx =
checkInitialized(context);
+ ProcessorContext<byte[], KStreamsPayload<?>> ctx =
checkInitialized(context);
ctx.forward(
- new Record<byte[], KStreamsPayload<byte[]>>(
+ new Record<byte[], KStreamsPayload<?>>(
record.key(), KStreamsPayload.watermark(watermarkMillis),
record.timestamp()));
}
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
index 5042f542616..4e227749e1a 100644
---
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
@@ -17,16 +17,21 @@
*/
package org.apache.beam.runners.kafka.streams.translation;
+import com.google.auto.service.AutoService;
import java.util.Map;
+import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import
org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
/**
* Translates a portable Beam pipeline into a Kafka Streams {@link
@@ -45,6 +50,7 @@ public class KafkaStreamsPipelineTranslator {
this(
ImmutableMap.<String, PTransformTranslator>builder()
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new
ImpulseTranslator())
+ .put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, new
RedistributeTranslator())
.put(ExecutableStage.URN, new ExecutableStageTranslator())
.build());
}
@@ -59,11 +65,28 @@ public class KafkaStreamsPipelineTranslator {
}
/**
- * Fuses the pipeline so that stateless user code is grouped into {@code
ExecutableStage} nodes.
+ * Returns the set of URNs this translator handles natively. {@link
+ * TrivialNativeTransformExpander} uses this set to strip the sub-transforms
of runner-native
+ * composites (e.g. {@code Redistribute.arbitrarily}) before fusion, so they
survive into
+ * translation as leaves instead of being expanded into primitives the
runner does not implement
+ * yet (e.g. GroupByKey).
+ */
+ public Set<String> knownUrns() {
+ return urnToTranslator.keySet();
+ }
+
+ /**
+ * Prepares the pipeline for translation:
+ *
+ * <ol>
+ * <li>Trim sub-transforms of runner-native composites listed in {@link
#knownUrns()} so the
+ * fuser leaves them as primitives.
+ * <li>Fuse remaining stateless user code into {@code ExecutableStage}
nodes via {@link
+ * GreedyPipelineFuser}.
+ * </ol>
*
- * <p>Runner-executed primitives that have their own translator (e.g.
Impulse) are left intact;
- * everything else is fused. If the pipeline already contains {@code
ExecutableStage} transforms
- * it is returned unchanged.
+ * <p>If the pipeline already contains {@code ExecutableStage} transforms it
is returned
+ * unchanged.
*/
public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline)
{
boolean alreadyFused =
@@ -72,7 +95,8 @@ public class KafkaStreamsPipelineTranslator {
if (alreadyFused) {
return pipeline;
}
- return GreedyPipelineFuser.fuse(pipeline).toPipeline();
+ RunnerApi.Pipeline trimmed =
TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns());
+ return GreedyPipelineFuser.fuse(trimmed).toPipeline();
}
/**
@@ -99,4 +123,23 @@ public class KafkaStreamsPipelineTranslator {
translator.translate(node.getId(), pipeline, context);
}
}
+
+ /**
+ * Tells the SDK that URNs handled directly by the Kafka Streams runner
should be treated as
+ * primitives by {@link QueryablePipeline}. Mirrors Flink's {@code
IsFlinkNativeTransform}
+ * pattern. Without this, {@link TrivialNativeTransformExpander} strips a
composite's
+ * sub-transforms but {@link QueryablePipeline} still does not recognise the
composite itself as a
+ * producer of its outputs, and pipeline validation fails with "consumed but
never produced".
+ */
+ @AutoService(NativeTransforms.IsNativeTransform.class)
+ public static class IsKafkaStreamsNativeTransform implements
NativeTransforms.IsNativeTransform {
+ private static final Set<String> URNS =
+ ImmutableSet.of(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN);
+
+ @Override
+ public boolean test(RunnerApi.PTransform pTransform) {
+ String urn = PTransformTranslation.urnForTransformOrNull(pTransform);
+ return urn != null && URNS.contains(urn);
+ }
+ }
}
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java
new file mode 100644
index 00000000000..72c47db8d4b
--- /dev/null
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+
+/**
+ * Runner-native translator for {@code
beam:transform:redistribute_arbitrarily:v1}.
+ *
+ * <p>The default Beam expansion of {@code Redistribute.arbitrarily()} goes
through {@code
+ * GroupByKey} for materialization. The Kafka Streams runner does not need to
do any actual
+ * redistribution in the single-instance topology — Kafka Streams is already
handling per-task
+ * processing — so we provide a passthrough: the output PCollection is mapped
to the same processor
+ * that already produces the input PCollection. No topology node is added.
+ *
+ * <p>The translator's URN is reported by {@link
KafkaStreamsPipelineTranslator#knownUrns()} so {@link
+ * org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander}
strips the
+ * sub-transforms of {@code Redistribute} before the fuser runs. That keeps
the Redistribute
+ * boundary intact and lets the fuser split adjacent stages correctly.
+ *
+ * <p>The keyed variant ({@code redistribute_by_key:v1}) is intentionally not
handled here: it
+ * implies a rehash/partition step that the runner can implement on top of
GroupByKey when that
+ * lands, but cannot reasonably no-op the way the arbitrary variant can.
+ */
+class RedistributeTranslator implements PTransformTranslator {
+
+ @Override
+ public void translate(
+ String transformId, RunnerApi.Pipeline pipeline,
KafkaStreamsTranslationContext context) {
+ RunnerApi.PTransform transform =
pipeline.getComponents().getTransformsOrThrow(transformId);
+ String inputPCollectionId =
Iterables.getOnlyElement(transform.getInputsMap().values());
+ String parentProcessor =
context.getProcessorNameForPCollection(inputPCollectionId);
+ String outputPCollectionId =
Iterables.getOnlyElement(transform.getOutputsMap().values());
+ // Passthrough: downstream lookups for the output PCollection resolve to
the producer of the
+ // input PCollection. No KS Processor / state store / source is added.
+ context.registerPCollectionProducer(outputPCollectionId, parentProcessor);
+ }
+}
diff --git
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java
new file mode 100644
index 00000000000..8ac2175dc39
--- /dev/null
+++
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.util.construction.Environments;
+import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.Test;
+
+/**
+ * End-to-end test that exercises non-{@code byte[]} value types flowing
across an {@code
+ * ExecutableStage} boundary.
+ *
+ * <p>Builds a pipeline {@code Impulse -> MapElements<byte[], Integer> ->
Redistribute.arbitrarily()
+ * -> ParDo<Integer, Void>(record into SharedTestCollector)}. The Redistribute
boundary terminates
+ * the upstream {@code ExecutableStage} (because the runner reports
Redistribute as a
+ * runner-native transform via {@link
KafkaStreamsPipelineTranslator#knownUrns()}), so the
+ * Integer-emitting stage and the recording stage live in separate stages and
the Integer payload
+ * actually flows stage-to-stage through the runner — not just through user
code inside a single
+ * fused stage. That is what shows the {@code WindowedValue<byte[]>} cast the processor used to
perform was a latent bug,
+ * and that the type-agnostic {@code KStreamsPayload<?>} edge carries non-{@code byte[]} values
correctly.
+ */
+public class ChainedExecutableStageTest {
+
+ private static final String JOB_ID = "ks-chained-stage-test";
+ private static final String APPLICATION_ID = "ks-chained-stage-test";
+ private static final int EXPECTED_VALUE = 42;
+
+ /**
+ * Records the integer payload it sees so the test can verify it survived
the harness round-trip
+ * across the Redistribute boundary.
+ */
+ private static class RecordingFn extends DoFn<Integer, Void> {
+ private final SharedTestCollector<Integer> collector;
+
+ RecordingFn(SharedTestCollector<Integer> collector) {
+ this.collector = collector;
+ }
+
+ @ProcessElement
+ public void processElement(@Element Integer input) {
+ collector.record(input);
+ }
+ }
+
+ @Test
+ public void chainedStagesPropagateIntegerValueAcrossRedistributeBoundary()
throws Exception {
+ try (SharedTestCollector<Integer> collector =
SharedTestCollector.create()) {
+ Pipeline pipeline = Pipeline.create(pipelineOptions());
+ pipeline
+ .apply("impulse", Impulse.create())
+ .apply(
+ "to-integer",
+ MapElements.into(TypeDescriptors.integers()).via((byte[]
ignored) -> EXPECTED_VALUE))
+ .setTypeDescriptor(TypeDescriptor.of(Integer.class))
+ .apply("redistribute", Redistribute.<Integer>arbitrarily())
+ .apply("record", ParDo.of(new RecordingFn(collector)));
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+
+ KafkaStreamsPipelineOptions options =
+ pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
+ KafkaStreamsPipelineTranslator translator = new
KafkaStreamsPipelineTranslator();
+ JobInfo jobInfo =
+ JobInfo.create(
+ JOB_ID, options.getJobName(), "",
PipelineOptionsTranslation.toProto(options));
+ KafkaStreamsTranslationContext context =
+ translator.createTranslationContext(jobInfo, options);
+
+ RunnerApi.Pipeline preparedPipeline =
translator.prepareForTranslation(pipelineProto);
+
+ // Verify the Redistribute boundary actually split the user code into
two separate stages.
+ // Without this, the test could pass even if the fuser collapsed
MapElements and the
+ // RecordingFn into one stage — in which case the Integer payload would
flow through user
+ // code inside a single stage and never cross the runner-side
ExecutableStage edge that
+ // this test exists to exercise.
+ long executableStageCount =
+ preparedPipeline.getComponents().getTransformsMap().values().stream()
+ .filter(t -> ExecutableStage.URN.equals(t.getSpec().getUrn()))
+ .count();
+ assertThat(
+ "expected two ExecutableStages (upstream MapElements + downstream
RecordingFn ParDo) "
+ + "separated by the Redistribute boundary",
+ executableStageCount,
+ is(2L));
+
+ translator.translate(context, preparedPipeline);
+
+ Topology topology = context.getTopology();
+ try (TopologyTestDriver driver = new TopologyTestDriver(topology,
streamsConfig())) {
+ driver.advanceWallClockTime(Duration.ofSeconds(1));
+ driver.advanceWallClockTime(Duration.ofSeconds(1));
+ }
+
+ List<Integer> recorded = collector.recorded();
+ assertThat(recorded.size(), is(1));
+ assertThat(recorded.get(0), is(EXPECTED_VALUE));
+ }
+ }
+
+ private static PipelineOptions pipelineOptions() {
+ PipelineOptions options =
+ PipelineOptionsFactory.fromArgs("--applicationId=" +
APPLICATION_ID).create();
+ options.setRunner(CrashingRunner.class);
+
options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID);
+ options
+ .as(PortablePipelineOptions.class)
+ .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+ return options;
+ }
+
+ private static Properties streamsConfig() {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(
+ StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());
+ props.put(
+ StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());
+ return props;
+ }
+}