[ 
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931289#comment-16931289
 ] 

zhijiang edited comment on FLINK-14087 at 9/17/19 10:48 AM:
------------------------------------------------------------

Thanks for the quick response [~aljoscha]!

The topology from [~jiangyu] looks like this:

DataStream dataStream = env.addSource().rebalance();
dataStream.map().setParallelism(2).filter();
dataStream.map().setParallelism(3).sink();

 

In {{StreamGraph#addVirtualPartitionNode}}, the {{RebalancePartitioner}} is 
cached in {{virtualPartitionNodes}}.

Then, in {{StreamGraph#addEdgeInternal()}}, the same {{RebalancePartitioner}} 
instance is fetched from {{virtualPartitionNodes}} for each of the different 
edges, so both edges share one partitioner instance.
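
To make the effect of the shared instance concrete, here is a minimal, self-contained Java sketch. It does not use Flink's classes: {{RoundRobinSelector}}, {{SimpleWriter}} and {{SharedSelectorDemo}} are hypothetical stand-ins, and the map only imitates the caching role of {{virtualPartitionNodes}}. It assumes the selector keeps a single channel count that every setup call overwrites, which is what the issue description suggests.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a round-robin channel selector; not Flink's class. */
class RoundRobinSelector {
    private int numberOfChannels;
    private int nextChannel = -1;

    /** Each call overwrites the channel count set by any earlier call. */
    void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel() {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }
}

/** Hypothetical stand-in for a writer that keeps one slot per output channel. */
class SimpleWriter {
    private final RoundRobinSelector selector;
    private final int[] recordsPerChannel;

    SimpleWriter(RoundRobinSelector selector, int numberOfChannels) {
        this.selector = selector;
        this.recordsPerChannel = new int[numberOfChannels];
        selector.setup(numberOfChannels);
    }

    void emit() {
        // Fails if the selector returns an index beyond this writer's channels.
        recordsPerChannel[selector.selectChannel()]++;
    }
}

class SharedSelectorDemo {
    /** Returns true if emitting through the first writer throws. */
    static boolean failsWhenShared(boolean share) {
        // Imitates a cache keyed by virtual node id (assumption for illustration).
        Map<Integer, RoundRobinSelector> cachedSelectors = new HashMap<>();
        cachedSelectors.put(1, new RoundRobinSelector());

        RoundRobinSelector forEdgeA = cachedSelectors.get(1);
        RoundRobinSelector forEdgeB = share ? cachedSelectors.get(1) : new RoundRobinSelector();

        SimpleWriter writerA = new SimpleWriter(forEdgeA, 2); // downstream parallelism 2
        SimpleWriter writerB = new SimpleWriter(forEdgeB, 3); // downstream parallelism 3

        try {
            for (int i = 0; i < 3; i++) {
                writerA.emit();
            }
            return false;
        } catch (ArrayIndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("shared selector fails: " + failsWhenShared(true));
        System.out.println("separate selectors fail: " + failsWhenShared(false));
    }
}
```

In this sketch the third emit of the first writer asks for channel index 2 although that writer only has 2 channels, which mirrors the {{ArrayIndexOutOfBoundsException: 2}} in the report. With separate selector instances the same loop completes without an exception.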


> throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using 
> RebalancePartitioner. 
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14087
>                 URL: https://issues.apache.org/jira/browse/FLINK-14087
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.8.0, 1.8.1, 1.9.0
>            Reporter: luojiangyu
>            Priority: Major
>         Attachments: image-2019-09-16-19-14-39-403.png, 
> image-2019-09-16-19-15-34-639.png
>
>
> A RecordWriter can end up sharing its ChannelSelector instance with another 
> RecordWriter. When two RecordWriter instances share the same ChannelSelector 
> instance, emitting data may throw java.lang.ArrayIndexOutOfBoundsException. 
> For example, suppose two RecordWriter instances share one 
> RebalancePartitioner instance. The RebalancePartitioner is set up with 2 
> channels when the first RecordWriter initializes, and then the same 
> RebalancePartitioner is set up with 3 channels when the second RecordWriter 
> initializes. The first RecordWriter then throws 
> ArrayIndexOutOfBoundsException when it emits data.
> The exception looks like this:
> |java.lang.RuntimeException: 2
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:112)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:47)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:673)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:617)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:699)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at com.xx.flink.demo.wordcount.case3.StateTest$Source.run(StateTest.java:107)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:734)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:255)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>     ... 14 more|



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
