[
https://issues.apache.org/jira/browse/BEAM-12704?focusedWorklogId=641776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-641776
]
ASF GitHub Bot logged work on BEAM-12704:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Aug/21 16:05
Start Date: 25/Aug/21 16:05
Worklog Time Spent: 10m
Work Description: dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695899629
##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +596,74 @@ public void flatMap(T t, Collector<T> collector) {
source =
nonDedupSource
.keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
- .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
- .uid(format("%s/__deduplicated__", transformName));
+ .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+ .uid(format("%s/__deduplicated__", transformName))
+ .returns(sdkTypeInformation);
} else {
source =
nonDedupSource
.flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
- .returns(outputTypeInfo);
+ .returns(sdkTypeInformation);
}
+
+ return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
} catch (Exception e) {
throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
}
+ }
+
+ /**
+ * Get SDK coder for given PCollection. The SDK coder is the coder that the SDK-harness would have
+ * used to encode data before passing to the runner over {@link SdkHarnessClient}.
Review comment:
```suggestion
* used to encode data before passing them to the runner over {@link SdkHarnessClient}.
```
Issue Time Tracking
-------------------
Worklog Id: (was: 641776)
Time Spent: 4h 20m (was: 4h 10m)
> Fix primitive Read on portable Flink Runner
> -------------------------------------------
>
> Key: BEAM-12704
> URL: https://issues.apache.org/jira/browse/BEAM-12704
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.32.0
> Reporter: Jan Lukavský
> Assignee: Jan Lukavský
> Priority: P2
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> {{ReadSourcePortableTest}} does not correctly test the expansion into
> primitive Read. As a result, the primitive Read operation is broken.