[ https://issues.apache.org/jira/browse/BEAM-12704?focusedWorklogId=641708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-641708 ]

ASF GitHub Bot logged work on BEAM-12704:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/21 14:13
            Start Date: 25/Aug/21 14:13
    Worklog Time Spent: 10m 
      Work Description: je-ik commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695791725



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
##########
@@ -340,8 +340,7 @@ public OutputT expand(InputT input) {
 
     boolean isJavaSDKCompatible(RunnerApi.Components components, String coderId) {
       RunnerApi.Coder coder = components.getCodersOrThrow(coderId);
-      if (!CoderTranslation.JAVA_SERIALIZED_CODER_URN.equals(coder.getSpec().getUrn())
-          && !CoderTranslation.KNOWN_CODER_URNS.containsValue(coder.getSpec().getUrn())) {
+      if (!CoderTranslation.isKnownCoderURN(coder.getSpec().getUrn())) {

Review comment:
       This was changed when I was trying to add the `beam:coders:java:0.1` URN to 
`CoderTranslation`. It is not needed; reverted.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
      throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(

Review comment:
       done

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
##########
@@ -150,7 +151,8 @@ public boolean reachedEnd() throws IOException {
 
   @Override
   public void close() throws IOException {
-    metricContainer.registerMetricsForPipelineResult();
+    Optional.ofNullable(metricContainer)
+        .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult);

Review comment:
       fixed

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -81,7 +96,8 @@ public static void tearDown() throws InterruptedException {
 
   @Test(timeout = 120_000)
   public void testExecution() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create();

Review comment:
       I think that is tested in other tests.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
      throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+      Coder<WindowedValue<InputT>> inCoder,
+      Coder<WindowedValue<OutputT>> outCoder,
+      WindowedValue<InputT> value) {
+
+    try {
+      return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));
+    } catch (CoderException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       I don't see any added value in the message; the cause will be visible 
from the stack trace.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -86,12 +86,11 @@ public T createInstance() {
   public T copy(T t) {
     if (fasterCopy) {
       return t;
-    } else {
-      try {
-        return CoderUtils.clone(coder, t);
-      } catch (CoderException e) {
-        throw new RuntimeException("Could not clone.", e);
-      }
+    }
+    try {
+      return CoderUtils.clone(coder, t);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not clone value " + t + " using " + coder, e);

Review comment:
       This was part of debugging; reverted.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
      throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+      Coder<WindowedValue<InputT>> inCoder,
+      Coder<WindowedValue<OutputT>> outCoder,
+      WindowedValue<InputT> value) {
+
+    try {
+      return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));

Review comment:
       It definitely is suboptimal, but given the performance penalty of 
portability itself, I seriously doubt this will be addressed separately. This 
can be optimized when (and if) we reach the stage where classical and portable 
runners are unified (so that classical runners become portable runners that 
can inline the complete pipeline, because the pipeline is written in the same 
SDK as the runner).
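
       For illustration only (not something this PR adds): a minimal sketch of 
how the encode/decode round trip in `intoWireTypes` could be short-circuited 
when the SDK coder and the wire coder are equal. The helper name 
`maybeIntoWireTypes` and the reliance on coder equality are assumptions made 
for this sketch.

           // Uses org.apache.beam.sdk.coders.{Coder, CoderException} and
           // org.apache.beam.sdk.util.{CoderUtils, WindowedValue}.
           private static <T> WindowedValue<T> maybeIntoWireTypes(
               Coder<WindowedValue<T>> sdkCoder,
               Coder<WindowedValue<T>> wireCoder,
               WindowedValue<T> value) {
             // When both sides agree on the encoding, no transcoding is needed.
             if (sdkCoder.equals(wireCoder)) {
               return value;
             }
             try {
               // Re-encode with the SDK coder and decode with the wire coder.
               return CoderUtils.decodeFromByteArray(
                   wireCoder, CoderUtils.encodeToByteArray(sdkCoder, value));
             } catch (CoderException ex) {
               throw new IllegalStateException(ex);
             }
           }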

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, pipeline.getComponents());

Review comment:
       fixed

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -474,20 +477,64 @@ public void flatMap(T t, Collector<T> collector) {
      throw new RuntimeException("Failed to parse ReadPayload from transform", e);
     }
 
-    Preconditions.checkState(
-        payload.getIsBounded() != RunnerApi.IsBounded.Enum.BOUNDED,
-        "Bounded reads should run inside an environment instead of being translated by the Runner.");
+    final DataStream<WindowedValue<T>> source;
+    if (payload.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED) {
+      source =
+          translateBoundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    } else {
+      source =
+          translateUnboundedSource(
+              transform.getUniqueName(),
+              outputCollectionId,
+              payload,
+              pipeline,
+              context.getPipelineOptions(),
+              context.getExecutionEnvironment());
+    }
+    context.addDataStream(outputCollectionId, source);
+  }
 
-    DataStream<WindowedValue<T>> source =
-        translateUnboundedSource(
-            transform.getUniqueName(),
-            outputCollectionId,
-            payload,
-            pipeline,
-            context.getPipelineOptions(),
-            context.getExecutionEnvironment());
+  private <T> DataStream<WindowedValue<T>> translateBoundedSource(
+      String transformName,
+      String outputCollectionId,
+      RunnerApi.ReadPayload payload,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment env) {
 
-    context.addDataStream(outputCollectionId, source);
+    try {
+      @SuppressWarnings("unchecked")
+      BoundedSource<T> boundedSource =
+          (BoundedSource<T>) ReadTranslation.boundedSourceFromProto(payload);
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)

Review comment:
       fixed

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
      throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);

Review comment:
       Same as below; the message will not bring any more information here.

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourcePortableTest.java
##########
@@ -90,10 +106,16 @@ public void testExecution() throws Exception {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    PCollection<Long> result = p.apply(GenerateSequence.from(0L).to(10L));
+    PCollection<Long> result =
+        p.apply(Read.from(new Source(10)))
+            // FIXME: the test fails without this

Review comment:
       That is unrelated; there is some bug in TrivialNativeTransformExpander 
or GreedyPipelineFuser, or somewhere in between.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -500,28 +547,41 @@ public void flatMap(T t, Collector<T> collector) {
 
     final DataStream<WindowedValue<T>> source;
     final DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-    Coder<WindowedValue<T>> windowCoder =
-        instantiateCoder(outputCollectionId, pipeline.getComponents());
 
-    TypeInformation<WindowedValue<T>> outputTypeInfo =
-        new CoderTypeInformation<>(windowCoder, pipelineOptions);
+    @SuppressWarnings("unchecked")
+    UnboundedSource<T, ?> unboundedSource =
+        (UnboundedSource<T, ?>) ReadTranslation.unboundedSourceFromProto(payload);
 
-    WindowingStrategy windowStrategy =
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, ?> windowStrategy =
         getWindowingStrategy(outputCollectionId, pipeline.getComponents());
-    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-        new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                ValueWithRecordId.ValueWithRecordIdCoder.of(
-                    ((WindowedValueCoder) windowCoder).getValueCoder()),
-                windowStrategy.getWindowFn().windowCoder()),
-            pipelineOptions);
-
-    UnboundedSource unboundedSource = ReadTranslation.unboundedSourceFromProto(payload);
 
     try {
+
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> wireCoder =
+          (WindowedValue.FullWindowedValueCoder<T>)
+              (Coder) instantiateCoder(outputCollectionId, pipeline.getComponents());
+
+      WindowedValue.FullWindowedValueCoder<T> sdkCoder =
+          getSdkCoder(outputCollectionId, pipeline.getComponents());
+
+      CoderTypeInformation<WindowedValue<T>> outputTypeInfo =
+          new CoderTypeInformation<>(wireCoder, pipelineOptions);
+
+      CoderTypeInformation<WindowedValue<T>> sdkTypeInformation =
+          new CoderTypeInformation<>(sdkCoder, pipelineOptions);
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  ValueWithRecordId.ValueWithRecordIdCoder.of(sdkCoder.getValueCoder()),
+                  windowStrategy.getWindowFn().windowCoder()),
+              pipelineOptions);
+
       int parallelism =

Review comment:
       That is not related to this change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 641708)
    Time Spent: 2h 50m  (was: 2h 40m)

> Fix primitive Read on portable Flink Runner
> -------------------------------------------
>
>                 Key: BEAM-12704
>                 URL: https://issues.apache.org/jira/browse/BEAM-12704
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.32.0
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> {{ReadSourcePortableTest}} does not correctly test the expansion into 
> primitive Read. As a result, the primitive Read operation is broken.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
