junaiddshaukat commented on code in PR #38843: URL: https://github.com/apache/beam/pull/38843#discussion_r3381682442
########## runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ChainedExecutableStageTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.util.construction.Environments; +import 
org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.Test; + +/** + * End-to-end test that exercises non-{@code byte[]} value types flowing across an {@code + * ExecutableStage} boundary. + * + * <p>Builds a pipeline {@code Impulse -> MapElements<byte[], Integer> -> Redistribute.arbitrarily() + * -> ParDo<Integer, Void>(record into SharedTestCollector)}. The Redistribute boundary terminates + * the upstream {@code ExecutableStage} (because the runner registers Redistribute as a + * runner-native transform in {@link KafkaStreamsPipelineTranslator#knownUrns()}), so the + * Integer-emitting stage and the recording stage live in separate stages and the Integer payload + * actually flows stage-to-stage through the runner — not just through user code inside a single + * fused stage. That is what proves the {@code byte[]}-erasure cast we used to do was a latent bug, + * and that the type-agnostic {@code KStreamsPayload<?>} edge handles it correctly. + */ +public class ChainedExecutableStageTest { + + private static final String JOB_ID = "ks-chained-stage-test"; + private static final String APPLICATION_ID = "ks-chained-stage-test"; + private static final int EXPECTED_VALUE = 42; + + /** + * Records the integer payload it sees so the test can verify it survived the harness round-trip + * across the Redistribute boundary. 
+ */ + private static class RecordingFn extends DoFn<Integer, Void> { + private final SharedTestCollector<Integer> collector; + + RecordingFn(SharedTestCollector<Integer> collector) { + this.collector = collector; + } + + @ProcessElement + public void processElement(@Element Integer input) { + collector.record(input); + } + } + + @Test + public void chainedStagesPropagateIntegerValueAcrossRedistributeBoundary() throws Exception { + try (SharedTestCollector<Integer> collector = SharedTestCollector.create()) { + Pipeline pipeline = Pipeline.create(pipelineOptions()); + pipeline + .apply("impulse", Impulse.create()) + .apply( + "to-integer", + MapElements.into(TypeDescriptors.integers()).via((byte[] ignored) -> EXPECTED_VALUE)) + .setTypeDescriptor(TypeDescriptor.of(Integer.class)) + .apply("redistribute", Redistribute.<Integer>arbitrarily()) + .apply("record", ParDo.of(new RecordingFn(collector))); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); Review Comment: Done: added an assertion right after `prepareForTranslation` that counts the transforms with the ExecutableStage URN in the prepared pipeline proto and requires exactly two. Verified locally that the two stages are the upstream MapElements stage and the downstream RecordingFn ParDo stage, separated by the runner-native Redistribute boundary. If fusion ever collapsed them, the test would fail before it even reaches the topology driver. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
