[
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930432#comment-16930432
]
zhijiang commented on FLINK-14087:
----------------------------------
Thanks for reporting this, [~jiangyu].
We refactored the internal logic of RebalancePartitioner some time ago. Before
that change, it checked whether the selected channel index was beyond the number
of channels before returning it, and returned the first channel index if so.
However, this report breaks my previous assumption that multiple RecordWriter
instances would not share the same ChannelSelector instance. I need to
double-check the StreamGraph generation process to see whether this case can
actually happen. It does not seem reasonable to share the same internal state
across different output edges. If so, with different parallelism per output
edge, the output would not be strictly rebalanced.
Could you share your topology structure or the code you use to submit the job?
Then I could easily debug the StreamGraph generation process.
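
To make the suspected failure mode concrete, here is a minimal sketch of how shared selector state could produce the reported exception. It is not Flink's code: RoundRobinSelector and Writer are hypothetical stand-ins for the ChannelSelector and RecordWriter roles, and the start index is fixed so the run is deterministic.

```java
public class SharedSelectorSketch {

    /** Hypothetical round-robin selector; a stand-in, not Flink's RebalancePartitioner. */
    static final class RoundRobinSelector {
        private int numberOfChannels;
        private int nextChannel = -1;

        void setup(int numberOfChannels) {
            // A later setup() call overwrites the count for every writer sharing this instance.
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            // No bounds check against the caller's own channel count.
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }
    }

    /** Hypothetical writer that keeps one counter slot per output channel. */
    static final class Writer {
        private final int[] recordsPerChannel;
        private final RoundRobinSelector selector;

        Writer(int numberOfChannels, RoundRobinSelector selector) {
            this.recordsPerChannel = new int[numberOfChannels];
            this.selector = selector;
            selector.setup(numberOfChannels);
        }

        void emit() {
            // Throws ArrayIndexOutOfBoundsException if the selector returns an index
            // that belongs to another writer's larger channel count.
            recordsPerChannel[selector.selectChannel()]++;
        }
    }

    /** Returns true if the first writer fails once a second writer has re-run setup(). */
    public static boolean sharedSelectorFails() {
        RoundRobinSelector shared = new RoundRobinSelector();
        Writer first = new Writer(2, shared);
        new Writer(3, shared); // the shared selector now assumes 3 channels
        try {
            for (int i = 0; i < 3; i++) {
                first.emit();
            }
            return false;
        } catch (ArrayIndexOutOfBoundsException e) {
            return true;
        }
    }

    /** Returns true if both writers emit without error when each owns its selector. */
    public static boolean separateSelectorsWork() {
        Writer first = new Writer(2, new RoundRobinSelector());
        Writer second = new Writer(3, new RoundRobinSelector());
        try {
            for (int i = 0; i < 6; i++) {
                first.emit();
                second.emit();
            }
            return true;
        } catch (ArrayIndexOutOfBoundsException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("shared selector fails: " + sharedSelectorFails());
        System.out.println("separate selectors work: " + separateSelectorsWork());
    }
}
```

In this sketch the first writer fails on its third emit, when the shared selector hands out index 2 for a 2-channel writer. Whether the real job reaches this state depends on how the StreamGraph wires partitioners to output edges, which is what still needs to be checked.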
> throws java.lang.ArrayIndexOutOfBoundsException when emitting the data using
> RebalancePartitioner.
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-14087
> URL: https://issues.apache.org/jira/browse/FLINK-14087
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.8.0, 1.8.1, 1.9.0
> Reporter: luojiangyu
> Priority: Major
>
> RecordWriter instances can end up sharing a ChannelSelector instance.
> When two RecordWriter instances share the same ChannelSelector instance,
> emitting may throw java.lang.ArrayIndexOutOfBoundsException. For example,
> suppose two RecordWriter instances share one RebalancePartitioner instance.
> The RebalancePartitioner instance is set up with 2 channels when the first
> RecordWriter initializes, and then the same RebalancePartitioner instance is
> set up with 3 channels when the second RecordWriter initializes. The first
> RecordWriter instance then throws ArrayIndexOutOfBoundsException when it
> emits data.
> The exception looks like:
> java.lang.RuntimeException: 2
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:112)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:47)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:673)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:617)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:699)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at com.xx.flink.demo.wordcount.case3.StateTest$Source.run(StateTest.java:107)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:734)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:255)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>     ... 14 more
--
This message was sent by Atlassian Jira
(v8.3.2#803003)