[ https://issues.apache.org/jira/browse/BEAM-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862784#comment-16862784 ]
Maximilian Michels commented on BEAM-7144:
------------------------------------------
After a decent amount of debugging of the checkpoint/restore/rescale code, I
managed to find the culprit. The issue here is the lazy state creation that
Beam uses by default. State is created in Flink's state backend upon first
"binding" of the state. Binding is only performed when {{processElement}} is
called. Now, it can happen that an operator does not receive any elements before
the first checkpoint is taken. In this case, the state is never initialized.
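A minimal, self-contained Java sketch of the lazy binding described above. `LazyOperator`, its toy in-memory "backend" map, and the state name `counts` are hypothetical stand-ins, not Beam or Flink classes. The only point it makes is that when state is registered on first use, an operator that never saw an element has nothing to write into its checkpoint metadata.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class LazyStateBindingDemo {

    /** Hypothetical operator whose state is created only when it is first bound. */
    static final class LazyOperator {
        // Toy keyed state backend: state name -> (key -> value).
        private final Map<String, Map<String, Long>> states = new LinkedHashMap<>();

        /** Binding happens here, so "counts" exists only after the first element. */
        void processElement(String key) {
            states.computeIfAbsent("counts", n -> new LinkedHashMap<>()).merge(key, 1L, Long::sum);
        }

        /** The state names that would be written to checkpoint metadata. */
        Set<String> snapshotMetaData() {
            return new TreeSet<>(states.keySet());
        }
    }

    public static void main(String[] args) {
        LazyOperator idle = new LazyOperator(); // receives no elements before the checkpoint
        LazyOperator busy = new LazyOperator();
        busy.processElement("a");

        System.out.println("idle checkpoint: " + idle.snapshotMetaData());
        System.out.println("busy checkpoint: " + busy.snapshotMetaData());
    }
}
```

Running `main` prints `idle checkpoint: []` and `busy checkpoint: [counts]`: the idle operator's snapshot simply does not mention the state.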
Depending on the {{maxParallelism}}, this can lead to the state being
partitioned such that an operator attempts to restore from a key group whose
metadata does not contain the state. However, the actual data does contain the
state, which looks like a bug in Flink's key group partitioner.
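To make the partitioning concrete, the sketch below computes which key groups a subtask owns before and after rescaling. It re-implements, as we understand it, the integer arithmetic of Flink's `KeyGroupRangeAssignment` rather than calling Flink, and it assumes that the `(83/224)` in the stack trace is Flink's 1-based subtask display. Under those assumptions, with `maxParallelism` 448 each of the 112 old subtasks owned four key groups and each of the 224 new subtasks owns two, so new subtask 83/224 restores only half of what old subtask 42/112 checkpointed.

```java
public class KeyGroupRangeDemo {

    /** Inclusive [start, end] key group range owned by a 0-based subtask (mirrors Flink's formula as we understand it). */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** 0-based subtask that owns a key group at the given parallelism. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 448;
        int newIndex = 82; // assumption: "(83/224)" is printed 1-based, so the index is 82
        int[] range = keyGroupRange(maxParallelism, 224, newIndex);
        System.out.printf("new subtask %d/224 owns key groups %d-%d%n", newIndex + 1, range[0], range[1]);
        for (int kg = range[0]; kg <= range[1]; kg++) {
            int oldIndex = operatorIndexForKeyGroup(maxParallelism, 112, kg);
            System.out.printf("  key group %d was checkpointed by old subtask %d/112%n", kg, oldIndex + 1);
        }
    }
}
```

For these values it reports key groups 164-165, both written by old subtask 42/112 (which owned 164-167).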
The workaround is to ensure that all user state is registered before
checkpointing is first attempted. This resolves all issues with rescaling Beam
jobs.
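A sketch of the workaround in the same toy model: every state the operator declares is registered in a setup hook that runs before the first checkpoint, so the checkpoint metadata lists it even when no element has arrived. `EagerOperator`, its `open()` hook, and the state names `counts` and `timers` are hypothetical; this illustrates the idea only and is not the actual change made in the Beam Flink runner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class EagerStateRegistrationDemo {

    /** Hypothetical operator that knows all of its user state up front. */
    static final class EagerOperator {
        // Toy keyed state backend: state name -> (key -> value).
        private final Map<String, Map<String, Long>> states = new LinkedHashMap<>();
        private final List<String> declaredStates;

        EagerOperator(List<String> declaredStates) {
            this.declaredStates = new ArrayList<>(declaredStates);
        }

        /** Runs once before the first checkpoint: registers every declared state, even if it stays empty. */
        void open() {
            for (String name : declaredStates) {
                states.computeIfAbsent(name, n -> new LinkedHashMap<>());
            }
        }

        void processElement(String key) {
            states.computeIfAbsent("counts", n -> new LinkedHashMap<>()).merge(key, 1L, Long::sum);
        }

        /** The state names that would be written to checkpoint metadata. */
        Set<String> snapshotMetaData() {
            return new TreeSet<>(states.keySet());
        }
    }

    public static void main(String[] args) {
        EagerOperator idle = new EagerOperator(List.of("counts", "timers"));
        idle.open(); // no elements arrive before the checkpoint
        System.out.println("idle checkpoint: " + idle.snapshotMetaData());
    }
}
```

Here the idle operator's snapshot already lists `[counts, timers]`, so every key group's metadata describes the same set of states regardless of which subtask received data.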
> Job re-scale fails on Flink 1.7
> -------------------------------
>
> Key: BEAM-7144
> URL: https://issues.apache.org/jira/browse/BEAM-7144
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.11.0
> Reporter: Jozef Vilcek
> Assignee: Maximilian Michels
> Priority: Major
>
> I am unable to rescale a job after moving it to Flink runner 1.7. What I am
> doing is:
> # Recompile the job code, only swapping the Flink runner version 1.5 -> 1.7
> # Run the streaming job with parallelism 112 and maxParallelism 448
> # Wait until a checkpoint is taken
> # Stop the job
> # Run the job again with parallelism 224 and the checkpoint path to restore from
> # The job fails
> The same happens if I try to increase parallelism. This procedure works for
> the same job compiled with Flink runner 1.5 and run on Flink 1.5.0. It fails with
> runner 1.7 on Flink 1.7.2.
> The exception is:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowDoFnOperator_2b6af61dc418f10e82551367a7e7f78e_(83/224) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 5 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 101, Size: 0
> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> at java.util.ArrayList.get(ArrayList.java:429)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:73)
> at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> ... 7 more{noformat}