[ 
https://issues.apache.org/jira/browse/BEAM-12704?focusedWorklogId=641776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-641776
 ]

ASF GitHub Bot logged work on BEAM-12704:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/21 16:05
            Start Date: 25/Aug/21 16:05
    Worklog Time Spent: 10m 
      Work Description: dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695899629



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +596,74 @@ public void flatMap(T t, Collector<T> collector) {
         source =
             nonDedupSource
                 .keyBy(new 
FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
-                .transform("deduping", outputTypeInfo, new 
DedupingOperator<>(pipelineOptions))
-                .uid(format("%s/__deduplicated__", transformName));
+                .transform("deduping", sdkTypeInformation, new 
DedupingOperator<>(pipelineOptions))
+                .uid(format("%s/__deduplicated__", transformName))
+                .returns(sdkTypeInformation);
       } else {
         source =
             nonDedupSource
                 .flatMap(new 
FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(sdkTypeInformation);
       }
+
+      return source.map(value -> intoWireTypes(sdkCoder, wireCoder, 
value)).returns(outputTypeInfo);
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + 
unboundedSource, e);
     }
+  }
+
+  /**
+   * Get SDK coder for given PCollection. The SDK coder is the coder that the 
SDK-harness would have
+   * used to encode data before passing to the runner over {@link 
SdkHarnessClient}.

Review comment:
       ```suggestion
      * used to encode data before passing them to the runner over {@link 
SdkHarnessClient}.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 641776)
    Time Spent: 4h 20m  (was: 4h 10m)

> Fix primitive Read on portable Flink Runner
> -------------------------------------------
>
>                 Key: BEAM-12704
>                 URL: https://issues.apache.org/jira/browse/BEAM-12704
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.32.0
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: P2
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> {{ReadSourcePortableTest}} is not testing correctly the expansion into 
> primitive Read. As a result, the primitive Read operation is broken.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to