[ https://issues.apache.org/jira/browse/FLINK-27031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516867#comment-17516867 ]
Roman Khachatryan commented on FLINK-27031:
-------------------------------------------

The test fails reproducibly, in about 6 out of 10 runs, when rescaling from a DoP of 1 to 3 or higher. ACCUMULATE_TIME_MILLIS doesn't affect it and can be zero. Furthermore, it only fails when both Changelog and unaligned checkpoints are enabled.

While debugging, I can see that:
- the problematic record comes from the ResultSubpartition state
- according to its key, it should be assigned to subtask 1 or 2
- it is produced by upstream subtask 0 and consumed by downstream subtask 0 (which should filter it out)
- however, downstream subtask 0 doesn't set up the Virtual Channels that would filter the record; when it does, the test passes

I haven't looked further into why the Virtual Channels aren't being set up.

From the above, the failure seems to be caused by unaligned checkpoints rather than by changelog. [~pnowojski] could you please take a look?

> ChangelogRescalingITCase.test failed due to IllegalStateException
> -----------------------------------------------------------------
>
>                 Key: FLINK-27031
>                 URL: https://issues.apache.org/jira/browse/FLINK-27031
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Runtime / State Backends
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Matthias Pohl
>            Assignee: Roman Khachatryan
>            Priority: Critical
>
> [This build|https://dev.azure.com/mapohl/flink/_build/results?buildId=923&view=logs&j=cc649950-03e9-5fae-8326-2f1ad744b536&t=a9a20597-291c-5240-9913-a731d46d6dd1&l=12961] failed in {{ChangelogRescalingITCase.test}}:
> {code}
> Apr 01 20:26:53 Caused by: java.lang.IllegalArgumentException: Key group 94 is not in KeyGroupRange{startKeyGroup=96, endKeyGroup=127}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
> Apr 01 20:26:53 at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
> Apr 01 20:26:53 at org.apache.flink.runtime.state.heap.StateTable.getMapForKeyGroup(StateTable.java:305)
> Apr 01 20:26:53 at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:261)
> Apr 01 20:26:53 at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
> Apr 01 20:26:53 at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:94)
> Apr 01 20:26:53 at org.apache.flink.state.changelog.ChangelogListState.add(ChangelogListState.java:78)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:404)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
> Apr 01 20:26:53 at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> Apr 01 20:26:53 at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> Apr 01 20:26:53 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:531)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:227)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:841)
> Apr 01 20:26:53 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:767)
> Apr 01 20:26:53 at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> Apr 01 20:26:53 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> Apr 01 20:26:53 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> Apr 01 20:26:53 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> Apr 01 20:26:53 at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
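A minimal sketch of the key-group arithmetic behind both the debugging findings and the exception message. The range and ownership formulas below are re-implemented (not called) so the sketch runs without Flink on the classpath; to the best of my knowledge they mirror KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex and computeOperatorIndexForKeyGroup. Everything else is hypothetical: the class KeyGroupFilterSketch, the filterRecovered method (a stand-in for the filtering the Virtual Channels are expected to do, not Flink's implementation), the recovered key groups, maxParallelism = 128 (consistent with endKeyGroup=127 but not stated in the report), and parallelism 4 (one setting that produces the logged range 96..127). When the operator is rescaled from 1 to 3, downstream subtask 0 shrinks from owning 0..127 to owning 0..42, so recovered in-flight records for higher key groups must be dropped before they reach keyed state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch, not Flink code: key-group ownership and filtering after rescaling. */
public class KeyGroupFilterSketch {

    /** Inclusive key-group range owned by one subtask; toString mimics KeyGroupRange. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }

        @Override
        public String toString() {
            return "KeyGroupRange{startKeyGroup=" + start + ", endKeyGroup=" + end + "}";
        }
    }

    /** Key groups owned by subtask operatorIndex (mirrors computeKeyGroupRangeForOperatorIndex). */
    static Range rangeForSubtask(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new Range(start, end);
    }

    /** Subtask that owns keyGroup (mirrors computeOperatorIndexForKeyGroup). */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Hypothetical stand-in for Virtual Channel filtering: keep only key groups this subtask owns. */
    static List<Integer> filterRecovered(List<Integer> keyGroups, Range owned) {
        List<Integer> kept = new ArrayList<>();
        for (int keyGroup : keyGroups) {
            if (owned.contains(keyGroup)) {
                kept.add(keyGroup);
            }
        }
        return kept;
    }

    /** Models the keyed state backend's range check that failed in the stack trace. */
    static void checkKeyGroup(int keyGroup, Range owned) {
        if (!owned.contains(keyGroup)) {
            throw new IllegalArgumentException("Key group " + keyGroup + " is not in " + owned + ".");
        }
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption: consistent with endKeyGroup=127 in the log

        // The comment's scenario: the keyed operator is rescaled from 1 to 3 subtasks.
        Range before = rangeForSubtask(maxParallelism, 1, 0);
        Range after = rangeForSubtask(maxParallelism, 3, 0);
        System.out.println("subtask 0 before: " + before + ", after: " + after);

        // Hypothetical recovered key groups: all valid for old subtask 0, mostly foreign to new subtask 0.
        List<Integer> recovered = Arrays.asList(10, 50, 94, 120);
        for (int keyGroup : recovered) {
            System.out.println("key group " + keyGroup + " -> subtask "
                    + subtaskForKeyGroup(maxParallelism, 3, keyGroup));
        }
        System.out.println("kept by new subtask 0: " + filterRecovered(recovered, after));

        // Without the filter, a foreign key group hits the state backend's check. With
        // parallelism 4 (an assumption), subtask 3 owns exactly the range printed in the log.
        try {
            checkKeyGroup(94, rangeForSubtask(maxParallelism, 4, 3));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch works on key-group ids directly. In Flink the key group is derived from the key's hashCode, which is why the exception text suggests a non-deterministic hashCode/equals; the findings in the comment instead point to recovered in-flight records that were never filtered after rescaling.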