[ 
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031354#comment-17031354
 ] 

Yun Gao edited comment on FLINK-14087 at 2/6/20 8:01 AM:
---------------------------------------------------------

Hi all, we recently ran into a similar problem in production: users wrote the
following code
{code:java}
// Both map() calls consume the same rebalance() transformation
a = env.addSource(...).rebalance();
b = a.map(...);
c = a.map(...);
{code}

and the rebalance partitioner was then shared between the edges _a -> b_ and
_a -> c_, which caused half of the tasks of _b_ and _c_ to receive no records.
As a temporary fix, we asked users to use the following code instead:
{code:java}
// Each map() gets its own rebalance() transformation, and thus its own partitioner
a = env.addSource(...);
b = a.rebalance().map(...);
c = a.rebalance().map(...);
{code}

In summary, when downstream operators are connected to the same rebalance()
PartitionTransformation, the partitioner is currently shared among all of
those edges at runtime. By contrast, when downstream operators are connected
to different rebalance() PartitionTransformations, each edge gets its own
partitioner at runtime. This behavior is arguably reasonable, but it diverges
somewhat from what users intuitively expect.
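To illustrate why a shared round-robin partitioner starves half of the
downstream tasks, here is a minimal simulation. RoundRobinSelector and emit are
hypothetical names, not Flink APIs. The selector starts at channel 0 for
determinism (the real RebalancePartitioner may start at a different channel),
and each record is sent to edge _a -> b_ and then to edge _a -> c_.

{code:java}
import java.util.Arrays;

/** Simplified simulation; RoundRobinSelector is a hypothetical stand-in, not Flink's RebalancePartitioner. */
public class SharedRebalanceDemo {

    static final class RoundRobinSelector {
        private int numberOfChannels;
        // Starts at -1 so the first pick is channel 0 (assumption made for a deterministic demo).
        private int next = -1;

        void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            next = (next + 1) % numberOfChannels;
            return next;
        }
    }

    /** Sends every record to both edges and counts the records received per downstream channel. */
    static int[][] emit(RoundRobinSelector toB, RoundRobinSelector toC, int parallelism, int records) {
        toB.setup(parallelism);
        toC.setup(parallelism);
        int[] countsB = new int[parallelism];
        int[] countsC = new int[parallelism];
        for (int i = 0; i < records; i++) {
            // Each record goes to edge a -> b first, then to edge a -> c.
            countsB[toB.selectChannel()]++;
            countsC[toC.selectChannel()]++;
        }
        return new int[][] {countsB, countsC};
    }

    public static void main(String[] args) {
        RoundRobinSelector shared = new RoundRobinSelector();
        int[][] s = emit(shared, shared, 4, 8);
        // prints "shared:   b=[4, 0, 4, 0] c=[0, 4, 0, 4]"
        System.out.println("shared:   b=" + Arrays.toString(s[0]) + " c=" + Arrays.toString(s[1]));

        int[][] d = emit(new RoundRobinSelector(), new RoundRobinSelector(), 4, 8);
        // prints "separate: b=[2, 2, 2, 2] c=[2, 2, 2, 2]"
        System.out.println("separate: b=" + Arrays.toString(d[0]) + " c=" + Arrays.toString(d[1]));
    }
}
{code}

With a shared selector, every call made for one edge advances the counter used
by the other edge, so with an even parallelism each edge only ever reaches
every other channel.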

 

Therefore, do we still think that always using a distinct partitioner for each
edge is better? If so, we should copy the partitioner for each edge when
creating the graph. The downside of this change is that users who really want
to share one partitioner between edges would no longer be able to do so.
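A minimal sketch of the proposal, assuming a hypothetical Partitioner interface
with a copy() method and a hypothetical buildEdges helper (these are not Flink's
actual classes): the graph builder gives every outgoing edge its own copy, so
two maps consuming one rebalance() get independent round-robin state, and
setting up one edge with 2 channels and another with 3 no longer interferes.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: copy the declared partitioner once per edge while building the graph. */
public class CopyPerEdgeDemo {

    interface Partitioner {
        void setup(int numberOfChannels);

        int selectChannel();

        /** Returns a new instance with the same configuration but independent runtime state. */
        Partitioner copy();
    }

    static final class RoundRobin implements Partitioner {
        private int numberOfChannels;
        private int next = -1;

        public void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        public int selectChannel() {
            next = (next + 1) % numberOfChannels;
            return next;
        }

        public Partitioner copy() {
            return new RoundRobin();
        }
    }

    /** Gives each outgoing edge its own copy of the partitioner the user declared. */
    static List<Partitioner> buildEdges(Partitioner declared, int[] downstreamParallelism) {
        List<Partitioner> perEdge = new ArrayList<>();
        for (int parallelism : downstreamParallelism) {
            Partitioner p = declared.copy();
            p.setup(parallelism);
            perEdge.add(p);
        }
        return perEdge;
    }

    public static void main(String[] args) {
        // One rebalance() feeding two maps with parallelism 2 and 3.
        List<Partitioner> edges = buildEdges(new RoundRobin(), new int[] {2, 3});
        int maxB = 0, maxC = 0;
        for (int i = 0; i < 6; i++) {
            maxB = Math.max(maxB, edges.get(0).selectChannel());
            maxC = Math.max(maxC, edges.get(1).selectChannel());
        }
        // Each edge stays within its own channel range; prints "1 2".
        System.out.println(maxB + " " + maxC);
    }
}
{code}

The trade-off discussed above is visible here: because buildEdges always
copies, there is no way for two edges to end up with the same instance.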


> throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using
> RebalancePartitioner.
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14087
>                 URL: https://issues.apache.org/jira/browse/FLINK-14087
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.8.0, 1.8.1, 1.9.0
>            Reporter: luojiangyu
>            Priority: Major
>         Attachments: image-2019-09-16-19-14-39-403.png, 
> image-2019-09-16-19-15-34-639.png
>
>
> In some cases, several RecordWriter instances share the same ChannelSelector
> instance, and this may throw java.lang.ArrayIndexOutOfBoundsException. For
> example, suppose two RecordWriter instances share one RebalancePartitioner
> instance. When the first RecordWriter is initialized, it sets up the
> RebalancePartitioner with 2 channels; when the second RecordWriter is
> initialized, it sets up the same RebalancePartitioner instance with 3
> channels. The first RecordWriter then throws ArrayIndexOutOfBoundsException
> when it emits data, because the partitioner can now select channel index 2,
> which does not exist among the first writer's 2 channels.
> The exception looks like this:
> |java.lang.RuntimeException: 2
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:112)
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:47)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:673)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:617)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:699)
>  at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>  at com.xx.flink.demo.wordcount.case3.StateTest$Source.run(StateTest.java:107)
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:734)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:255)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
>  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
>  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>  ... 14 more|
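
The failure described in the issue can be reproduced with a simplified,
self-contained sketch. RoundRobinSelector and Writer are hypothetical stand-ins
for RebalancePartitioner and RecordWriter, not Flink code; the selector starts
at channel 0 for determinism, whereas the real partitioner may start elsewhere.

{code:java}
/** Hypothetical reproduction: two writers share one selector that is set up twice. */
public class SharedSelectorOutOfBoundsDemo {

    static final class RoundRobinSelector {
        private int numberOfChannels;
        private int next = -1;

        void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            next = (next + 1) % numberOfChannels;
            return next;
        }
    }

    /** Hypothetical writer holding one slot per output channel. */
    static final class Writer {
        private final int[] recordsPerChannel;
        private final RoundRobinSelector selector;

        Writer(int numberOfChannels, RoundRobinSelector selector) {
            this.recordsPerChannel = new int[numberOfChannels];
            this.selector = selector;
            // A later writer overwrites the channel count set by an earlier one.
            selector.setup(numberOfChannels);
        }

        void emit() {
            recordsPerChannel[selector.selectChannel()]++;
        }
    }

    /** Returns how many records the first writer emits before failing, or -1 if it never fails. */
    static int emitsBeforeFailure(int records) {
        RoundRobinSelector shared = new RoundRobinSelector();
        Writer first = new Writer(2, shared); // selector set up with 2 channels
        new Writer(3, shared);                // same selector re-set up with 3 channels
        for (int i = 0; i < records; i++) {
            try {
                first.emit();
            } catch (ArrayIndexOutOfBoundsException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Channels 0 and 1 succeed; the third emit selects channel 2 and fails: prints 2.
        System.out.println(emitsBeforeFailure(10));
    }
}
{code}

The out-of-range index in this sketch is 2, the same index reported by the
ArrayIndexOutOfBoundsException in the trace.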



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
