[
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031354#comment-17031354
]
Yun Gao edited comment on FLINK-14087 at 2/6/20 8:01 AM:
---------------------------------------------------------
Hi all, we recently ran into a similar problem in production. Users wrote the following code:
{code:java}
// Source and map functions omitted for brevity.
a = env.addSource().rebalance();
b = a.map();
c = a.map();
{code}
The RebalancePartitioner is then shared between the edges _a -> b_ and _a -> c_,
which causes half of the tasks of _b_ and _c_ to receive no records. As a
temporary fix, we asked users to write the following code instead:
{code:java}
a = env.addSource();
b = a.rebalance().map();
c = a.rebalance().map();
{code}
In summary: currently, when several downstream operators are connected to the
same rebalance() PartitionTransformation, the partitioner is shared by all of
those edges at runtime. When they are connected to different rebalance()
PartitionTransformations, each edge gets its own partitioner at runtime. I think
this behavior is also defensible, but it diverges somewhat from what users
intuitively expect.
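To make the symptom concrete, here is a minimal sketch of why a shared round-robin partitioner starves half of the downstream tasks. It uses a simplified, hypothetical RoundRobinSelector that keeps only a "next channel" counter. Flink's real RebalancePartitioner may differ in detail, for example in which channel it starts from. The sketch also assumes each record from _a_ is emitted first on edge _a -> b_ and then on edge _a -> c_. With one shared instance and an even parallelism, the two edges take turns advancing the same counter, so each edge only ever reaches every other channel. With one instance per edge, as in the workaround, both edges reach all channels.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model; not Flink's RebalancePartitioner.
public class SharedRebalanceDemo {
    static class RoundRobinSelector {
        private int numberOfChannels;
        private int nextChannel = -1;

        // Called once per RecordWriter that uses this selector.
        void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }
    }

    // Emits each record on edge a -> b and then on edge a -> c, and returns
    // the set of channels (downstream subtasks) that each edge reached.
    static List<Set<Integer>> emit(RoundRobinSelector toB, RoundRobinSelector toC, int records) {
        Set<Integer> reachedB = new TreeSet<>();
        Set<Integer> reachedC = new TreeSet<>();
        for (int i = 0; i < records; i++) {
            reachedB.add(toB.selectChannel());
            reachedC.add(toC.selectChannel());
        }
        List<Set<Integer>> result = new ArrayList<>();
        result.add(reachedB);
        result.add(reachedC);
        return result;
    }

    public static void main(String[] args) {
        int parallelism = 4; // b and c both run with 4 subtasks

        // One selector shared by both edges (same rebalance() transformation).
        RoundRobinSelector shared = new RoundRobinSelector();
        shared.setup(parallelism);
        shared.setup(parallelism);
        System.out.println(emit(shared, shared, 100)); // [[0, 2], [1, 3]]

        // One selector per edge (the workaround: a separate rebalance() per edge).
        RoundRobinSelector toB = new RoundRobinSelector();
        RoundRobinSelector toC = new RoundRobinSelector();
        toB.setup(parallelism);
        toC.setup(parallelism);
        System.out.println(emit(toB, toC, 100)); // [[0, 1, 2, 3], [0, 1, 2, 3]]
    }
}
{code}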
So, do we still think that always using a distinct partitioner for each edge is
better? If so, we should copy the partitioner for each edge when creating the
graph. The downside of this change is that users who really do want to share one
partitioner between edges would no longer be able to do so.
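Here is a minimal sketch of the proposed change. It assumes a hypothetical graph builder: GraphBuilder, Edge and RoundRobinSelector.copy() are illustrative names, not Flink's StreamGraph API. Every call to addEdge stores a fresh copy of the partitioner, so two edges created from the same rebalance() transformation never share runtime state.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "copy the partitioner for each edge when creating the graph".
// None of these classes are Flink APIs.
public class CopyPerEdgeSketch {
    static class RoundRobinSelector {
        private int numberOfChannels;
        private int nextChannel = -1;

        void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }

        // Returns a fresh instance with no runtime state, so edges never share a counter.
        RoundRobinSelector copy() {
            return new RoundRobinSelector();
        }
    }

    static class Edge {
        final String source;
        final String target;
        final RoundRobinSelector selector;

        Edge(String source, String target, RoundRobinSelector selector) {
            this.source = source;
            this.target = target;
            this.selector = selector;
        }
    }

    static class GraphBuilder {
        final List<Edge> edges = new ArrayList<>();

        // Always copies, even when several edges come from the same
        // rebalance() transformation (i.e. the same selector object).
        Edge addEdge(String source, String target, RoundRobinSelector fromTransformation) {
            Edge edge = new Edge(source, target, fromTransformation.copy());
            edges.add(edge);
            return edge;
        }
    }

    public static void main(String[] args) {
        RoundRobinSelector rebalance = new RoundRobinSelector(); // one rebalance() transformation
        GraphBuilder graph = new GraphBuilder();
        Edge ab = graph.addEdge("a", "b", rebalance);
        Edge ac = graph.addEdge("a", "c", rebalance);
        System.out.println(ab.selector != ac.selector); // true: no shared state at runtime

        ab.selector.setup(4);
        ac.selector.setup(4);
        // Each edge advances its own counter, so both start at channel 0.
        System.out.println(ab.selector.selectChannel() + " " + ac.selector.selectChannel()); // 0 0
    }
}
{code}

Because addEdge copies unconditionally, this builder has no way to express a deliberately shared partitioner. That is exactly the downside mentioned above.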
> throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using
> RebalancePartitioner.
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-14087
> URL: https://issues.apache.org/jira/browse/FLINK-14087
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.8.0, 1.8.1, 1.9.0
> Reporter: luojiangyu
> Priority: Major
> Attachments: image-2019-09-16-19-14-39-403.png,
> image-2019-09-16-19-15-34-639.png
>
>
> In some cases, RecordWriters share a ChannelSelector instance. When two
> RecordWriter instances share the same ChannelSelector instance, a
> java.lang.ArrayIndexOutOfBoundsException may be thrown. For example, suppose
> two RecordWriter instances share one RebalancePartitioner instance. The
> RebalancePartitioner instance is set up with 2 channels when the first
> RecordWriter initializes. Then the same RebalancePartitioner instance is set
> up with 3 channels when the second RecordWriter initializes. As a result, an
> ArrayIndexOutOfBoundsException is thrown when the first RecordWriter emits
> data, because the partitioner can now select channel index 2, which does not
> exist for the first RecordWriter.
> The exception looks like this:
> {code}
> java.lang.RuntimeException: 2
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:112)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:47)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:673)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:617)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:699)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at com.xx.flink.demo.wordcount.case3.StateTest$Source.run(StateTest.java:107)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:734)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:255)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>     ... 14 more
> {code}
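The failure in the issue description can be reproduced with a simplified, hypothetical model. Writer and RoundRobinSelector below only mimic the roles of Flink's RecordWriter and RebalancePartitioner; they are not Flink classes. The int array stands in for the per-channel buffers. Each writer calls setup() on the shared selector with its own channel count. After the second writer sets it to 3 channels, the first writer, which only has channels 0 and 1, is eventually handed index 2.

{code:java}
// Hypothetical model; these classes only mimic the roles of Flink's
// RecordWriter and RebalancePartitioner.
public class SharedSelectorAioobe {
    static class RoundRobinSelector {
        private int numberOfChannels;
        private int nextChannel = -1;

        void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }
    }

    static class Writer {
        private final int[] recordsPerChannel; // stands in for the per-channel buffers
        private final RoundRobinSelector selector;

        Writer(int numberOfChannels, RoundRobinSelector selector) {
            this.recordsPerChannel = new int[numberOfChannels];
            this.selector = selector;
            // Every writer re-initializes the (possibly shared) selector.
            selector.setup(numberOfChannels);
        }

        void emit() {
            // The selected index must be smaller than this writer's own channel count.
            recordsPerChannel[selector.selectChannel()]++;
        }
    }

    public static void main(String[] args) {
        RoundRobinSelector shared = new RoundRobinSelector();
        Writer first = new Writer(2, shared);  // selector now assumes 2 channels
        Writer second = new Writer(3, shared); // ...and now 3 channels
        first.emit(); // channel 0
        first.emit(); // channel 1
        try {
            first.emit(); // channel 2, but the first writer only has channels 0 and 1
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("caught: " + e);
        }
    }
}
{code}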