[ https://issues.apache.org/jira/browse/FLINK-12653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906109#comment-16906109 ]
Jan Lukavský commented on FLINK-12653:
--------------------------------------
Although there is a workaround for user state in Beam, this issue still needs
to be fixed in Flink. Beam can also have "system state", i.e. state that is
not directly defined by users, and handling it is currently awkward: it has to
be registered manually before any data arrives, and this has to be done in
every runner, not only in the Flink runner.
So +1 to increasing the priority of this issue, unless it has already been
fixed by some other refactoring in 1.9.
> Keyed state backend fails to restore during rescaling
> -----------------------------------------------------
>
> Key: FLINK-12653
> URL: https://issues.apache.org/jira/browse/FLINK-12653
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Environment: Beam 2.12.0 or any other Beam version
> Flink >= 1.6
> Heap/Filesystem state backend (RocksDB works fine)
> Reporter: Maximilian Michels
> Priority: Critical
>
> The Flink Runner includes a test which verifies that checkpoints/savepoints
> work correctly with Beam on Flink. When adding additional tests for
> scale-up/scale-down [1], I came across a bug when restoring the keyed state
> backend. After a fair amount of debugging the Beam code and checking for
> potential serializer issues, I think this could be a Flink issue.
> Steps to reproduce:
> 1. {{git clone https://github.com/mxm/beam}}
> 2. {{cd beam && git checkout savepoint-problem}}
> 3. {{./gradlew :runners:flink:1.6:test --tests "**.FlinkSavepointTest.testSavepointRestoreLegacy"}}
> Error:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
> 	... 5 more
> Caused by: java.lang.RuntimeException: Invalid namespace string: ''
> 	at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
> 	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
> 	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
> 	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
> 	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
> 	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
> 	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> 	... 7 more
> {noformat}
> The test can also be run with other values of {{maxParallelism}}. The
> following values lead to failure:
> {noformat}
> options.setMaxParallelism(128); // default value
> options.setMaxParallelism(64);
> options.setMaxParallelism(118);
> {noformat}
> The following values work fine:
> {noformat}
> options.setMaxParallelism(110);
> options.setMaxParallelism(63);
> options.setMaxParallelism(24);
> {noformat}
> [1] https://github.com/apache/beam/commit/52d7291144f64eaa417862558d71a443fae3d690
> Everything works fine with RocksDB.
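For readers comparing the failing and working {{maxParallelism}} values above, the following standalone sketch prints which key groups each of 4 subtasks owns (4 matching the {{(4/4)}} subtask in the trace). It re-implements the key-group range arithmetic as I understand it from Flink's {{KeyGroupRangeAssignment}}; treat that correspondence as an assumption. The class {{KeyGroupRanges}} and its methods are hypothetical and do not call Flink. It only illustrates the partitioning the heap backend re-reads on restore; it does not reproduce or explain the bug.

```java
// Hypothetical standalone class: mirrors (as an assumption) the arithmetic of
// Flink's KeyGroupRangeAssignment; it does not depend on Flink itself.
public final class KeyGroupRanges {

    /** Inclusive {start, end} range of key groups owned by one subtask. */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || maxParallelism < parallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        // start = ceil(subtaskIndex * maxParallelism / parallelism)
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        // end = ceil((subtaskIndex + 1) * maxParallelism / parallelism) - 1
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Index of the subtask that owns the given key group. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Failing values first (128, 64, 118), then working ones (110, 63, 24).
        int[] maxParallelisms = {128, 64, 118, 110, 63, 24};
        for (int maxP : maxParallelisms) {
            StringBuilder line = new StringBuilder("maxParallelism=" + maxP + ":");
            for (int i = 0; i < parallelism; i++) {
                int[] r = rangeForSubtask(maxP, parallelism, i);
                line.append(" [").append(r[0]).append('-').append(r[1]).append(']');
            }
            System.out.println(line);
        }
    }
}
```

With a parallelism of 4, the failing values 128 and 64 split evenly while 118 does not, and among the working values 24 splits evenly while 110 and 63 do not, so divisibility by the parallelism alone does not separate the two groups.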