gemini-code-assist[bot] commented on code in PR #38764:
URL: https://github.com/apache/beam/pull/38764#discussion_r3336630501


##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.ArrayDeque;

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Since the queue is accessed concurrently by SDK harness threads (which 
enqueue outputs) and the Kafka Streams processing thread (which flushes them), 
we should use a thread-safe queue like `ConcurrentLinkedQueue` instead of 
`ArrayDeque` to avoid race conditions and data corruption.
   
   ```suggestion
   import java.util.concurrent.ConcurrentLinkedQueue;
   ```
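
   To illustrate the hand-off described above, here is a minimal sketch, not
   the PR's code: several producer threads (standing in for SDK harness
   callbacks) enqueue into a `ConcurrentLinkedQueue`, and a single thread
   drains it afterwards. The class `PendingOutputsSketch` and the method
   `produceAndDrain` are hypothetical names used only for this example.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;

   // Hypothetical stand-in for the processor's output buffer; not Beam code.
   class PendingOutputsSketch {

     // Starts `producers` threads that each offer `perProducer` items, waits
     // for all of them to finish, then drains the queue on the calling thread
     // and returns the number of drained items.
     static int produceAndDrain(int producers, int perProducer) {
       Queue<Integer> pendingOutputs = new ConcurrentLinkedQueue<>();
       List<Thread> threads = new ArrayList<>();
       for (int p = 0; p < producers; p++) {
         Thread t =
             new Thread(
                 () -> {
                   for (int i = 0; i < perProducer; i++) {
                     pendingOutputs.offer(i); // safe to call from many threads
                   }
                 });
         t.start();
         threads.add(t);
       }
       for (Thread t : threads) {
         try {
           t.join();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IllegalStateException(e);
         }
       }
       int drained = 0;
       while (pendingOutputs.poll() != null) { // poll() returns null when empty
         drained++;
       }
       return drained;
     }

     public static void main(String[] args) {
       System.out.println(produceAndDrain(4, 10_000));
     }
   }
   ```

   With an `ArrayDeque` in place of the concurrent queue, the same concurrent
   `offer` calls would be unsynchronized writes to a shared array, so the
   drained count would not be guaranteed to match the number of items offered.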



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/**
+ * Provides one {@link ExecutableStageContext.Factory} per job for the Kafka Streams runner.
+ *
+ * <p>Mirrors {@code FlinkExecutableStageContextFactory}: a singleton that hands out reference-
+ * counted {@link DefaultExecutableStageContext}s keyed by job id, so the SDK harness environment
+ * for a job is created once and shared across the {@link ImpulseProcessor}/executable-stage
+ * processors that run within the same JVM instance.
+ */
+public class KafkaStreamsExecutableStageContextFactory implements ExecutableStageContext.Factory {
+
+  private static final KafkaStreamsExecutableStageContextFactory INSTANCE =
+      new KafkaStreamsExecutableStageContextFactory();
+
+  private final ConcurrentMap<String, ExecutableStageContext.Factory> jobFactories =
+      new ConcurrentHashMap<>();
+
+  private KafkaStreamsExecutableStageContextFactory() {}
+
+  public static KafkaStreamsExecutableStageContextFactory getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public ExecutableStageContext get(JobInfo jobInfo) {
+    ExecutableStageContext.Factory jobFactory =
+        jobFactories.computeIfAbsent(
+            jobInfo.jobId(),
+            k ->
+                ReferenceCountingExecutableStageContextFactory.create(
+                    DefaultExecutableStageContext::create,
+                    // Release the context synchronously once its reference count drops to zero;
+                    // the runner does not keep contexts alive across stages beyond their use.
+                    (caller) -> true));

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `jobFactories` map is a `ConcurrentHashMap` field on the static
   singleton `INSTANCE`, so it lives as long as the JVM and holds one
   `ExecutableStageContext.Factory` per job. Nothing removes a job's factory
   once its reference count drops to zero, so the factory and its associated
   resources leak for every job run in the JVM. We should remove the job ID
   from `jobFactories` in the callback passed to `create`.
   
   ```suggestion
                       (caller) -> {
                         jobFactories.remove(jobInfo.jobId());
                         return true;
                       }));
   ```
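
   As a minimal sketch of the cleanup idea, assuming a simplified registry
   rather than Beam's `ReferenceCountingExecutableStageContextFactory`: the
   entry for a job is dropped from the map in the same atomic step that brings
   its reference count to zero. `JobRegistrySketch`, `acquire`, and `release`
   are hypothetical names for illustration only.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Hypothetical per-job registry; names are illustrative, not Beam APIs.
   class JobRegistrySketch {

     // Mutable counter; only touched inside compute/computeIfPresent, which
     // ConcurrentHashMap runs atomically per key.
     static final class Entry {
       int refCount;
     }

     private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();

     // Creates the entry on first use and increments its reference count.
     void acquire(String jobId) {
       entries.compute(
           jobId,
           (id, existing) -> {
             Entry entry = (existing == null) ? new Entry() : existing;
             entry.refCount++;
             return entry;
           });
     }

     // Decrements the count; returning null from the remapping function
     // removes the mapping, so a finished job leaves nothing behind.
     void release(String jobId) {
       entries.computeIfPresent(jobId, (id, entry) -> --entry.refCount == 0 ? null : entry);
     }

     int size() {
       return entries.size();
     }
   }
   ```

   Doing the decrement and the removal inside one `computeIfPresent` call
   avoids a window in which another thread could acquire an entry that is
   about to be removed.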



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} that executes a fused {@link ExecutableStage} (stateless user
+ * code such as ParDo) in the Beam SDK harness over the Fn API.
+ *
+ * <p>For each {@link KStreamsPayload#isData() data} payload it unwraps the {@link WindowedValue}
+ * and feeds it to the harness through the stage's main input {@link FnDataReceiver}. Harness
+ * outputs are collected on the harness threads into {@link #pendingOutputs} and then flushed
+ * downstream on the Kafka Streams processing thread when the bundle closes — Kafka Streams' {@link
+ * ProcessorContext#forward} must only be called from the processing thread, so outputs are never
+ * forwarded directly from a harness callback.
+ *
+ * <p>A {@link KStreamsPayload#isWatermark() watermark} payload marks a bundle boundary: the open
+ * bundle (if any) is closed (flushing outputs), and the watermark is then forwarded downstream so
+ * that subsequent stages observe it after all data of the bundle.
+ *
+ * <p>This is the Kafka Streams analogue of Flink's {@code ExecutableStageDoFnOperator} and Spark's
+ * {@code SparkExecutableStageFunction}. State, timers, and side inputs are out of scope for this
+ * first version: the stage is executed with {@link StateRequestHandler#unsupported()} and no timer
+ * receivers.
+ */
+class ExecutableStageProcessor
+    implements Processor<byte[], KStreamsPayload<byte[]>, byte[], KStreamsPayload<byte[]>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class);
+
+  private final RunnerApi.ExecutableStagePayload stagePayload;
+  private final JobInfo jobInfo;
+
+  private final Queue<WindowedValue<byte[]>> pendingOutputs = new ArrayDeque<>();

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Instantiate `pendingOutputs` as a `ConcurrentLinkedQueue` so that adding
   elements from harness threads and polling them from the processing thread
   are both thread-safe.
   
   ```suggestion
     private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
