gemini-code-assist[bot] commented on code in PR #38764:
URL: https://github.com/apache/beam/pull/38764#discussion_r3336633894


##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} that executes a fused {@link ExecutableStage} (stateless user
+ * code such as ParDo) in the Beam SDK harness over the Fn API.
+ *
+ * <p>For each {@link KStreamsPayload#isData() data} payload it unwraps the {@link WindowedValue}
+ * and feeds it to the harness through the stage's main input {@link FnDataReceiver}. Harness
+ * outputs are collected on the harness threads into {@link #pendingOutputs} and then flushed
+ * downstream on the Kafka Streams processing thread when the bundle closes — Kafka Streams' {@link
+ * ProcessorContext#forward} must only be called from the processing thread, so outputs are never
+ * forwarded directly from a harness callback.
+ *
+ * <p>A {@link KStreamsPayload#isWatermark() watermark} payload marks a bundle boundary: the open
+ * bundle (if any) is closed (flushing outputs), and the watermark is then forwarded downstream so
+ * that subsequent stages observe it after all data of the bundle.
+ *
+ * <p>This is the Kafka Streams analogue of Flink's {@code ExecutableStageDoFnOperator} and Spark's
+ * {@code SparkExecutableStageFunction}. State, timers, and side inputs are out of scope for this
+ * first version: the stage is executed with {@link StateRequestHandler#unsupported()} and no timer
+ * receivers.
+ */
+class ExecutableStageProcessor
+    implements Processor<byte[], KStreamsPayload<byte[]>, byte[], KStreamsPayload<byte[]>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class);
+
+  private final RunnerApi.ExecutableStagePayload stagePayload;
+  private final JobInfo jobInfo;
+
+  private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `pendingOutputs` is accessed concurrently: SDK harness threads populate it inside the `FnDataReceiver` callback, and the Kafka Streams processing thread drains it. `ArrayDeque` is not thread-safe and provides no memory-visibility guarantees across threads, so this can lead to race conditions or data corruption.
   
   `ConcurrentLinkedQueue` is a thread-safe, lock-free queue that suits this producer-consumer pattern.
   
   ```suggestion
     private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();
   ```
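   
   To illustrate the pattern this suggestion relies on, here is a minimal, self-contained sketch. It is not code from the PR: `PendingOutputsSketch`, `onOutput`, `drain`, and `runDemo` are hypothetical names, `String` stands in for `WindowedValue<byte[]>`, and a plain thread pool stands in for the SDK harness threads. Waiting for the pool to terminate stands in for bundle close; how the real processor orders these events is an assumption not shown here.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   public class PendingOutputsSketch {
     // Stand-in for pendingOutputs; String replaces WindowedValue<byte[]> (assumption).
     private final Queue<String> pendingOutputs = new ConcurrentLinkedQueue<>();

     // Called from producer threads (the "harness" side in this sketch).
     public void onOutput(String element) {
       pendingOutputs.add(element);
     }

     // Called from the single consumer thread (the "processing" side in this sketch).
     public List<String> drain() {
       List<String> drained = new ArrayList<>();
       String next;
       // poll() returns null once the queue is empty.
       while ((next = pendingOutputs.poll()) != null) {
         drained.add(next);
       }
       return drained;
     }

     // Runs several producers concurrently, waits for them, then drains on the calling thread.
     public static int runDemo(int producers, int perProducer) {
       PendingOutputsSketch sketch = new PendingOutputsSketch();
       ExecutorService harness = Executors.newFixedThreadPool(producers);
       for (int p = 0; p < producers; p++) {
         final int id = p;
         harness.submit(
             () -> {
               for (int i = 0; i < perProducer; i++) {
                 sketch.onOutput("p" + id + "-" + i);
               }
             });
       }
       harness.shutdown();
       try {
         // Stand-in for "bundle closed": all producers are done before the drain starts.
         if (!harness.awaitTermination(10, TimeUnit.SECONDS)) {
           throw new IllegalStateException("producers did not finish in time");
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("interrupted while waiting for producers", e);
       }
       return sketch.drain().size();
     }

     public static void main(String[] args) {
       System.out.println(runDemo(4, 1000)); // prints 4000
     }
   }
   ```
   
   With an `ArrayDeque` in place of the `ConcurrentLinkedQueue`, the concurrent `add` calls in this sketch would be unsynchronized writes to a non-thread-safe collection, and the drained count would no longer be reliable.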



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.ArrayDeque;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Replace the `ArrayDeque` import with `ConcurrentLinkedQueue` to support the thread-safe queue implementation for `pendingOutputs`.
   
   ```suggestion
   import java.util.concurrent.ConcurrentLinkedQueue;
   ```



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.Topology;
+
+/**
+ * Translates the {@code beam:runner:executable_stage:v1} URN.
+ *
+ * <p>Adds an {@link ExecutableStageProcessor} node to the topology, wired to the processor that
+ * produces the stage's input PCollection (resolved through {@link
+ * KafkaStreamsTranslationContext#getProcessorNameForPCollection}). The processor runs the fused
+ * user code in the SDK harness; its single output PCollection is registered so downstream
+ * translators can attach to this node.
+ *
+ * <p>Multi-output stages (additional outputs / side inputs / state / timers) are out of scope for
+ * this first version and are rejected so the limitation fails fast rather than silently dropping
+ * outputs.
+ */
+class ExecutableStageTranslator implements PTransformTranslator {
+
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
+
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "Failed to parse ExecutableStagePayload for transform " + transformId, e);
+    }
+
+    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   `Iterables.getOnlyElement(transform.getInputsMap().values())` fails with a generic `IllegalArgumentException` when the stage has side inputs, because the inputs map then holds more than one entry. Read the main input PCollection ID directly from `stagePayload.getInput()` instead.
   
   Additionally, explicitly check `stagePayload` for side inputs, user state, and timers and reject them, so the translator fails fast with a clear, descriptive error message.
   
   ```java
       if (stagePayload.getSideInputsCount() > 0) {
         throw new UnsupportedOperationException(
             "ExecutableStage "
                 + transformId
                 + " has side inputs; side inputs are not yet supported by the Kafka Streams runner.");
       }
       if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
         throw new UnsupportedOperationException(
             "ExecutableStage "
                 + transformId
                 + " has user states or timers; these are not yet supported by the Kafka Streams runner.");
       }
   
       String inputPCollectionId = stagePayload.getInput();
       String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
   ```
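   
   The fail-fast behavior can be exercised in isolation with a small sketch. Everything here is illustrative rather than taken from the PR: `StageValidationSketch`, `StageInfo`, and `mainInputOrThrow` are hypothetical names, and `StageInfo` is a plain stand-in that carries only the values the checks above read from `ExecutableStagePayload` (the main input ID and the three counts).
   
   ```java
   public class StageValidationSketch {
     // Hypothetical stand-in for RunnerApi.ExecutableStagePayload; holds only what the checks read.
     public static final class StageInfo {
       final String input;
       final int sideInputsCount;
       final int userStatesCount;
       final int timersCount;

       public StageInfo(String input, int sideInputsCount, int userStatesCount, int timersCount) {
         this.input = input;
         this.sideInputsCount = sideInputsCount;
         this.userStatesCount = userStatesCount;
         this.timersCount = timersCount;
       }
     }

     // Rejects unsupported features with a descriptive message, then returns the main input ID.
     public static String mainInputOrThrow(String transformId, StageInfo stage) {
       if (stage.sideInputsCount > 0) {
         throw new UnsupportedOperationException(
             "ExecutableStage " + transformId + " has side inputs; not supported in this sketch.");
       }
       if (stage.userStatesCount > 0 || stage.timersCount > 0) {
         throw new UnsupportedOperationException(
             "ExecutableStage " + transformId + " has user state or timers; not supported in this sketch.");
       }
       return stage.input;
     }

     public static void main(String[] args) {
       // A stage with only a main input passes validation.
       System.out.println(mainInputOrThrow("stage-1", new StageInfo("pc_in", 0, 0, 0)));
       try {
         // A stage with one side input is rejected up front.
         mainInputOrThrow("stage-2", new StageInfo("pc_in", 1, 0, 0));
       } catch (UnsupportedOperationException e) {
         System.out.println(e.getMessage());
       }
     }
   }
   ```
   
   Because the checks run before the main input is resolved, the error names the offending transform and feature instead of surfacing as an unrelated collection error.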


