[ https://issues.apache.org/jira/browse/BEAM-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738701#comment-16738701 ]
Thomas Weise commented on BEAM-6294:
------------------------------------
Inserting a reshuffle into the synthetic source example:
{code:java}
diff --git a/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py
index 0cfaf5d142..cb614841f2 100644
--- a/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py
+++ b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py
@@ -80,6 +80,7 @@ def run(argv=None):
               .set_interval_ms(known_args.interval_ms))
 
   _ = (messages | 'decode' >> beam.Map(lambda x: ('', 1))
+       | 'reshuffle' >> beam.Reshuffle()
        | 'window' >> beam.WindowInto(window.GlobalWindows(),
                                      trigger=Repeatedly(
                                          AfterProcessingTime(5 * 1000)),
{code}
yields the following error:
{code:java}
java.lang.RuntimeException: Failed to process element with SDK harness.
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.processElement(ExecutableStageDoFnOperator.java:583)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:460)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to [B
	at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
	at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
	at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:520)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:480)
	at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.accept(BeamFnDataBufferingOutboundObserver.java:127)
	at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.accept(BeamFnDataBufferingOutboundObserver.java:45)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.processElement(ExecutableStageDoFnOperator.java:581)
	... 7 more
{code}
(Works fine after reverting that shuffle PR.)
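Reading the trace, the runner encodes this edge with `LengthPrefixCoder` wrapping `ByteArrayCoder`, which only works if each element is already an opaque `byte[]`; what arrives instead is a decoded `KV`, hence the cast failure. One plausible reading (an assumption, not confirmed in this thread) is that the reshuffle translation hands decoded elements to a stage whose input coder was length-prefixed as raw bytes. A minimal pure-Python sketch of that framing, assuming an unsigned base-128 varint length followed by the raw payload (modelled loosely on Beam's `VarInt`), shows why anything but raw bytes is rejected. All helper names are hypothetical; this is not Beam code.

{code:python}
# Hypothetical sketch (not Beam code): varint length-prefixed framing of an
# opaque byte payload, loosely modelled on Java's LengthPrefixCoder(ByteArrayCoder).


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as an unsigned base-128 varint (low bits first)."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)  # high bit clear: last byte
            return bytes(out)


def length_prefix_encode(value) -> bytes:
    """Frame a raw payload as <varint length><payload bytes>."""
    if not isinstance(value, (bytes, bytearray)):
        # Python analogue of "KV cannot be cast to [B": only raw bytes can be framed.
        raise TypeError(f"{type(value).__name__} cannot be encoded as bytes")
    return encode_varint(len(value)) + bytes(value)


def length_prefix_decode(data: bytes) -> bytes:
    """Read one framed payload back out of data."""
    length, shift, pos = 0, 0, 0
    while True:
        byte = data[pos]
        pos += 1
        length |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:  # last varint byte reached
            break
    return bytes(data[pos:pos + length])
{code}

In this model, `length_prefix_encode(b"\x01\x02")` yields `b"\x02\x01\x02"`, while `length_prefix_encode(("", 1))` raises `TypeError`, the Python counterpart of `KV cannot be cast to [B`.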
> Use Flink's redistribute for reshuffle.
> ---------------------------------------
>
> Key: BEAM-6294
> URL: https://issues.apache.org/jira/browse/BEAM-6294
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink, sdk-py-core
> Reporter: Robert Bradshaw
> Assignee: Robert Bradshaw
> Priority: Major
> Fix For: 2.10.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Python needs to publish the URN over the FnAPI, which is pretty easy, but
> Flink also needs to ensure that the composite structure does not get fused.
> Unlike with GBK, we can't assume all runners implement this as a primitive.
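The description above treats Reshuffle as a composite rather than a primitive. In the Python SDK it expands, roughly, to: attach a random key, `GroupByKey`, drop the key. A runner that recognises the URN can swap the whole composite for a native redistribute (for Flink, something like `DataStream.rebalance()`), but only if the composite survives fusion intact. The pure-Python sketch models both paths and checks that either one only moves elements without changing them. It ignores windows, timestamps, and triggers, which the real transform must preserve; the function names are hypothetical and `RESHUFFLE_URN` is an assumed value, not taken from this issue.

{code:python}
import random
from collections import defaultdict

# Hypothetical constant; assumed to match Beam's reshuffle composite URN.
RESHUFFLE_URN = "beam:transform:reshuffle:v1"


def group_by_key(pairs):
    """Stand-in for GroupByKey, the shuffle boundary inside the composite."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reshuffle_composite(elements, seed=0):
    """Generic expansion: attach a random key, GroupByKey, then drop the key."""
    rng = random.Random(seed)
    keyed = [(rng.getrandbits(32), e) for e in elements]
    return [v for values in group_by_key(keyed).values() for v in values]


def native_redistribute(elements, parallelism=4):
    """Runner-native alternative: round-robin elements across partitions,
    roughly like Flink's rebalance(), with no keys and no grouping."""
    partitions = [[] for _ in range(parallelism)]
    for i, e in enumerate(elements):
        partitions[i % parallelism].append(e)
    return partitions


def translate_reshuffle(urn, elements, known_urns):
    """known_urns models the URNs this runner translates natively. If the
    reshuffle URN is among them, use the native primitive; otherwise fall
    back to running the generic composite expansion."""
    if urn in known_urns:
        return [e for part in native_redistribute(elements) for e in part]
    return reshuffle_composite(elements)
{code}

Either path returns the same multiset of elements; only their physical distribution differs, which is why a runner may substitute one for the other once it can see the unfused composite.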
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)