[ 
https://issues.apache.org/jira/browse/FLINK-12653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16907111#comment-16907111
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-12653:
---------------------------------------------

[~mxm] No, the State Processor API merged in 1.9.0 does not centralize the 
registered state meta info; that was deferred as a follow-up improvement.

That said, the proposal still looks like the right approach and would also 
benefit several other issues.
I will work out an implementation plan with [~carp84] and open subtasks to 
fix this.

> Keyed state backend fails to restore during rescaling
> -----------------------------------------------------
>
>                 Key: FLINK-12653
>                 URL: https://issues.apache.org/jira/browse/FLINK-12653
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>         Environment: Beam 2.12.0 or any other Beam version
> Flink >= 1.6
> Heap/Filesystem state backend (RocksDB works fine)
>            Reporter: Maximilian Michels
>            Priority: Critical
>
> The Flink Runner includes a test which verifies checkpoints/savepoints work 
> correctly with Beam on Flink. When adding additional tests for 
> scaleup/scaledown [1], I came across a bug with restoring the keyed state 
> backend. After a fair amount of debugging Beam code and checking any 
> potential issues with serializers, I think this could be a Flink issue.
> Steps to reproduce: 
> 1. {{git clone https://github.com/mxm/beam}}
> 2. {{cd beam && git checkout savepoint-problem}}
> 3. {{./gradlew :runners:flink:1.6:test --tests "**.FlinkSavepointTest.testSavepointRestoreLegacy"}}
> Error:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
>       ... 5 more
> Caused by: java.lang.RuntimeException: Invalid namespace string: ''
>       at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
>       at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
>       at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
>       at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
>       at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
>       at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
>       at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
>       at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
>       at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
>       at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
>       at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
>       at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>       ... 7 more
> {noformat}
> Whether the restore fails depends on the {{maxParallelism}} setting. The 
> following values lead to failure:
> {noformat}
>     options.setMaxParallelism(128); // default value
>     options.setMaxParallelism(64);
>     options.setMaxParallelism(118);
> {noformat}
> The following work fine:
> {noformat}
>     options.setMaxParallelism(110);
>     options.setMaxParallelism(63);
>     options.setMaxParallelism(24);
> {noformat}
> [1] https://github.com/apache/beam/commit/52d7291144f64eaa417862558d71a443fae3d690
> Everything works fine with RocksDB.
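
For context on why {{maxParallelism}} matters during rescaling: Flink splits keyed state into maxParallelism key groups, and each subtask restores one contiguous range of them. The following is a minimal Java sketch of that range calculation, not Flink code. The formula is reproduced from memory of Flink's KeyGroupRangeAssignment and should be treated as an assumption. The class name KeyGroupRangeSketch is hypothetical, and the parallelism of 4 is only inferred from the "(4/4)" subtask in the trace. The sketch prints the ranges for the values listed above; it does not by itself explain why some of them fail.

```java
class KeyGroupRangeSketch {

    // Assumed formula: inclusive [start, end] key-group range for one subtask.
    static int[] rangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        // maxParallelism values from the report: first three fail, last three work.
        int[] maxParallelisms = {128, 64, 118, 110, 63, 24};
        int parallelism = 4; // assumption, taken from "(4/4)" in the stack trace

        for (int maxParallelism : maxParallelisms) {
            StringBuilder line = new StringBuilder("maxParallelism=" + maxParallelism + ":");
            for (int subtask = 0; subtask < parallelism; subtask++) {
                int[] range = rangeFor(maxParallelism, parallelism, subtask);
                line.append(" [").append(range[0]).append(", ").append(range[1]).append("]");
            }
            System.out.println(line);
        }
    }
}
```

Comparing the ranges before and after a parallelism change shows which key groups move between subtasks, which is the data the heap backend has to re-read on restore.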



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
