junaiddshaukat commented on code in PR #38843:
URL: https://github.com/apache/beam/pull/38843#discussion_r3381682442


##########
runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.util.construction.Environments;
+import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.Test;
+
+/**
+ * End-to-end test that exercises non-{@code byte[]} value types flowing across an {@code
+ * ExecutableStage} boundary.
+ *
+ * <p>Builds a pipeline {@code Impulse -> MapElements<byte[], Integer> -> Redistribute.arbitrarily()
+ * -> ParDo<Integer, Void>(record into SharedTestCollector)}. The Redistribute boundary terminates
+ * the upstream {@code ExecutableStage} (because the runner registers Redistribute as a
+ * runner-native transform in {@link KafkaStreamsPipelineTranslator#knownUrns()}), so the
+ * Integer-emitting stage and the recording stage live in separate stages and the Integer payload
+ * actually flows stage-to-stage through the runner — not just through user code inside a single
+ * fused stage. That is what proves the {@code byte[]}-erasure cast we used to do was a latent bug,
+ * and that the type-agnostic {@code KStreamsPayload<?>} edge handles it correctly.
+ */
+public class ChainedExecutableStageTest {
+
+  private static final String JOB_ID = "ks-chained-stage-test";
+  private static final String APPLICATION_ID = "ks-chained-stage-test";
+  private static final int EXPECTED_VALUE = 42;
+
+  /**
+   * Records the integer payload it sees so the test can verify it survived 
the harness round-trip
+   * across the Redistribute boundary.
+   */
+  private static class RecordingFn extends DoFn<Integer, Void> {
+    private final SharedTestCollector<Integer> collector;
+
+    RecordingFn(SharedTestCollector<Integer> collector) {
+      this.collector = collector;
+    }
+
+    @ProcessElement
+    public void processElement(@Element Integer input) {
+      collector.record(input);
+    }
+  }
+
+  @Test
+  public void chainedStagesPropagateIntegerValueAcrossRedistributeBoundary() 
throws Exception {
+    try (SharedTestCollector<Integer> collector = 
SharedTestCollector.create()) {
+      Pipeline pipeline = Pipeline.create(pipelineOptions());
+      pipeline
+          .apply("impulse", Impulse.create())
+          .apply(
+              "to-integer",
+              MapElements.into(TypeDescriptors.integers()).via((byte[] 
ignored) -> EXPECTED_VALUE))
+          .setTypeDescriptor(TypeDescriptor.of(Integer.class))
+          .apply("redistribute", Redistribute.<Integer>arbitrarily())
+          .apply("record", ParDo.of(new RecordingFn(collector)));
+
+      RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);

Review Comment:
   Done — I added an assertion right after `prepareForTranslation` that counts
   the transforms with the ExecutableStage URN in the prepared pipeline proto and
   requires exactly two. I verified locally that the two stages are the upstream
   MapElements stage and the downstream RecordingFn ParDo stage, separated by the
   runner-native Redistribute boundary. If fusion ever collapsed them into a single
   stage, the test would fail before reaching the topology driver. Thanks!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to