gemini-code-assist[bot] commented on code in PR #38764:
URL: https://github.com/apache/beam/pull/38764#discussion_r3336633894
##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} that executes a fused {@link ExecutableStage} (stateless user
+ * code such as ParDo) in the Beam SDK harness over the Fn API.
+ *
+ * <p>For each {@link KStreamsPayload#isData() data} payload it unwraps the {@link WindowedValue}
+ * and feeds it to the harness through the stage's main input {@link FnDataReceiver}. Harness
+ * outputs are collected on the harness threads into {@link #pendingOutputs} and then flushed
+ * downstream on the Kafka Streams processing thread when the bundle closes — Kafka Streams' {@link
+ * ProcessorContext#forward} must only be called from the processing thread, so outputs are never
+ * forwarded directly from a harness callback.
+ *
+ * <p>A {@link KStreamsPayload#isWatermark() watermark} payload marks a bundle boundary: the open
+ * bundle (if any) is closed (flushing outputs), and the watermark is then forwarded downstream so
+ * that subsequent stages observe it after all data of the bundle.
+ *
+ * <p>This is the Kafka Streams analogue of Flink's {@code ExecutableStageDoFnOperator} and Spark's
+ * {@code SparkExecutableStageFunction}. State, timers, and side inputs are out of scope for this
+ * first version: the stage is executed with {@link StateRequestHandler#unsupported()} and no timer
+ * receivers.
+ */
+class ExecutableStageProcessor
+    implements Processor<byte[], KStreamsPayload<byte[]>, byte[], KStreamsPayload<byte[]>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class);
+
+  private final RunnerApi.ExecutableStagePayload stagePayload;
+  private final JobInfo jobInfo;
+
+  private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();

Review Comment:
`pendingOutputs` is accessed concurrently: it is populated by SDK harness threads inside the `FnDataReceiver` callback and drained by the Kafka Streams processing thread. Since `ArrayDeque` is not thread-safe and lacks memory visibility guarantees across threads, this can lead to race conditions or data corruption. Using `ConcurrentLinkedQueue` provides a thread-safe, lock-free queue that is ideal for this producer-consumer pattern.

```suggestion
  private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();
```

##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.ArrayDeque;

Review Comment:
Replace the `ArrayDeque` import with `ConcurrentLinkedQueue` to support the thread-safe queue implementation for `pendingOutputs`.

```suggestion
import java.util.concurrent.ConcurrentLinkedQueue;
```

##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.Topology;
+
+/**
+ * Translates the {@code beam:runner:executable_stage:v1} URN.
+ *
+ * <p>Adds an {@link ExecutableStageProcessor} node to the topology, wired to the processor that
+ * produces the stage's input PCollection (resolved through {@link
+ * KafkaStreamsTranslationContext#getProcessorNameForPCollection}). The processor runs the fused
+ * user code in the SDK harness; its single output PCollection is registered so downstream
+ * translators can attach to this node.
+ *
+ * <p>Multi-output stages (additional outputs / side inputs / state / timers) are out of scope for
+ * this first version and are rejected so the limitation fails fast rather than silently dropping
+ * outputs.
+ */
+class ExecutableStageTranslator implements PTransformTranslator {
+
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
+
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "Failed to parse ExecutableStagePayload for transform " + transformId, e);
+    }
+
+    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);

Review Comment:
Instead of using `Iterables.getOnlyElement(transform.getInputsMap().values())`, which will fail with a generic `IllegalArgumentException` if there are side inputs, we can directly retrieve the main input PCollection ID from `stagePayload.getInput()`. Additionally, we should explicitly check and reject side inputs, user states, and timers using `stagePayload` to fail fast with a clear, descriptive error message.
```java
    if (stagePayload.getSideInputsCount() > 0) {
      throw new UnsupportedOperationException(
          "ExecutableStage "
              + transformId
              + " has side inputs; side inputs are not yet supported by the Kafka Streams runner.");
    }
    if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
      throw new UnsupportedOperationException(
          "ExecutableStage "
              + transformId
              + " has user states or timers; these are not yet supported by the Kafka Streams"
              + " runner.");
    }

    String inputPCollectionId = stagePayload.getInput();
    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
```
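As a rough illustration of the handoff that the first review comment and the `ExecutableStageProcessor` Javadoc describe, here is a minimal, self-contained sketch that uses no Beam or Kafka Streams classes. It is a hypothetical model, not code from this PR: `PendingOutputsSketch`, `runBundle`, and `onWatermark` are illustrative names, a fixed thread pool stands in for the SDK harness threads, and a plain `List` stands in for `ProcessorContext#forward`. Several producers call `offer` on a shared `ConcurrentLinkedQueue` at the same time; the single consumer drains it with `poll` only after the bundle's producers have finished, and then appends the watermark so it follows every output of the bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the harness-to-processing-thread output handoff (not Beam code). */
public class PendingOutputsSketch {

  // Thread-safe, lock-free queue: offer() may be called from several producer threads at once.
  private final Queue<String> pendingOutputs = new ConcurrentLinkedQueue<>();

  // Stand-in for ProcessorContext#forward; only the consuming thread touches this list.
  private final List<String> forwarded = new ArrayList<>();

  /** Simulates harness callbacks emitting outputs on several threads, then waits for them. */
  public void runBundle(int producers, int outputsPerProducer) {
    ExecutorService harness = Executors.newFixedThreadPool(producers);
    for (int p = 0; p < producers; p++) {
      final int producerId = p;
      harness.submit(
          () -> {
            for (int i = 0; i < outputsPerProducer; i++) {
              pendingOutputs.offer("out-" + producerId + "-" + i);
            }
          });
    }
    // "Close" the bundle: block until every producer task has finished.
    harness.shutdown();
    try {
      if (!harness.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("bundle did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while closing the bundle", e);
    }
  }

  /** Drains pending outputs on the calling thread, then forwards the watermark after them. */
  public void onWatermark(long watermark) {
    String out;
    while ((out = pendingOutputs.poll()) != null) {
      forwarded.add(out);
    }
    forwarded.add("watermark-" + watermark);
  }

  public List<String> getForwarded() {
    return forwarded;
  }

  public static void main(String[] args) {
    PendingOutputsSketch sketch = new PendingOutputsSketch();
    sketch.runBundle(4, 1000);
    sketch.onWatermark(42L);
    List<String> out = sketch.getForwarded();
    System.out.println(out.size()); // prints 4001 (4 * 1000 outputs + 1 watermark)
    System.out.println(out.get(out.size() - 1)); // prints watermark-42
  }
}
```

With an `ArrayDeque` in place of the `ConcurrentLinkedQueue`, concurrent `offer` calls from the pool threads may lose or corrupt elements, so the output count is no longer guaranteed. Note that the thread-safe queue alone does not keep the watermark last; that ordering comes from draining only after the bundle's producers have completed.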
