gemini-code-assist[bot] commented on code in PR #37036:
URL: https://github.com/apache/beam/pull/37036#discussion_r2600515691
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -263,6 +271,129 @@ public void translateNode(
}
}
+ /**
+ * Translator for {@link SplittableParDo.PrimitiveUnboundedRead}.
+ *
+ * <p>This handles the case where Read.Unbounded is converted to
PrimitiveUnboundedRead by {@link
+ *
SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+ */
+ private static class PrimitiveUnboundedReadTranslator<T>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ SplittableParDo.PrimitiveUnboundedRead<T>> {
+
+ @Override
+ public void translateNode(
+ SplittableParDo.PrimitiveUnboundedRead<T> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<T> output = context.getOutput(transform);
+
+ DataStream<WindowedValue<T>> source;
+ DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
+ TypeInformation<WindowedValue<T>> outputTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Coder<T> coder = context.getOutput(transform).getCoder();
+
+ TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+ new CoderTypeInformation<>(
+ WindowedValues.getFullCoder(
+ ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+ output.getWindowingStrategy().getWindowFn().windowCoder()),
+ context.getPipelineOptions());
+
+ // Get source directly from PrimitiveUnboundedRead (not via
ReadTranslation)
+ UnboundedSource<T, ?> rawSource = transform.getSource();
+
+ String fullName = getCurrentTransformName(context);
+ try {
+ int parallelism =
+ context.getExecutionEnvironment().getMaxParallelism() > 0
+ ? context.getExecutionEnvironment().getMaxParallelism()
+ : context.getExecutionEnvironment().getParallelism();
+
+ FlinkUnboundedSource<T> unboundedSource =
+ FlinkSource.unbounded(
+ transform.getName(),
+ rawSource,
+ new SerializablePipelineOptions(context.getPipelineOptions()),
+ parallelism);
+ nonDedupSource =
+ context
+ .getExecutionEnvironment()
+ .fromSource(
+ unboundedSource, WatermarkStrategy.noWatermarks(),
fullName, withIdTypeInfo)
+ .uid(fullName);
+
+ if (rawSource.requiresDeduping()) {
+ source =
+ nonDedupSource
+ .keyBy(new ValueWithRecordIdKeySelector<>())
+ .transform(
+ "deduping",
+ outputTypeInfo,
+ new DedupingOperator<>(context.getPipelineOptions()))
+ .uid(format("%s/__deduplicated__", fullName));
+ } else {
+ source =
+ nonDedupSource
+ .flatMap(new StripIdsMap<>(context.getPipelineOptions()))
+ .returns(outputTypeInfo);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
+ }
+
+ context.setOutputDataStream(output, source);
+ }
+ }
+
+ /**
+ * Translator for {@link SplittableParDo.PrimitiveBoundedRead}.
+ *
+ * <p>This handles the case where Read.Bounded is converted to
PrimitiveBoundedRead by {@link
+ *
SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+ */
+ private static class PrimitiveBoundedReadTranslator<T>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ SplittableParDo.PrimitiveBoundedRead<T>> {
+
+ @Override
+ public void translateNode(
+ SplittableParDo.PrimitiveBoundedRead<T> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<T> output = context.getOutput(transform);
+ TypeInformation<WindowedValue<T>> outputTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ // Get source directly from PrimitiveBoundedRead (not via
ReadTranslation)
+ BoundedSource<T> rawSource = transform.getSource();
+
+ String fullName = getCurrentTransformName(context);
+ int parallelism =
+ context.getExecutionEnvironment().getMaxParallelism() > 0
+ ? context.getExecutionEnvironment().getMaxParallelism()
+ : context.getExecutionEnvironment().getParallelism();
+
+ FlinkBoundedSource<T> flinkBoundedSource =
+ FlinkSource.bounded(
+ fullName,
+ rawSource,
+ new SerializablePipelineOptions(context.getPipelineOptions()),
+ parallelism);
+
+ DataStream<WindowedValue<T>> source =
+ context
+ .getExecutionEnvironment()
+ .fromSource(
+ flinkBoundedSource, WatermarkStrategy.noWatermarks(),
fullName, outputTypeInfo)
+ .uid(fullName);
+
+ context.setOutputDataStream(output, source);
+ }
Review Comment:

This translator is missing some important logic that is present in the
corresponding `BoundedReadSourceTranslator`. This could lead to incorrect
behavior or lack of robustness. Specifically:
* **Missing `try-catch` block:** The logic is not wrapped in a `try-catch`
block for robust error handling.
* **Missing `.returns(outputTypeInfo)`:** The call to `.returns()` is
missing after `.fromSource()`. This is important for Flink's type system to
avoid issues with type erasure.
* **Missing batch mode logic:** The check for `!context.isStreaming()` and
setting the `slotSharingGroup` is missing. This is necessary for correct
behavior in batch mode.
* **Inconsistent `stepName`:** It uses `fullName` for the `stepName` in
`FlinkSource.bounded()`, while other read translators use
`transform.getName()`. For consistency, it would be better to use
`transform.getName()`.
Please update this translator to include the missing logic to align with
`BoundedReadSourceTranslator`. After that, consider refactoring both to reduce
code duplication. Here is a suggestion that incorporates these fixes:
```java
public void translateNode(
SplittableParDo.PrimitiveBoundedRead<T> transform,
FlinkStreamingTranslationContext context) {
PCollection<T> output = context.getOutput(transform);
TypeInformation<WindowedValue<T>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
// Get source directly from PrimitiveBoundedRead (not via
ReadTranslation)
BoundedSource<T> rawSource = transform.getSource();
String fullName = getCurrentTransformName(context);
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();
FlinkBoundedSource<T> flinkBoundedSource =
FlinkSource.bounded(
transform.getName(), // Using transform.getName() for consistency
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator<WindowedValue<T>>
source;
try {
source =
context
.getExecutionEnvironment()
.fromSource(
flinkBoundedSource, WatermarkStrategy.noWatermarks(),
fullName, outputTypeInfo)
.uid(fullName)
.returns(outputTypeInfo);
if (!context.isStreaming()
&& context
.getPipelineOptions()
.as(FlinkPipelineOptions.class)
.getForceSlotSharingGroup()) {
source = source.slotSharingGroup(FORCED_SLOT_GROUP);
}
} catch (Exception e) {
throw new RuntimeException("Error while translating BoundedSource: "
+ rawSource, e);
}
context.setOutputDataStream(output, source);
}
```
##########
CHANGES.md:
##########
@@ -84,6 +84,7 @@
## Bugfixes
+* Fixed Flink classic runner failing with "No translator known for
PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO
after SDF-to-primitive-read conversion (Java)
([#XXXXX](https://github.com/apache/beam/issues/XXXXX)).
Review Comment:

Please replace the placeholder `XXXXX` with the actual issue number, which
appears to be `37035` based on the pull request description.
```suggestion
* Fixed Flink classic runner failing with "No translator known for
PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO
after SDF-to-primitive-read conversion (Java)
([#37035](https://github.com/apache/beam/issues/37035)).
```
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -263,6 +271,129 @@ public void translateNode(
}
}
+ /**
+ * Translator for {@link SplittableParDo.PrimitiveUnboundedRead}.
+ *
+ * <p>This handles the case where Read.Unbounded is converted to
PrimitiveUnboundedRead by {@link
+ *
SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+ */
+ private static class PrimitiveUnboundedReadTranslator<T>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ SplittableParDo.PrimitiveUnboundedRead<T>> {
+
+ @Override
+ public void translateNode(
+ SplittableParDo.PrimitiveUnboundedRead<T> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<T> output = context.getOutput(transform);
+
+ DataStream<WindowedValue<T>> source;
+ DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
+ TypeInformation<WindowedValue<T>> outputTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ Coder<T> coder = context.getOutput(transform).getCoder();
+
+ TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+ new CoderTypeInformation<>(
+ WindowedValues.getFullCoder(
+ ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+ output.getWindowingStrategy().getWindowFn().windowCoder()),
+ context.getPipelineOptions());
+
+ // Get source directly from PrimitiveUnboundedRead (not via
ReadTranslation)
+ UnboundedSource<T, ?> rawSource = transform.getSource();
+
+ String fullName = getCurrentTransformName(context);
+ try {
+ int parallelism =
+ context.getExecutionEnvironment().getMaxParallelism() > 0
+ ? context.getExecutionEnvironment().getMaxParallelism()
+ : context.getExecutionEnvironment().getParallelism();
+
+ FlinkUnboundedSource<T> unboundedSource =
+ FlinkSource.unbounded(
+ transform.getName(),
+ rawSource,
+ new SerializablePipelineOptions(context.getPipelineOptions()),
+ parallelism);
+ nonDedupSource =
+ context
+ .getExecutionEnvironment()
+ .fromSource(
+ unboundedSource, WatermarkStrategy.noWatermarks(),
fullName, withIdTypeInfo)
+ .uid(fullName);
+
+ if (rawSource.requiresDeduping()) {
+ source =
+ nonDedupSource
+ .keyBy(new ValueWithRecordIdKeySelector<>())
+ .transform(
+ "deduping",
+ outputTypeInfo,
+ new DedupingOperator<>(context.getPipelineOptions()))
+ .uid(format("%s/__deduplicated__", fullName));
+ } else {
+ source =
+ nonDedupSource
+ .flatMap(new StripIdsMap<>(context.getPipelineOptions()))
+ .returns(outputTypeInfo);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
+ }
+
+ context.setOutputDataStream(output, source);
+ }
+ }
Review Comment:

The `translateNode` method in this translator is nearly identical to
`UnboundedReadSourceTranslator.translateNode`. The only significant difference
is how `rawSource` is obtained. To improve maintainability and avoid code
duplication, consider extracting the common logic into a shared helper method.
This would make the code cleaner and less prone to inconsistencies if one of
the translators is updated in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]