gemini-code-assist[bot] commented on code in PR #37036:
URL: https://github.com/apache/beam/pull/37036#discussion_r2600515691


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -263,6 +271,129 @@ public void translateNode(
     }
   }
 
+  /**
+   * Translator for {@link SplittableParDo.PrimitiveUnboundedRead}.
+   *
+   * <p>This handles the case where Read.Unbounded is converted to 
PrimitiveUnboundedRead by {@link
+   * 
SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+   */
+  private static class PrimitiveUnboundedReadTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          SplittableParDo.PrimitiveUnboundedRead<T>> {
+
+    @Override
+    public void translateNode(
+        SplittableParDo.PrimitiveUnboundedRead<T> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<T> output = context.getOutput(transform);
+
+      DataStream<WindowedValue<T>> source;
+      DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      Coder<T> coder = context.getOutput(transform).getCoder();
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValues.getFullCoder(
+                  ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+                  output.getWindowingStrategy().getWindowFn().windowCoder()),
+              context.getPipelineOptions());
+
+      // Get source directly from PrimitiveUnboundedRead (not via 
ReadTranslation)
+      UnboundedSource<T, ?> rawSource = transform.getSource();
+
+      String fullName = getCurrentTransformName(context);
+      try {
+        int parallelism =
+            context.getExecutionEnvironment().getMaxParallelism() > 0
+                ? context.getExecutionEnvironment().getMaxParallelism()
+                : context.getExecutionEnvironment().getParallelism();
+
+        FlinkUnboundedSource<T> unboundedSource =
+            FlinkSource.unbounded(
+                transform.getName(),
+                rawSource,
+                new SerializablePipelineOptions(context.getPipelineOptions()),
+                parallelism);
+        nonDedupSource =
+            context
+                .getExecutionEnvironment()
+                .fromSource(
+                    unboundedSource, WatermarkStrategy.noWatermarks(), 
fullName, withIdTypeInfo)
+                .uid(fullName);
+
+        if (rawSource.requiresDeduping()) {
+          source =
+              nonDedupSource
+                  .keyBy(new ValueWithRecordIdKeySelector<>())
+                  .transform(
+                      "deduping",
+                      outputTypeInfo,
+                      new DedupingOperator<>(context.getPipelineOptions()))
+                  .uid(format("%s/__deduplicated__", fullName));
+        } else {
+          source =
+              nonDedupSource
+                  .flatMap(new StripIdsMap<>(context.getPipelineOptions()))
+                  .returns(outputTypeInfo);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error while translating UnboundedSource: " 
+ rawSource, e);
+      }
+
+      context.setOutputDataStream(output, source);
+    }
+  }
+
+  /**
+   * Translator for {@link SplittableParDo.PrimitiveBoundedRead}.
+   *
+   * <p>This handles the case where Read.Bounded is converted to 
PrimitiveBoundedRead by {@link
+   * 
SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+   */
+  private static class PrimitiveBoundedReadTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          SplittableParDo.PrimitiveBoundedRead<T>> {
+
+    @Override
+    public void translateNode(
+        SplittableParDo.PrimitiveBoundedRead<T> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<T> output = context.getOutput(transform);
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      // Get source directly from PrimitiveBoundedRead (not via 
ReadTranslation)
+      BoundedSource<T> rawSource = transform.getSource();
+
+      String fullName = getCurrentTransformName(context);
+      int parallelism =
+          context.getExecutionEnvironment().getMaxParallelism() > 0
+              ? context.getExecutionEnvironment().getMaxParallelism()
+              : context.getExecutionEnvironment().getParallelism();
+
+      FlinkBoundedSource<T> flinkBoundedSource =
+          FlinkSource.bounded(
+              fullName,
+              rawSource,
+              new SerializablePipelineOptions(context.getPipelineOptions()),
+              parallelism);
+
+      DataStream<WindowedValue<T>> source =
+          context
+              .getExecutionEnvironment()
+              .fromSource(
+                  flinkBoundedSource, WatermarkStrategy.noWatermarks(), 
fullName, outputTypeInfo)
+              .uid(fullName);
+
+      context.setOutputDataStream(output, source);
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This translator is missing some important logic that is present in the 
corresponding `BoundedReadSourceTranslator`. This could lead to incorrect 
behavior or lack of robustness. Specifically:
   
   *   **Missing `try-catch` block:** The logic is not wrapped in a `try-catch` 
block for robust error handling.
   *   **Missing `.returns(outputTypeInfo)`:** The call to `.returns()` is 
missing after `.fromSource()`. This is important for Flink's type system to 
avoid issues with type erasure.
   *   **Missing batch mode logic:** The check for `!context.isStreaming()` and 
setting the `slotSharingGroup` is missing. This is necessary for correct 
behavior in batch mode.
   *   **Inconsistent `stepName`:** It uses `fullName` for the `stepName` in 
`FlinkSource.bounded()`, while other read translators use 
`transform.getName()`. For consistency, it would be better to use 
`transform.getName()`.
   
   Please update this translator to include the missing logic to align with 
`BoundedReadSourceTranslator`. After that, consider refactoring both to reduce 
code duplication. Here is a suggestion that incorporates these fixes:
   
   ```java
       public void translateNode(
           SplittableParDo.PrimitiveBoundedRead<T> transform,
           FlinkStreamingTranslationContext context) {
   
         PCollection<T> output = context.getOutput(transform);
         TypeInformation<WindowedValue<T>> outputTypeInfo =
             context.getTypeInfo(context.getOutput(transform));
   
         // Get source directly from PrimitiveBoundedRead (not via 
ReadTranslation)
         BoundedSource<T> rawSource = transform.getSource();
   
         String fullName = getCurrentTransformName(context);
         int parallelism =
             context.getExecutionEnvironment().getMaxParallelism() > 0
                 ? context.getExecutionEnvironment().getMaxParallelism()
                 : context.getExecutionEnvironment().getParallelism();
   
         FlinkBoundedSource<T> flinkBoundedSource =
             FlinkSource.bounded(
                 transform.getName(), // Using transform.getName() for 
consistency
                 rawSource,
                 new SerializablePipelineOptions(context.getPipelineOptions()),
                 parallelism);
   
         
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<WindowedValue<T>>
 source;
         try {
           source =
               context
                   .getExecutionEnvironment()
                   .fromSource(
                       flinkBoundedSource, WatermarkStrategy.noWatermarks(), 
fullName, outputTypeInfo)
                   .uid(fullName)
                   .returns(outputTypeInfo);
   
           if (!context.isStreaming()
               && context
                   .getPipelineOptions()
                   .as(FlinkPipelineOptions.class)
                   .getForceSlotSharingGroup()) {
             source = source.slotSharingGroup(FORCED_SLOT_GROUP);
           }
         } catch (Exception e) {
           throw new RuntimeException("Error while translating BoundedSource: " 
+ rawSource, e);
         }
   
         context.setOutputDataStream(output, source);
       }
   ```



##########
CHANGES.md:
##########
@@ -84,6 +84,7 @@
 
 ## Bugfixes
 
+* Fixed Flink classic runner failing with "No translator known for 
PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO 
after SDF-to-primitive-read conversion (Java) 
([#XXXXX](https://github.com/apache/beam/issues/XXXXX)).

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Please replace the placeholder `XXXXX` with the actual issue number, which 
appears to be `37035` based on the pull request description.
   
   ```suggestion
   * Fixed Flink classic runner failing with "No translator known for 
PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO 
after SDF-to-primitive-read conversion (Java) 
([#37035](https://github.com/apache/beam/issues/37035)).
   ```



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -263,6 +271,129 @@ public void translateNode(
     }
   }
 
+  /**
+   * Translator for {@link SplittableParDo.PrimitiveUnboundedRead}.
+   *
+   * <p>This handles the case where Read.Unbounded is converted to 
PrimitiveUnboundedRead by {@link
+   * 
SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+   */
+  private static class PrimitiveUnboundedReadTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          SplittableParDo.PrimitiveUnboundedRead<T>> {
+
+    @Override
+    public void translateNode(
+        SplittableParDo.PrimitiveUnboundedRead<T> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<T> output = context.getOutput(transform);
+
+      DataStream<WindowedValue<T>> source;
+      DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      Coder<T> coder = context.getOutput(transform).getCoder();
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValues.getFullCoder(
+                  ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+                  output.getWindowingStrategy().getWindowFn().windowCoder()),
+              context.getPipelineOptions());
+
+      // Get source directly from PrimitiveUnboundedRead (not via 
ReadTranslation)
+      UnboundedSource<T, ?> rawSource = transform.getSource();
+
+      String fullName = getCurrentTransformName(context);
+      try {
+        int parallelism =
+            context.getExecutionEnvironment().getMaxParallelism() > 0
+                ? context.getExecutionEnvironment().getMaxParallelism()
+                : context.getExecutionEnvironment().getParallelism();
+
+        FlinkUnboundedSource<T> unboundedSource =
+            FlinkSource.unbounded(
+                transform.getName(),
+                rawSource,
+                new SerializablePipelineOptions(context.getPipelineOptions()),
+                parallelism);
+        nonDedupSource =
+            context
+                .getExecutionEnvironment()
+                .fromSource(
+                    unboundedSource, WatermarkStrategy.noWatermarks(), 
fullName, withIdTypeInfo)
+                .uid(fullName);
+
+        if (rawSource.requiresDeduping()) {
+          source =
+              nonDedupSource
+                  .keyBy(new ValueWithRecordIdKeySelector<>())
+                  .transform(
+                      "deduping",
+                      outputTypeInfo,
+                      new DedupingOperator<>(context.getPipelineOptions()))
+                  .uid(format("%s/__deduplicated__", fullName));
+        } else {
+          source =
+              nonDedupSource
+                  .flatMap(new StripIdsMap<>(context.getPipelineOptions()))
+                  .returns(outputTypeInfo);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error while translating UnboundedSource: " 
+ rawSource, e);
+      }
+
+      context.setOutputDataStream(output, source);
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `translateNode` method in this translator is nearly identical to 
`UnboundedReadSourceTranslator.translateNode`. The only significant difference 
is how `rawSource` is obtained. To improve maintainability and avoid code 
duplication, consider extracting the common logic into a shared helper method. 
This would make the code cleaner and less prone to inconsistencies if one of 
the translators is updated in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to