[
https://issues.apache.org/jira/browse/FLINK-21317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280863#comment-17280863
]
Kezhu Wang commented on FLINK-21317:
------------------------------------
Hi [~ym], I found this through code reading, not in a production use case.
I pushed a [test case|https://github.com/kezhuw/flink/commit/83b14ef37d1efe34a78595ef89981fa478a2e419]
in my repository to demonstrate it; that should make things much clearer. Below is
a more verbose stack trace matching the code:
{noformat}
Caused by: java.io.IOException: Exception while applying ReduceFunction in reducing state
	at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:102)
	at org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase$1.map(KafkaShuffleExactlyOnceITCase.java:190)
	at org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase$1.map(KafkaShuffleExactlyOnceITCase.java:173)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher.partitionConsumerRecordsHandler(KafkaShuffleFetcher.java:130)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:256)
Caused by: java.lang.NullPointerException
	at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
	at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
	... 16 more
{noformat}
Besides this, I found that {{FlinkKafkaShuffle}} does not stop consuming after a
bounded input is exhausted. I will open another issue once confirmed.
> Downstream keyed state not work after FlinkKafkaShuffle
> -------------------------------------------------------
>
> Key: FLINK-21317
> URL: https://issues.apache.org/jira/browse/FLINK-21317
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0
> Reporter: Kezhu Wang
> Priority: Major
>
> {{FlinkKafkaShuffle}} uses {{KeyGroupRangeAssignment.assignKeyToParallelOperator}}
> to assign records to Kafka topic partitions. The assignment works as follows:
> # {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns the key to a key group.
> # {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to an operator/subtask index.
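> For illustration, the two steps above can be sketched roughly as follows. This is a simplified sketch, not Flink's actual implementation: a plain {{hashCode}} stands in for Flink's murmur hash in step 1, so the exact key-group ids are illustrative only.

```java
// Simplified sketch of the two-step key-to-subtask assignment.
// NOTE: step 1 uses a plain hashCode instead of Flink's murmur hash,
// so the key-group ids differ from what Flink would actually compute.
public class KeyToSubtaskSketch {

    // Step 1: key -> key group (simplified hash).
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return (key.hashCode() & 0x7FFFFFFF) % maxParallelism;
    }

    // Step 2: key group -> operator/subtask index.
    public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        int keyGroup = assignToKeyGroup("some-key", maxParallelism);
        int subtask = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
        System.out.println("key group " + keyGroup + " -> subtask " + subtask);
    }
}
```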
> When the Kafka topic partitions are consumed, they are redistributed by
> {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}.
> The code of this redistribution is copied here:
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         int startIndex =
>                 ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise
>         // from the start index
>         return (startIndex + partition.getPartition()) % numParallelSubtasks;
>     }
> }
> {code}
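> To see the mismatch concretely, here is a sketch (with a hypothetical topic name) of where the consumer side places each partition. The shuffle writer puts records for subtask {{i}} into partition {{i}}, but the consumer reads partition {{i}} in subtask {{(startIndex + i) % numParallelSubtasks}}; whenever {{startIndex != 0}}, keyed records land on a subtask other than the one the writer targeted.

```java
// Sketch of the consumer-side placement implied by the assigner above.
// "shuffle-topic" is a hypothetical topic name; any topic whose hash-based
// startIndex is non-zero exhibits the shift.
public class ShuffleMismatchSketch {

    // Same logic as the assign method quoted above, inlined on a topic name.
    public static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        String topic = "shuffle-topic"; // hypothetical
        for (int p = 0; p < parallelism; p++) {
            // The writer intended partition p for subtask p; the consumer
            // may read it in a different subtask.
            System.out.println("partition " + p + " -> consumer subtask "
                    + assign(topic, p, parallelism));
        }
    }
}
```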
> This partition redistribution breaks the prerequisite of
> {{DataStreamUtils.reinterpretAsKeyedStream}}: the key groups get messed up.
> The consequence is unusable keyed state. The deepest stack trace captured is:
> {noformat}
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
> 	at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
> {noformat}
> cc [~ym] [~sewen] [~AHeise] [~pnowojski]
> Below are my proposed changes:
> * Make the assignment between partitions and subtasks customizable.
> * Provide a 0-based round-robin assignment (i.e. make {{startIndex}} 0 in the existing assignment algorithm).
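> A minimal sketch of the proposed 0-based round-robin assignment (the class name is hypothetical; this is equivalent to forcing {{startIndex}} to 0), which would keep partition {{i}} on subtask {{i % numParallelSubtasks}} and so preserve the key-group placement the shuffle writer assumed:

```java
// Hypothetical 0-based round-robin assigner: drops the topic-hash
// startIndex so that partition i is always consumed by subtask
// i % numParallelSubtasks.
public class ZeroBasedPartitionAssigner {
    public static int assign(int partition, int numParallelSubtasks) {
        return partition % numParallelSubtasks;
    }
}
```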
> I also saw FLINK-8570; the above changes could be helpful if we finally decide
> to deliver it.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)