[
https://issues.apache.org/jira/browse/BEAM-10016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117378#comment-17117378
]
Luke Cwik edited comment on BEAM-10016 at 5/27/20, 4:19 AM:
------------------------------------------------------------
The issue is that the GreedyPipelineFuser (and related classes) doesn't take
into account the change in encoding from the flatten's inputs to the flatten's
output in certain scenarios where the flatten isn't being merged with an
existing stage.
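For illustration, here is a minimal sketch (not the actual FlattenTest code) of a
pipeline shaped like the failing test, where the flatten's output coder matches
only one of its input coders; the pipeline, PCollection names, and coder choices
are assumptions:
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

Pipeline p = Pipeline.create();

// Two inputs that encode the same element type with different coders.
PCollection<Integer> bigEndianInts =
    p.apply("CreateBigEndian", Create.of(1, 2, 3)).setCoder(BigEndianIntegerCoder.of());
PCollection<Integer> varInts =
    p.apply("CreateVarInt", Create.of(4, 5, 6)).setCoder(VarIntCoder.of());

// The flatten's output coder differs from bigEndianInts' coder, so a runner that
// executes the flatten without transcoding can hand downstream stages bytes that
// were encoded with the wrong coder (the ClassCastException below).
PCollection<Integer> flattened =
    PCollectionList.of(bigEndianInts)
        .and(varInts)
        .apply(Flatten.pCollections())
        .setCoder(VarIntCoder.of());
{code}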
Normally one could fix this by copying the coder from the flatten's output
PCollection to all of the input PCollections, but this doesn't hold when dealing
with cross-language pipelines, because we could have:
{code:java}
ParDo(Java) -> PC(big endian int coder)           \
                                                   Flatten(Python) -> PC(varint coder)
ParDo(Go) -> PCollection(little endian int coder) /
{code}
In this case the Python SDK would know the big endian int coder, the little
endian int coder, and the varint coder, but Java and Go would only know the big
endian int coder and the little endian int coder, respectively.
The solution in the above example is to make the Python SDK perform the
transcoding by having it execute the flatten. Only flattens where the input and
output coders match can be executed by a runner, since no transcoding is
necessary.
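As a rough sketch of that rule (a hypothetical helper comparing coder ids on the
portable pipeline proto, not the actual GreedyPipelineFuser logic):
{code:java}
import org.apache.beam.model.pipeline.v1.RunnerApi;

// Hypothetical helper: a runner may execute a flatten only when every input
// PCollection uses the same coder as the output PCollection, since then no
// transcoding is necessary. Comparing coder ids is a simplification here.
static boolean runnerCanExecuteFlatten(
    RunnerApi.PTransform flatten, RunnerApi.Components components) {
  // A flatten has a single output PCollection.
  String outputPCollectionId = flatten.getOutputsMap().values().iterator().next();
  String outputCoderId =
      components.getPcollectionsOrThrow(outputPCollectionId).getCoderId();
  for (String inputPCollectionId : flatten.getInputsMap().values()) {
    String inputCoderId =
        components.getPcollectionsOrThrow(inputPCollectionId).getCoderId();
    if (!outputCoderId.equals(inputCoderId)) {
      // Transcoding is required, so an SDK that understands both coders
      // (Python in the example above) has to execute the flatten.
      return false;
    }
  }
  return true;
}
{code}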
An alternative would be to require that flattens have the same input and output
coders, but that would be a backwards-incompatible change. Another option is to
insert identity ParDos within SDKs to make sure that the input and output coders
match whenever there is a Flatten.
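A minimal sketch of that identity-ParDo option, assuming the "bigEndianInts"
input and the coders from the example above:
{code:java}
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Pass each element through unchanged; only the coder on the output PCollection
// changes, so the SDK re-encodes from the big endian coder to the varint coder
// before the elements reach the Flatten.
PCollection<Integer> transcoded =
    bigEndianInts
        .apply(
            "IdentityTranscode",
            ParDo.of(
                new DoFn<Integer, Integer>() {
                  @ProcessElement
                  public void processElement(
                      @Element Integer element, OutputReceiver<Integer> out) {
                    out.output(element);
                  }
                }))
        .setCoder(VarIntCoder.of());
{code}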
> Flink postcommits failing testFlattenWithDifferentInputAndOutputCoders2
> -----------------------------------------------------------------------
>
> Key: BEAM-10016
> URL: https://issues.apache.org/jira/browse/BEAM-10016
> Project: Beam
> Issue Type: Bug
> Components: test-failures
> Reporter: Kyle Weaver
> Assignee: Maximilian Michels
> Priority: P2
>
> Both beam_PostCommit_Java_PVR_Flink_Batch and
> beam_PostCommit_Java_PVR_Flink_Streaming are failing on the newly added test
> org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2.
> SEVERE: Error in task code: CHAIN MapPartition (MapPartition at [6]{Values, FlatMapElements, PAssert$0}) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: PAssert$0/GroupGlobally/GatherAllOutputs/GroupByKey) -> Map (Key Extractor) (2/2)
> java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to [B
>     at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>     at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>     at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>     at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:667)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:271)
>     at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:203)
>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)