This is an automated email from the ASF dual-hosted git repository.

je-ik pushed a commit to branch feat/18479-kafka-streams-runner-skeleton
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/feat/18479-kafka-streams-runner-skeleton by this push:
     new 4c02a7789a2 [GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge (#38843)
4c02a7789a2 is described below

commit 4c02a7789a233aaf84c06caf19bd7e4d86a1308c
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Wed Jun 10 14:24:27 2026 +0500

    [GSoC 2026] Kafka Streams runner — Redistribute translator + ExecutableStage type-agnostic edge (#38843)
    
    * Add Redistribute (arbitrarily) translator + type-agnostic ExecutableStage edge
    * Drain pendingOutputs via poll() loop instead of forEach + clear
    * Assert ChainedExecutableStageTest pipeline actually has two ExecutableStages
---
 .../translation/ExecutableStageProcessor.java      |  43 +++---
 .../KafkaStreamsPipelineTranslator.java            |  53 ++++++-
 .../translation/RedistributeTranslator.java        |  54 +++++++
 .../translation/ChainedExecutableStageTest.java    | 163 +++++++++++++++++++++
 4 files changed, 288 insertions(+), 25 deletions(-)

diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
index 7417088bb67..b8eb5413b27 100644
--- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * receivers.
  */
 class ExecutableStageProcessor
-    implements Processor<byte[], KStreamsPayload<byte[]>, byte[], 
KStreamsPayload<byte[]>> {
+    implements Processor<byte[], KStreamsPayload<?>, byte[], 
KStreamsPayload<?>> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ExecutableStageProcessor.class);
 
@@ -68,9 +68,13 @@ class ExecutableStageProcessor
 
   // pendingOutputs is enqueued by SDK harness threads (inside the 
OutputReceiverFactory callback)
   // and drained by the Kafka Streams processing thread on bundle close; needs 
to be thread-safe.
-  private final Queue<WindowedValue<byte[]>> pendingOutputs = new 
ConcurrentLinkedQueue<>();
+  // The element type is intentionally wildcarded: the runner does not need to 
know the runtime
+  // value type — the bundle factory handles all coder application at the 
Fn-API boundary using
+  // the PCollection coders from the ExecutableStagePayload. Pretending the 
type was byte[] was
+  // only safe because the Impulse output coder happens to be ByteArrayCoder.
+  private final Queue<WindowedValue<?>> pendingOutputs = new 
ConcurrentLinkedQueue<>();
 
-  private @Nullable ProcessorContext<byte[], KStreamsPayload<byte[]>> context;
+  private @Nullable ProcessorContext<byte[], KStreamsPayload<?>> context;
   private @Nullable ExecutableStageContext stageContext;
   private @Nullable StageBundleFactory stageBundleFactory;
   private @Nullable RemoteBundle currentBundle;
@@ -81,7 +85,7 @@ class ExecutableStageProcessor
   }
 
   @Override
-  public void init(ProcessorContext<byte[], KStreamsPayload<byte[]>> context) {
+  public void init(ProcessorContext<byte[], KStreamsPayload<?>> context) {
     this.context = context;
     ExecutableStage executableStage = 
ExecutableStage.fromPayload(stagePayload);
     this.stageContext = 
KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo);
@@ -89,8 +93,8 @@ class ExecutableStageProcessor
   }
 
   @Override
-  public void process(Record<byte[], KStreamsPayload<byte[]>> record) {
-    KStreamsPayload<byte[]> payload = record.value();
+  public void process(Record<byte[], KStreamsPayload<?>> record) {
+    KStreamsPayload<?> payload = record.value();
     if (payload.isWatermark()) {
       // NOTE: flushing the bundle on every received watermark is provisional. 
Once the
       // WatermarkManager lands, a stage will receive watermarks from multiple 
parent instances and
@@ -122,7 +126,7 @@ class ExecutableStageProcessor
             // after the bundle closes.
             return receivedElement -> {
               if (receivedElement != null) {
-                pendingOutputs.add((WindowedValue<byte[]>) receivedElement);
+                pendingOutputs.add((WindowedValue<?>) receivedElement);
               }
             };
           }
@@ -143,7 +147,7 @@ class ExecutableStageProcessor
     return receiver;
   }
 
-  private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>> 
record) {
+  private void closeBundleAndFlush(Record<byte[], KStreamsPayload<?>> record) {
     RemoteBundle bundle = currentBundle;
     if (bundle == null) {
       return;
@@ -157,26 +161,25 @@ class ExecutableStageProcessor
     } finally {
       currentBundle = null;
     }
-    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = 
checkInitialized(context);
+    ProcessorContext<byte[], KStreamsPayload<?>> ctx = 
checkInitialized(context);
     // The harness has finished the bundle (close() returned) so no further 
enqueues happen.
-    // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to 
drain via forEach.
-    pendingOutputs.forEach(
-        output ->
-            ctx.forward(
-                new Record<byte[], KStreamsPayload<byte[]>>(
-                    record.key(), KStreamsPayload.data(output), 
record.timestamp())));
-    pendingOutputs.clear();
+    // Drain via poll() so each element is removed as it is forwarded.
+    WindowedValue<?> output;
+    while ((output = pendingOutputs.poll()) != null) {
+      ctx.forward(
+          new Record<byte[], KStreamsPayload<?>>(
+              record.key(), KStreamsPayload.data(output), record.timestamp()));
+    }
   }
 
-  private void forwardWatermark(
-      Record<byte[], KStreamsPayload<byte[]>> record, long watermarkMillis) {
+  private void forwardWatermark(Record<byte[], KStreamsPayload<?>> record, 
long watermarkMillis) {
     // TODO(#38743 / WatermarkManager): a watermark must reach every parallel 
instance of every
     // downstream processor, but ctx.forward routes to one downstream 
partition per Kafka Streams'
     // partitioning. The simplest correct approach is to fan the watermark out 
to all downstream
     // partitions; that wiring lands with the WatermarkManager sub-issue (per 
Jan on PR #38764).
-    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = 
checkInitialized(context);
+    ProcessorContext<byte[], KStreamsPayload<?>> ctx = 
checkInitialized(context);
     ctx.forward(
-        new Record<byte[], KStreamsPayload<byte[]>>(
+        new Record<byte[], KStreamsPayload<?>>(
             record.key(), KStreamsPayload.watermark(watermarkMillis), 
record.timestamp()));
   }
 
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
index 5042f542616..4e227749e1a 100644
--- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
@@ -17,16 +17,21 @@
  */
 package org.apache.beam.runners.kafka.streams.translation;
 
+import com.google.auto.service.AutoService;
 import java.util.Map;
+import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
 import org.apache.beam.sdk.util.construction.PTransformTranslation;
 import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
 import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.sdk.util.construction.graph.PipelineNode;
 import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import 
org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 
 /**
  * Translates a portable Beam pipeline into a Kafka Streams {@link
@@ -45,6 +50,7 @@ public class KafkaStreamsPipelineTranslator {
     this(
         ImmutableMap.<String, PTransformTranslator>builder()
             .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new 
ImpulseTranslator())
+            .put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, new 
RedistributeTranslator())
             .put(ExecutableStage.URN, new ExecutableStageTranslator())
             .build());
   }
@@ -59,11 +65,28 @@ public class KafkaStreamsPipelineTranslator {
   }
 
   /**
-   * Fuses the pipeline so that stateless user code is grouped into {@code 
ExecutableStage} nodes.
+   * Returns the set of URNs this translator handles natively. {@link
+   * TrivialNativeTransformExpander} uses this set to strip the sub-transforms 
of runner-native
+   * composites (e.g. {@code Redistribute.arbitrarily}) before fusion, so they 
survive into
+   * translation as leaves instead of being expanded into primitives the 
runner does not implement
+   * yet (e.g. GroupByKey).
+   */
+  public Set<String> knownUrns() {
+    return urnToTranslator.keySet();
+  }
+
+  /**
+   * Prepares the pipeline for translation:
+   *
+   * <ol>
+   *   <li>Trim sub-transforms of runner-native composites listed in {@link 
#knownUrns()} so the
+   *       fuser leaves them as primitives.
+   *   <li>Fuse remaining stateless user code into {@code ExecutableStage} 
nodes via {@link
+   *       GreedyPipelineFuser}.
+   * </ol>
    *
-   * <p>Runner-executed primitives that have their own translator (e.g. 
Impulse) are left intact;
-   * everything else is fused. If the pipeline already contains {@code 
ExecutableStage} transforms
-   * it is returned unchanged.
+   * <p>If the pipeline already contains {@code ExecutableStage} transforms it 
is returned
+   * unchanged.
    */
   public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) 
{
     boolean alreadyFused =
@@ -72,7 +95,8 @@ public class KafkaStreamsPipelineTranslator {
     if (alreadyFused) {
       return pipeline;
     }
-    return GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    RunnerApi.Pipeline trimmed = 
TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns());
+    return GreedyPipelineFuser.fuse(trimmed).toPipeline();
   }
 
   /**
@@ -99,4 +123,23 @@ public class KafkaStreamsPipelineTranslator {
       translator.translate(node.getId(), pipeline, context);
     }
   }
+
+  /**
+   * Tells the SDK that URNs handled directly by the Kafka Streams runner 
should be treated as
+   * primitives by {@link QueryablePipeline}. Mirrors Flink's {@code 
IsFlinkNativeTransform}
+   * pattern. Without this, {@link TrivialNativeTransformExpander} strips a 
composite's
+   * sub-transforms but {@link QueryablePipeline} still does not recognise the 
composite itself as a
+   * producer of its outputs, and pipeline validation fails with "consumed but 
never produced".
+   */
+  @AutoService(NativeTransforms.IsNativeTransform.class)
+  public static class IsKafkaStreamsNativeTransform implements 
NativeTransforms.IsNativeTransform {
+    private static final Set<String> URNS =
+        ImmutableSet.of(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN);
+
+    @Override
+    public boolean test(RunnerApi.PTransform pTransform) {
+      String urn = PTransformTranslation.urnForTransformOrNull(pTransform);
+      return urn != null && URNS.contains(urn);
+    }
+  }
 }
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java
new file mode 100644
index 00000000000..72c47db8d4b
--- /dev/null
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+
+/**
+ * Runner-native translator for {@code 
beam:transform:redistribute_arbitrarily:v1}.
+ *
+ * <p>The default Beam expansion of {@code Redistribute.arbitrarily()} goes 
through {@code
+ * GroupByKey} for materialization. The Kafka Streams runner does not need to 
do any actual
+ * redistribution in the single-instance topology — Kafka Streams is already 
handling per-task
+ * processing — so we provide a passthrough: the output PCollection is mapped 
to the same processor
+ * that already produces the input PCollection. No topology node is added.
+ *
+ * <p>The translator is registered in {@link 
KafkaStreamsPipelineTranslator#knownUrns()} so {@link
+ * org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander} 
strips the
+ * sub-transforms of {@code Redistribute} before the fuser runs. That keeps 
the Redistribute
+ * boundary intact and lets the fuser split adjacent stages correctly.
+ *
+ * <p>The keyed variant ({@code redistribute_by_key:v1}) is intentionally not 
handled here: it
+ * implies a rehash/partition step that the runner can implement on top of 
GroupByKey when that
+ * lands, but cannot reasonably no-op the way the arbitrary variant can.
+ */
+class RedistributeTranslator implements PTransformTranslator {
+
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, 
KafkaStreamsTranslationContext context) {
+    RunnerApi.PTransform transform = 
pipeline.getComponents().getTransformsOrThrow(transformId);
+    String inputPCollectionId = 
Iterables.getOnlyElement(transform.getInputsMap().values());
+    String parentProcessor = 
context.getProcessorNameForPCollection(inputPCollectionId);
+    String outputPCollectionId = 
Iterables.getOnlyElement(transform.getOutputsMap().values());
+    // Passthrough: downstream lookups for the output PCollection resolve to 
the producer of the
+    // input PCollection. No KS Processor / state store / source is added.
+    context.registerPCollectionProducer(outputPCollectionId, parentProcessor);
+  }
+}
diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java
new file mode 100644
index 00000000000..8ac2175dc39
--- /dev/null
+++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.util.construction.Environments;
+import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.Test;
+
+/**
+ * End-to-end test that exercises non-{@code byte[]} value types flowing 
across an {@code
+ * ExecutableStage} boundary.
+ *
+ * <p>Builds a pipeline {@code Impulse -> MapElements<byte[], Integer> -> 
Redistribute.arbitrarily()
+ * -> ParDo<Integer, Void>(record into SharedTestCollector)}. The Redistribute 
boundary terminates
+ * the upstream {@code ExecutableStage} (because the runner registers 
Redistribute as a
+ * runner-native transform in {@link 
KafkaStreamsPipelineTranslator#knownUrns()}), so the
+ * Integer-emitting stage and the recording stage live in separate stages and 
the Integer payload
+ * actually flows stage-to-stage through the runner — not just through user 
code inside a single
+ * fused stage. That is what proves the {@code byte[]}-erasure cast we used to 
do was a latent bug,
+ * and that the type-agnostic {@code KStreamsPayload<?>} edge handles it 
correctly.
+ */
+public class ChainedExecutableStageTest {
+
+  private static final String JOB_ID = "ks-chained-stage-test";
+  private static final String APPLICATION_ID = "ks-chained-stage-test";
+  private static final int EXPECTED_VALUE = 42;
+
+  /**
+   * Records the integer payload it sees so the test can verify it survived 
the harness round-trip
+   * across the Redistribute boundary.
+   */
+  private static class RecordingFn extends DoFn<Integer, Void> {
+    private final SharedTestCollector<Integer> collector;
+
+    RecordingFn(SharedTestCollector<Integer> collector) {
+      this.collector = collector;
+    }
+
+    @ProcessElement
+    public void processElement(@Element Integer input) {
+      collector.record(input);
+    }
+  }
+
+  @Test
+  public void chainedStagesPropagateIntegerValueAcrossRedistributeBoundary() 
throws Exception {
+    try (SharedTestCollector<Integer> collector = 
SharedTestCollector.create()) {
+      Pipeline pipeline = Pipeline.create(pipelineOptions());
+      pipeline
+          .apply("impulse", Impulse.create())
+          .apply(
+              "to-integer",
+              MapElements.into(TypeDescriptors.integers()).via((byte[] 
ignored) -> EXPECTED_VALUE))
+          .setTypeDescriptor(TypeDescriptor.of(Integer.class))
+          .apply("redistribute", Redistribute.<Integer>arbitrarily())
+          .apply("record", ParDo.of(new RecordingFn(collector)));
+
+      RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+
+      KafkaStreamsPipelineOptions options =
+          pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
+      KafkaStreamsPipelineTranslator translator = new 
KafkaStreamsPipelineTranslator();
+      JobInfo jobInfo =
+          JobInfo.create(
+              JOB_ID, options.getJobName(), "", 
PipelineOptionsTranslation.toProto(options));
+      KafkaStreamsTranslationContext context =
+          translator.createTranslationContext(jobInfo, options);
+
+      RunnerApi.Pipeline preparedPipeline = 
translator.prepareForTranslation(pipelineProto);
+
+      // Verify the Redistribute boundary actually split the user code into 
two separate stages.
+      // Without this, the test could pass even if the fuser collapsed 
MapElements and the
+      // RecordingFn into one stage — in which case the Integer payload would 
flow through user
+      // code inside a single stage and never cross the runner-side 
ExecutableStage edge that
+      // this test exists to exercise.
+      long executableStageCount =
+          preparedPipeline.getComponents().getTransformsMap().values().stream()
+              .filter(t -> ExecutableStage.URN.equals(t.getSpec().getUrn()))
+              .count();
+      assertThat(
+          "expected two ExecutableStages (upstream MapElements + downstream 
RecordingFn ParDo) "
+              + "separated by the Redistribute boundary",
+          executableStageCount,
+          is(2L));
+
+      translator.translate(context, preparedPipeline);
+
+      Topology topology = context.getTopology();
+      try (TopologyTestDriver driver = new TopologyTestDriver(topology, 
streamsConfig())) {
+        driver.advanceWallClockTime(Duration.ofSeconds(1));
+        driver.advanceWallClockTime(Duration.ofSeconds(1));
+      }
+
+      List<Integer> recorded = collector.recorded();
+      assertThat(recorded.size(), is(1));
+      assertThat(recorded.get(0), is(EXPECTED_VALUE));
+    }
+  }
+
+  private static PipelineOptions pipelineOptions() {
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--applicationId=" + 
APPLICATION_ID).create();
+    options.setRunner(CrashingRunner.class);
+    
options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID);
+    options
+        .as(PortablePipelineOptions.class)
+        .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+    return options;
+  }
+
+  private static Properties streamsConfig() {
+    Properties props = new Properties();
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(
+        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.ByteArray().getClass().getName());
+    props.put(
+        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArray().getClass().getName());
+    return props;
+  }
+}

Reply via email to