[ 
https://issues.apache.org/jira/browse/BEAM-12704?focusedWorklogId=641768&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-641768
 ]

ASF GitHub Bot logged work on BEAM-12704:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/21 15:58
            Start Date: 25/Aug/21 15:58
    Worklog Time Spent: 10m 
      Work Description: dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695893901



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
     }
+  }
 
-    return source;
+  private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+      String pCollectionId, RunnerApi.Components components) {
+
+    PipelineNode.PCollectionNode pCollectionNode =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+    String coderId =
+        WireCoders.addSdkWireCoder(
+            pCollectionNode,
+            componentsBuilder,
+            RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(componentsBuilder.build());
+    try {
+      @SuppressWarnings("unchecked")
+      WindowedValue.FullWindowedValueCoder<T> res =
+          (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+      return res;
+    } catch (IOException ex) {
+      throw new IllegalStateException(ex);

Review comment:
That's not true. This can be any exception thrown from deep down the stack.

Think of the message as scoping, a way of providing more context: you are
basically saying that this exception (I don't care which one) affected
"coder retrieval".

Typical users won't search the source code to find out where the exception
was thrown.
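
A minimal sketch of the suggestion above, not the code adopted in the pull request. `loadCoder` is a hypothetical stand-in for `rehydratedComponents.getCoder(coderId)`, and the message wording is an assumption made for illustration. The point is only that the wrapping exception names the step that failed while keeping the original exception as its cause.

```java
import java.io.IOException;

public class CoderRetrievalExample {

  // Hypothetical stand-in for rehydratedComponents.getCoder(coderId).
  // It always fails here so that the wrapping behaviour is visible.
  public static Object loadCoder(String coderId) throws IOException {
    throw new IOException("unknown coder: " + coderId);
  }

  // Wraps the low-level failure with a message that scopes it to
  // "coder retrieval" and keeps the original exception as the cause.
  public static Object getSdkCoder(String pCollectionId, String coderId) {
    try {
      return loadCoder(coderId);
    } catch (IOException ex) {
      // Illustrative wording only; the real message is up to the PR author.
      throw new IllegalStateException(
          "Could not get SDK coder for PCollection '" + pCollectionId + "'.", ex);
    }
  }

  public static void main(String[] args) {
    try {
      getSdkCoder("pc-1", "coder-1");
    } catch (IllegalStateException e) {
      // The outer message gives the context, the cause gives the detail.
      System.out.println(e.getMessage());
      System.out.println("caused by: " + e.getCause());
    }
  }
}
```

With this shape, a user reading the stack trace sees which PCollection's coder could not be retrieved without having to open the runner source.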





Issue Time Tracking
-------------------

    Worklog Id:     (was: 641768)
    Time Spent: 3h 40m  (was: 3.5h)

> Fix primitive Read on portable Flink Runner
> -------------------------------------------
>
>                 Key: BEAM-12704
>                 URL: https://issues.apache.org/jira/browse/BEAM-12704
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.32.0
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: P2
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> {{ReadSourcePortableTest}} does not correctly test the expansion into 
> primitive Read. As a result, the primitive Read operation is broken.


