Arvid Heise created FLINK-22686:
-----------------------------------
Summary: Incompatible subtask mappings while resuming from
unaligned checkpoints
Key: FLINK-22686
URL: https://issues.apache.org/jira/browse/FLINK-22686
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.13.0
Reporter: Arvid Heise
A user
[reported|https://lists.apache.org/x/[email protected]:lte=1M:Flink%201.13.0%20reactive%20mode:%20Job%20stop%20and%20cannot%20restore%20from%20checkpoint]
that he encountered an internal error while resuming during reactive mode.
There isn't an immediate connection to reactive mode, so it's safe to assume
that one rescaling case was not covered.
{noformat}
Caused by: java.lang.IllegalStateException: Incompatible subtask mappings: are
multiple operators ingesting/producing intermediate results with varying
degrees of parallelism?Found RescaleMappings{mappings=[[0, 1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,
28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46,
47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65,
66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85,
86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103,
104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118,
119], [120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133,
134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148,
149], [150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163,
164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178,
179], [180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193,
194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208,
209]]} and RescaleMappings{mappings=[[0, 7, 14, 21, 28, 35, 42, 49, 56, 63, 70,
77, 84, 91, 98, 105, 112, 119, 126, 133, 140, 147, 154, 161, 168, 175, 182,
189, 196, 203], [1, 8, 15, 22, 29, 36, 43, 50, 57, 64, 71, 78, 85, 92, 99, 106,
113, 120, 127, 134, 141, 148, 155, 162, 169, 176, 183, 190, 197, 204], [2, 9,
16, 23, 30, 37, 44, 51, 58, 65, 72, 79, 86, 93, 100, 107, 114, 121, 128, 135,
142, 149, 156, 163, 170, 177, 184, 191, 198, 205], [3, 10, 17, 24, 31, 38, 45,
52, 59, 66, 73, 80, 87, 94, 101, 108, 115, 122, 129, 136, 143, 150, 157, 164,
171, 178, 185, 192, 199, 206], [4, 11, 18, 25, 32, 39, 46, 53, 60, 67, 74, 81,
88, 95, 102, 109, 116, 123, 130, 137, 144, 151, 158, 165, 172, 179, 186, 193,
200, 207], [5, 12, 19, 26, 33, 40, 47, 54, 61, 68, 75, 82, 89, 96, 103, 110,
117, 124, 131, 138, 145, 152, 159, 166, 173, 180, 187, 194, 201, 208], [6, 13,
20, 27, 34, 41, 48, 55, 62, 69, 76, 83, 90, 97, 104, 111, 118, 125, 132, 139,
146, 153, 160, 167, 174, 181, 188, 195, 202, 209]]}.
at
org.apache.flink.runtime.checkpoint.TaskStateAssignment.checkSubtaskMapping(TaskStateAssignment.java:322)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:306)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:409)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:193)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1566)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1646)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:986)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:976)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57)
~[flink-dist_2.12-1.13.0.jar:1.13.0]
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
~[?:?]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
~[?:?]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
~[?:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
~[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
~[?:?]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
{noformat}
Here it seems that the same gate gets input from a range-partitioned and a
round-robin partitioned channel at the same time. During the implementation of
FLINK-19801, we couldn't find such a case and optimized the implementation
accordingly.
We have asked the user to provide his topology.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)