gemini-code-assist[bot] commented on code in PR #38843:
URL: https://github.com/apache/beam/pull/38843#discussion_r3372267621


##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/RedistributeTranslator.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+
+/**
+ * Runner-native translator for {@code beam:transform:redistribute_arbitrarily:v1}.
+ *
+ * <p>The default Beam expansion of {@code Redistribute.arbitrarily()} goes through {@code
+ * GroupByKey} for materialization. The Kafka Streams runner does not need to do any actual
+ * redistribution in the single-instance topology — Kafka Streams is already handling per-task
+ * processing — so we provide a passthrough: the output PCollection is mapped to the same processor
+ * that already produces the input PCollection. No topology node is added.
+ *
+ * <p>The translator is registered in {@link KafkaStreamsPipelineTranslator#knownUrns()} so {@link
+ * org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander} strips the
+ * sub-transforms of {@code Redistribute} before the fuser runs. That keeps the Redistribute
+ * boundary intact and lets the fuser split adjacent stages correctly.
+ *
+ * <p>The keyed variant ({@code redistribute_by_key:v1}) is intentionally not handled here: it
+ * implies a rehash/partition step that the runner can implement on top of GroupByKey when that
+ * lands, but cannot reasonably no-op the way the arbitrary variant can.
+ */
+class RedistributeTranslator implements PTransformTranslator {
+
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
+    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
+    String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Add a defensive null check for `parentProcessor`. If the input PCollection
   has no registered producer processor (e.g., because of an unexpected
   pipeline structure or a translation-order issue), the translator then fails
   fast with a clear error message instead of triggering a
   `NullPointerException` downstream.
   
   ```java
       String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);
       if (parentProcessor == null) {
         throw new IllegalStateException(
             String.format(
                 "No producer processor found for input PCollection %s of transform %s",
                 inputPCollectionId, transformId));
       }
       String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
   ```
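
   If other translators end up needing the same guard, it could be
   centralized in a small helper. The sketch below is self-contained and
   purely illustrative: `ProcessorRegistry`, `register`, and
   `requireProcessor` are hypothetical names standing in for the translation
   context, not classes or methods from this PR.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Hypothetical stand-in for the translation context; not the runner's real class. */
   final class ProcessorRegistry {
     private final Map<String, String> processorByPCollection = new HashMap<>();

     /** Records which processor produces the given PCollection. */
     void register(String pCollectionId, String processorName) {
       processorByPCollection.put(pCollectionId, processorName);
     }

     /** Returns the producer processor, or throws an IllegalStateException naming both ids. */
     String requireProcessor(String pCollectionId, String transformId) {
       String processor = processorByPCollection.get(pCollectionId);
       if (processor == null) {
         throw new IllegalStateException(
             String.format(
                 "No producer processor found for input PCollection %s of transform %s",
                 pCollectionId, transformId));
       }
       return processor;
     }
   }
   ```

   With a helper of this shape, the lookup and the error message live in one
   place, so each translator call site stays a single line.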



##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java:
##########
@@ -157,26 +161,25 @@ private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>> record)
     } finally {
       currentBundle = null;
     }
-    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = checkInitialized(context);
+    ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context);
     // The harness has finished the bundle (close() returned) so no further enqueues happen.
     // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to drain via forEach.
     pendingOutputs.forEach(
         output ->
             ctx.forward(
-                new Record<byte[], KStreamsPayload<byte[]>>(
+                new Record<byte[], KStreamsPayload<?>>(
                     record.key(), KStreamsPayload.data(output), record.timestamp())));
     pendingOutputs.clear();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Draining the queue with `poll()` in a loop is more idiomatic and robust
   than `forEach` followed by `clear()`. It does not rely on the
   weakly-consistent iterator of `ConcurrentLinkedQueue`, and it removes each
   element as it is processed. If the queue were ever accessed concurrently,
   an element enqueued between the `forEach` pass and `clear()` would be
   dropped without being forwarded; with `poll()` that cannot happen, and
   forwarded elements are not retained in the queue.
   
   ```suggestion
       // The harness has finished the bundle (close() returned) so no further enqueues happen.
       WindowedValue<?> output;
       while ((output = pendingOutputs.poll()) != null) {
         ctx.forward(
             new Record<>(
                 record.key(), KStreamsPayload.data(output), record.timestamp()));
       }
   ```
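
   For reference, the drain pattern can be tried in isolation. This is a
   minimal sketch using only `java.util`; `QueueDrain` and `drain` are
   hypothetical names for illustration, and the `Consumer` stands in for the
   `ctx.forward(...)` call.

   ```java
   import java.util.Queue;
   import java.util.function.Consumer;

   /** Illustrative helper; not part of the runner. */
   final class QueueDrain {
     private QueueDrain() {}

     /**
      * Removes elements one at a time and hands each to the sink.
      * Returns the number of elements drained.
      */
     static <T> int drain(Queue<T> queue, Consumer<? super T> sink) {
       int drained = 0;
       T element;
       // poll() returns null once the queue is empty, which ends the loop.
       while ((element = queue.poll()) != null) {
         sink.accept(element);
         drained++;
       }
       return drained;
     }
   }
   ```

   Because each element leaves the queue before it is handed to the sink, no
   separate `clear()` is needed, and anything enqueued while the loop is
   running is picked up by a later `poll()` rather than discarded.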



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
