[ 
https://issues.apache.org/jira/browse/BEAM-12704?focusedWorklogId=641687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-641687
 ]

ASF GitHub Bot logged work on BEAM-12704:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/21 13:36
            Start Date: 25/Aug/21 13:36
    Worklog Time Spent: 10m 
      Work Description: dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695709072



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -86,12 +86,11 @@ public T createInstance() {
   public T copy(T t) {
     if (fasterCopy) {
       return t;
-    } else {
-      try {
-        return CoderUtils.clone(coder, t);
-      } catch (CoderException e) {
-        throw new RuntimeException("Could not clone.", e);
-      }
+    }
+    try {
+      return CoderUtils.clone(coder, t);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not clone value " + t + " using " + 
coder, e);

Review comment:
       Unrelated change.
   
   Can this result in a massive log message?

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new 
FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new 
DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new 
DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new 
FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, 
value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + 
unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(

Review comment:
       It would be good to add javadoc for this and `intoWireTypes`. Something 
along the lines of "this simulates the wire protocol of the SDK harness (we basically 
need to get its response format)".

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
##########
@@ -340,8 +340,7 @@ public OutputT expand(InputT input) {
 
     boolean isJavaSDKCompatible(RunnerApi.Components components, String 
coderId) {
       RunnerApi.Coder coder = components.getCodersOrThrow(coderId);
-      if 
(!CoderTranslation.JAVA_SERIALIZED_CODER_URN.equals(coder.getSpec().getUrn())
-          && 
!CoderTranslation.KNOWN_CODER_URNS.containsValue(coder.getSpec().getUrn())) {
+      if (!CoderTranslation.isKnownCoderURN(coder.getSpec().getUrn())) {

Review comment:
       I don't think this is correct; the `isKnownCoderURN` method doesn't say 
anything about Java compatibility.
   
   I'd prefer removing this change.

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) 
ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = 
ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, 
pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInformation =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  
ValueWithRecordId.ValueWithRecordIdCoder.of(sdkCoder.getValueCoder()),
+                  windowStrategy.getWindowFn().windowCoder()),
+              pipelineOptions);
+
       int parallelism =

Review comment:
       parallelism calculation seems incorrect

##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -77,11 +77,6 @@
   <td>Remove unneeded deep copy between operators. See 
https://issues.apache.org/jira/browse/BEAM-11146</td>
   <td>Default: <code>false</code></td>
 </tr>
-<tr>
-  <td><code>filesToStage</code></td>

Review comment:
       Why is this removed?

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);
     // TODO null check can be removed once FLINK-3796 is fixed

Review comment:
       FLINK-3796 is already fixed, so it should no longer be necessary to check 
whether `metricContainer` and `reader` are null.
   
   

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) 
ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = 
ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, 
pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInformation =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  
ValueWithRecordId.ValueWithRecordIdCoder.of(sdkCoder.getValueCoder()),
+                  windowStrategy.getWindowFn().windowCoder()),
+              pipelineOptions);
+
       int parallelism =

Review comment:
       (the variable name seems wrong)

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);

Review comment:
       I know that you personally find `Optional.ofNullable` more readable, but 
this shouldn't come at the cost of inconsistent code style. (The reader check right 
below uses a classic if-statement nullness check; please stick to that.)

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) 
ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = 
ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, 
pipeline.getComponents());

Review comment:
       nit: double cast

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -474,20 +477,64 @@ public void flatMap(T t, Collector<T> collector) {
       throw new RuntimeException("Failed to parse ReadPayload from transform", 
e);
     }
 
-    Preconditions.checkState(
-        payload.getIsBounded() != RunnerApi.IsBounded.Enum.BOUNDED,
-        "Bounded reads should run inside an environment instead of being 
translated by the Runner.");
+    final DataStream<WindowedValue<T>> source;
+    if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
+      source =
+          translateBoundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    } else {
+      source =
+          translateUnboundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    }
+    context.addDataStream(outputCollectionId, source);
+  }
 
-    DataStream<WindowedValue<T>> source =
-        translateUnboundedSource(
-            transform.getUniqueName(),
-            outputCollectionId,
-            payload,
-            pipeline,
-            context.getPipelineOptions(),
-            context.getExecutionEnvironment());
+  private <T> DataStream<WindowedValue<T>> translateBoundedSource(
+      String transformName,
+      String outputCollectionId,
+      RunnerApi.ReadPayload payload,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment env) {
 
-    context.addDataStream(outputCollectionId, source);
+    try {
+      @SuppressWarnings("unchecked")
+      BoundedSource<T> boundedSource =
+          (BoundedSource<T>) ReadTranslation.boundedSourceFromProto(payload);
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)

Review comment:
       nit: double cast

##########
File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -81,7 +96,8 @@ public static void tearDown() throws InterruptedException {
 
   @Test(timeout = 120_000)
   public void testExecution() throws Exception {
-    PipelineOptions options = 
PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
+    PipelineOptions options =
+        
PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create();

Review comment:
       should we also test a scenario without the flag?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 641687)
    Time Spent: 2h 10m  (was: 2h)

> Fix primitive Read on portable Flink Runner
> -------------------------------------------
>
>                 Key: BEAM-12704
>                 URL: https://issues.apache.org/jira/browse/BEAM-12704
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.32.0
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: P2
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> {{ReadSourcePortableTest}} is not correctly testing the expansion into 
> primitive Read. As a result, the primitive Read operation is broken.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to