[ https://issues.apache.org/jira/browse/FLINK-34171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ken Burford updated FLINK-34171:
--------------------------------
Description:
We recently upgraded from Flink 1.14.2 to 1.17.0. Our job has not materially
changed beyond a few feature changes (enabling snapshot compression, unaligned
checkpoints), but we're seeing the following exception when attempting to
adjust the parallelism of our job up or down:
{code:java}
java.lang.RuntimeException: Exception occurred while setting the current key context.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)
    at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)
    at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:393)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$1800(AsyncWaitOperator.java:92)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:621)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:602)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:712)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Key group 30655 is not in KeyGroupRange{startKeyGroup=19346, endKeyGroup=19360}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
    at org.apache.flink.runtime.state.heap.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:77)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:250)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
    ... 29 more
{code}
We're seeing this in an operator where we use
DataStreamUtils::reinterpretAsKeyedStream to collect multiple tasks into a
single operator chain. Each task takes the same data structure as input, with
an immutable key (represented as a string), and all of them use the exact same
KeySelector instance.
However, one pattern we're using here is a chain of:
KeyedProcessFunction --> RichAsyncFunction -->
reinterpretAsKeyedStream(KeyedProcessFunction)
...and I suspect that this might have something to do with the way that the
buffered in-flight data from the RichAsyncFunction is redistributed during
re-scaling.
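To make that suspicion concrete, here is a minimal sketch of how a key group maps to a subtask, and why a buffered element that lands on a different subtask after re-scaling would fail the range check seen in the stack trace. The arithmetic is modeled on Flink's key-group range assignment as I understand it, so please check KeyGroupRangeAssignment for the real implementation. The class name KeyGroupSketch and all numbers (maxParallelism 128, parallelism 4 and 8, key group 100) are hypothetical and not taken from the job, and the hashing step that turns a key into a key group is omitted.

```java
/**
 * Illustrative sketch only, not Flink code.
 * All values used in main() are hypothetical.
 */
final class KeyGroupSketch {

    /** Index of the subtask that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** First key group (inclusive) owned by the given subtask. */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given subtask. */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** True if the key group falls inside the subtask's key-group range. */
    static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical
        int keyGroup = 100;       // hypothetical key group of a buffered element

        // Before re-scaling (parallelism 4) the key group belongs to subtask 3.
        int oldOwner = operatorIndexForKeyGroup(maxParallelism, 4, keyGroup);
        // After re-scaling (parallelism 8) it belongs to subtask 6.
        int newOwner = operatorIndexForKeyGroup(maxParallelism, 8, keyGroup);
        System.out.println("owner at parallelism 4: " + oldOwner);
        System.out.println("owner at parallelism 8: " + newOwner);

        // If the buffered element were restored on subtask 3 at parallelism 8
        // (range 48..63), a keyed operator chained behind it would reject it.
        System.out.println("subtask 3 owns key group at parallelism 8: "
                + owns(maxParallelism, 8, 3, keyGroup));
    }
}
```

If the in-flight elements of the async operator are redistributed without regard to their key groups, any element restored on a subtask other than the new owner would trigger this kind of failure as soon as it is emitted into the reinterpreted keyed stream.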
We've observed that this failure is seemingly non-deterministic during
re-scaling, but the probability of encountering it (from our admittedly
anecdotal and limited testing) is reduced, though not eliminated, when we
disable unaligned checkpoints. (Note that we first take a savepoint, restore
with unaligned checkpoints disabled, then take another savepoint, which we then
use to adjust the parallelism, in order to keep "persisted in-flight data" out
of the savepoint.)
We never had any issues with this under 1.14, so we're wondering whether this
is due to unaligned checkpointing or possibly a regression/change in behavior
since then. And if it is due to unaligned checkpointing, any thoughts on why
disabling it hasn't seemed to address the problem?
> Cannot restore from savepoint when increasing parallelism of operator using
> reinterpretAsKeyedStream and RichAsyncFunction
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34171
> URL: https://issues.apache.org/jira/browse/FLINK-34171
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Runtime / Checkpointing, Runtime /
> State Backends
> Affects Versions: 1.17.0
> Reporter: Ken Burford
> Priority: Major