[
https://issues.apache.org/jira/browse/BEAM-12704?focusedWorklogId=641770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-641770
]
ASF GitHub Bot logged work on BEAM-12704:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Aug/21 16:00
Start Date: 25/Aug/21 16:00
Worklog Time Spent: 10m
Work Description: dmvk commented on a change in pull request #15370:
URL: https://github.com/apache/beam/pull/15370#discussion_r695895434
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -534,19 +594,55 @@ public void flatMap(T t, Collector<T> collector) {
source =
nonDedupSource
.keyBy(new FlinkStreamingTransformTranslators.ValueWithRecordIdKeySelector<>())
- .transform("deduping", outputTypeInfo, new DedupingOperator<>(pipelineOptions))
- .uid(format("%s/__deduplicated__", transformName));
+ .transform("deduping", sdkTypeInformation, new DedupingOperator<>(pipelineOptions))
+ .uid(format("%s/__deduplicated__", transformName))
+ .returns(sdkTypeInformation);
} else {
source =
nonDedupSource
.flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
- .returns(outputTypeInfo);
+ .returns(sdkTypeInformation);
}
+
+ return source.map(value -> intoWireTypes(sdkCoder, wireCoder, value)).returns(outputTypeInfo);
} catch (Exception e) {
throw new RuntimeException("Error while translating UnboundedSource: " + unboundedSource, e);
}
+ }
- return source;
+ private static <T> WindowedValue.FullWindowedValueCoder<T> getSdkCoder(
+ String pCollectionId, RunnerApi.Components components) {
+
+ PipelineNode.PCollectionNode pCollectionNode =
+ PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+ RunnerApi.Components.Builder componentsBuilder = components.toBuilder();
+ String coderId =
+ WireCoders.addSdkWireCoder(
+ pCollectionNode,
+ componentsBuilder,
+ RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
+ RehydratedComponents rehydratedComponents =
+ RehydratedComponents.forComponents(componentsBuilder.build());
+ try {
+ @SuppressWarnings("unchecked")
+ WindowedValue.FullWindowedValueCoder<T> res =
+ (WindowedValue.FullWindowedValueCoder<T>) rehydratedComponents.getCoder(coderId);
+ return res;
+ } catch (IOException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ private static <InputT, OutputT> WindowedValue<OutputT> intoWireTypes(
+ Coder<WindowedValue<InputT>> inCoder,
+ Coder<WindowedValue<OutputT>> outCoder,
+ WindowedValue<InputT> value) {
+
+ try {
+ return CoderUtils.decodeFromByteArray(outCoder, CoderUtils.encodeToByteArray(inCoder, value));
+ } catch (CoderException ex) {
+ throw new IllegalStateException(ex);
Review comment:
Same as above. It makes a huge difference if you say "I wasn't able to
translate the element from format X to format Y" rather than, for example,
"buffer overflow". Which one would you find more useful?
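A minimal sketch of the kind of message this comment asks for. It is
self-contained and does not use Beam: SimpleCoder, UTF8, INT32 and recode are
hypothetical stand-ins for Coder, the SDK and wire coders, and intoWireTypes,
and the message wording is only a suggestion, not what the pull request
contains.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecodeExample {

  /** Hypothetical, simplified stand-in for Beam's Coder (assumption, not the real API). */
  public interface SimpleCoder<T> {
    byte[] encode(T value) throws IOException;

    T decode(byte[] bytes) throws IOException;
  }

  /** Illustrative coder that stores a String as UTF-8 bytes. */
  public static final SimpleCoder<String> UTF8 =
      new SimpleCoder<String>() {
        @Override
        public byte[] encode(String value) {
          return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String decode(byte[] bytes) {
          return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
          return "Utf8StringCoder";
        }
      };

  /** Illustrative coder that stores an Integer as exactly four big-endian bytes. */
  public static final SimpleCoder<Integer> INT32 =
      new SimpleCoder<Integer>() {
        @Override
        public byte[] encode(Integer value) {
          return ByteBuffer.allocate(4).putInt(value).array();
        }

        @Override
        public Integer decode(byte[] bytes) throws IOException {
          if (bytes.length != 4) {
            throw new IOException("expected 4 bytes, got " + bytes.length);
          }
          return ByteBuffer.wrap(bytes).getInt();
        }

        @Override
        public String toString() {
          return "Int32Coder";
        }
      };

  /** Encodes with inCoder, decodes with outCoder, and names both coders on failure. */
  public static <InputT, OutputT> OutputT recode(
      SimpleCoder<InputT> inCoder, SimpleCoder<OutputT> outCoder, InputT value) {
    try {
      return outCoder.decode(inCoder.encode(value));
    } catch (IOException ex) {
      // Say what was being translated, and between which formats, and keep the cause.
      throw new IllegalStateException(
          String.format(
              "Unable to translate element '%s' from %s to %s", value, inCoder, outCoder),
          ex);
    }
  }

  public static void main(String[] args) {
    try {
      recode(UTF8, INT32, "hello");
    } catch (IllegalStateException ex) {
      System.out.println(ex.getMessage());
      System.out.println("Caused by: " + ex.getCause().getMessage());
    }
  }
}
```

With a message like this, the low-level cause (here a byte-count mismatch)
still appears in the stack trace, but the first line already tells the reader
which translation step failed.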
Issue Time Tracking
-------------------
Worklog Id: (was: 641770)
Time Spent: 3h 50m (was: 3h 40m)
> Fix primitive Read on portable Flink Runner
> -------------------------------------------
>
> Key: BEAM-12704
> URL: https://issues.apache.org/jira/browse/BEAM-12704
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.32.0
> Reporter: Jan Lukavský
> Assignee: Jan Lukavský
> Priority: P2
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> {{ReadSourcePortableTest}} does not correctly test the expansion into
> primitive Read. As a result, the primitive Read operation is broken.