[
https://issues.apache.org/jira/browse/FLINK-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931171#comment-16931171
]
zhijiang commented on FLINK-14087:
----------------------------------
Thanks for confirming the details with me offline, [~jiangyu].
After checking the relevant code path in StreamGraph, I found that the
StreamPartitioner is actually shared across different edges via the
{{virtualPartitionNodes}} structure. IMO this sharing is not reasonable, for
two reasons:
* Semantically, it results in a global rebalance across different stream
edges, which can leave the parallel channels of a single edge imbalanced. It
makes more sense to rebalance within the scope of one edge.
* It constrains the runtime implementation, because it assumes that the
partitioner keeps no state at runtime; otherwise sharing leads to unexpected
behavior. The previous version of RebalancePartitioner checked on every call
whether the selected channel index exceeded the number of channels, which hid
this issue (regardless of whether the resulting distribution was actually
balanced). The latest refactoring removed this check and made the number of
channels a property of the partitioner, which exposed the bug.
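To illustrate the second point, here is a minimal, self-contained sketch of
how a shared stateful selector can overrun a writer's channel array. The
classes {{RoundRobinSelector}} and {{Writer}} are hypothetical stand-ins that I
made up for this example; they are not Flink's RebalancePartitioner or
RecordWriter, and the real setup logic may differ in details.

```java
public class SharedPartitionerSketch {

    /** Hypothetical stand-in for a stateful round-robin channel selector. */
    static final class RoundRobinSelector {
        private int numberOfChannels;
        private int nextChannel = -1;

        void setup(int numberOfChannels) {
            // The channel count is a property of the selector, so a later
            // setup call silently overwrites an earlier one.
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }
    }

    /** Hypothetical stand-in for a writer with one slot per output channel. */
    static final class Writer {
        private final int[] recordsPerChannel;
        private final RoundRobinSelector selector;

        Writer(RoundRobinSelector selector, int numberOfChannels) {
            this.selector = selector;
            this.recordsPerChannel = new int[numberOfChannels];
            selector.setup(numberOfChannels);
        }

        void emit() {
            // No bounds check here: the writer trusts the selector's index.
            recordsPerChannel[selector.selectChannel()]++;
        }
    }

    /** Returns true if emitting through the first writer runs out of bounds. */
    static boolean sharedSelectorFails() {
        RoundRobinSelector shared = new RoundRobinSelector();
        Writer first = new Writer(shared, 2);
        Writer second = new Writer(shared, 3); // selector now assumes 3 channels
        try {
            for (int i = 0; i < 3; i++) {
                first.emit(); // the third call selects index 2 of a 2-slot array
            }
            return false;
        } catch (ArrayIndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("shared selector fails: " + sharedSelectorFails());
    }
}
```

In this sketch the failure only needs the second setup call to report more
channels than the first writer owns, which matches the 2 vs. 3 channel case
described in the issue.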
As for the solution, I prefer to adjust the stream graph generation so that it
builds a separate partitioner instance for every stream edge. Are there other
suggestions or inputs, [~pnowojski]?
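A rough sketch of the direction I have in mind, under the assumption that each
edge can obtain its own selector from a factory. The {{Edge}} class, the
{{Supplier}}-based factory and {{RoundRobinSelector}} are hypothetical names
for illustration only and do not reflect how StreamGraph builds edges today.

```java
import java.util.Arrays;
import java.util.function.Supplier;

public class PerEdgePartitionerSketch {

    /** Hypothetical stand-in for a stateful round-robin channel selector. */
    static final class RoundRobinSelector {
        private int numberOfChannels;
        private int nextChannel = -1;

        void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            nextChannel = (nextChannel + 1) % numberOfChannels;
            return nextChannel;
        }
    }

    /** Hypothetical edge that owns a private selector instance. */
    static final class Edge {
        final int[] recordsPerChannel;
        private final RoundRobinSelector selector;

        Edge(Supplier<RoundRobinSelector> selectorFactory, int numberOfChannels) {
            // Each edge asks the factory for a fresh instance, so no state is shared.
            this.selector = selectorFactory.get();
            this.selector.setup(numberOfChannels);
            this.recordsPerChannel = new int[numberOfChannels];
        }

        void emit() {
            recordsPerChannel[selector.selectChannel()]++;
        }
    }

    /** Emits the given number of records on two edges, alternating between them. */
    static int[][] emitAlternating(int records) {
        Edge first = new Edge(RoundRobinSelector::new, 2);
        Edge second = new Edge(RoundRobinSelector::new, 3);
        for (int i = 0; i < records; i++) {
            first.emit();
            second.emit();
        }
        return new int[][] {first.recordsPerChannel, second.recordsPerChannel};
    }

    public static void main(String[] args) {
        for (int[] counts : emitAlternating(6)) {
            System.out.println(Arrays.toString(counts));
        }
    }
}
```

With private instances, each edge is rebalanced within its own scope and the
channel count of one edge cannot leak into another.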
> throws java.lang.ArrayIndexOutOfBoundsException when emiting the data using
> RebalancePartitioner.
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-14087
> URL: https://issues.apache.org/jira/browse/FLINK-14087
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.8.0, 1.8.1, 1.9.0
> Reporter: luojiangyu
> Priority: Major
> Attachments: image-2019-09-16-19-14-39-403.png,
> image-2019-09-16-19-15-34-639.png
>
>
> A RecordWriter may share its ChannelSelector instance with another
> RecordWriter. When two RecordWriter instances share the same ChannelSelector
> instance, emitting data may throw java.lang.ArrayIndexOutOfBoundsException.
> For example, two RecordWriter instances share one RebalancePartitioner
> instance. The RebalancePartitioner is set up with 2 channels when the first
> RecordWriter is initialized, and then the same RebalancePartitioner is set up
> with 3 channels when the second RecordWriter is initialized. The first
> RecordWriter then throws ArrayIndexOutOfBoundsException when it emits data.
> The exception looks like this:
> |java.lang.RuntimeException: 2
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:112)
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:47)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:673)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:617)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:699)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at com.xx.flink.demo.wordcount.case3.StateTest$Source.run(StateTest.java:107)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:734)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:255)
>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177)
>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>   ... 14 more|
--
This message was sent by Atlassian Jira
(v8.3.2#803003)