[ 
https://issues.apache.org/jira/browse/FLINK-34171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17910992#comment-17910992
 ] 

Adam Domański commented on FLINK-34171:
---------------------------------------

Hi,

I'm experiencing a similar problem with Flink 1.20.0.

The exception is thrown when restoring from a savepoint. Parallelism wasn't changed.

The job uses no unaligned checkpoints and no custom key types in any KeyedStream: 
only one ProcessFunction keyed by a String, plus a couple of Flink SQL queries.
{code:java}
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1711) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1686) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1840) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$27(StreamTask.java:1829) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:101) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:414) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:399) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:361) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:815) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.RuntimeException: Exception occurred while setting the current key context.
    ... 15 more
Caused by: java.lang.RuntimeException: Exception occurred while setting the current key context.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:409) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:567) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:296) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1838) ~[flink-dist-1.20.0.jar:1.20.0]
    ... 14 more
Caused by: java.lang.IllegalArgumentException: Key group 200 is not in KeyGroupRange{startKeyGroup=14, endKeyGroup=15}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:445) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:407) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:567) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:296) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1838) ~[flink-dist-1.20.0.jar:1.20.0]
    ... 14 more
{code}
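
For readers unfamiliar with the check that fails here, the following is a minimal, 
dependency-free sketch of how a subtask's key group range relates to a key group 
index. The arithmetic is modelled on my understanding of Flink's 
KeyGroupRangeAssignment, but the class and method names (KeyGroupSketch, 
rangeStart, rangeEnd, ownerOf, checkKeyGroup) are hypothetical, and the values 
maxParallelism=256, parallelism=128 and subtask index 7 are assumptions picked only 
because they reproduce the range 14..15 from the message above; the actual job 
settings are not known. Flink derives the key group from a murmur hash of 
key.hashCode() modulo the max parallelism; the sketch skips that step and starts 
from the key group number.

```java
class KeyGroupSketch {

    /** First key group (inclusive) owned by the given subtask. */
    public static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given subtask. */
    public static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Index of the subtask that owns the given key group. */
    public static int ownerOf(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Mimics the range check that produces the IllegalArgumentException. */
    public static void checkKeyGroup(int keyGroup, int start, int end) {
        if (keyGroup < start || keyGroup > end) {
            throw new IllegalArgumentException(
                    "Key group " + keyGroup + " is not in range [" + start + ", " + end + "]");
        }
    }

    public static void main(String[] args) {
        // Assumed values, chosen only to match the range in the error message.
        int maxParallelism = 256;
        int parallelism = 128;
        int subtaskIndex = 7;

        int start = rangeStart(maxParallelism, parallelism, subtaskIndex);
        int end = rangeEnd(maxParallelism, parallelism, subtaskIndex);
        System.out.println("subtask " + subtaskIndex + " owns key groups " + start + ".." + end);

        // Under these assumptions, key group 200 belongs to a different subtask.
        System.out.println("key group 200 is owned by subtask "
                + ownerOf(maxParallelism, parallelism, 200));

        try {
            checkKeyGroup(200, start, end);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under these assumed settings, a timer or record whose key maps to key group 200 
should never reach the subtask owning 14..15, which is why the message points at 
either a non-deterministic key or state that ended up on the wrong subtask.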

> Cannot restore from savepoint when increasing parallelism of operator using 
> reinterpretAsKeyedStream and RichAsyncFunction
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34171
>                 URL: https://issues.apache.org/jira/browse/FLINK-34171
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Checkpointing, Runtime / 
> State Backends
>    Affects Versions: 1.17.0
>            Reporter: Ken Burford
>            Priority: Major
>
> We recently upgraded from Flink 1.14.2 to 1.17.0. Our job has not materially 
> changed beyond a few feature changes (enabling snapshot compression, 
> unaligned checkpoints), but we're seeing the following exception when 
> attempting to adjust the parallelism of our job up or down:
> {code:java}
> java.lang.RuntimeException: Exception occurred while setting the current key context.
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
>     at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
>     at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)
>     at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)
>     at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:393)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$1800(AsyncWaitOperator.java:92)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:621)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:602)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:712)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.IllegalArgumentException: Key group 30655 is not in KeyGroupRange{startKeyGroup=19346, endKeyGroup=19360}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
>     at org.apache.flink.runtime.state.heap.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:77)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:250)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
>     ... 29 more
> {code}
>  
> We're seeing this in an operator where we make use of 
> DataStreamUtils::reinterpretAsKeyedStream for collecting multiple tasks into 
> a single operator chain. However, each task takes the same data structure as 
> input with an immutable key (represented as a string), and all of them use 
> the exact same KeySelector instance.
> However, one pattern we're using here is a chain of:
> KeyedProcessFunction --> RichAsyncFunction --> 
> reinterpretAsKeyedStream(KeyedProcessFunction)
> ...and I suspect that this might have something to do with the way that the 
> buffered in-flight data from the RichAsyncFunction is redistributed during 
> re-scaling. We've observed that this failure is seemingly non-deterministic 
> during re-scaling, but the probability of encountering it (from our 
> admittedly anecdotal and limited testing) is reduced, but not eliminated, 
> when we disable unaligned checkpoints. (Note that we first take a savepoint, 
> restore with unaligned checkpoints disabled, then take another savepoint 
> which we then use to adjust the parallelism, to keep "persisted in-flight 
> data" out of the savepoint.)
> We've never had any issues in the past with this under 1.14, so we're 
> wondering if this is due to unaligned checkpointing, or possibly a 
> regression/change in behavior since then. And if it is due to unaligned 
> checkpointing, any thoughts on why disabling it hasn't seemed to address the 
> problem?
> Update: I took a savepoint and confirmed that it could not be restored under 
> a new parallelism. I then removed the RichAsyncFunction and restarted the job 
> (using --allowNonRestoredState) successfully. I was then able to take another 
> savepoint and restart the job with the RichAsyncFunction re-inserted into the 
> DS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
