[ https://issues.apache.org/jira/browse/FLINK-19741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-19741:
-----------------------------------
Labels: pull-request-available (was: )
> InternalTimeServiceManager fails to restore due to corrupt reads if there are other users of raw keyed state streams
> --------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19741
> URL: https://issues.apache.org/jira/browse/FLINK-19741
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.9.3, 1.10.2, 1.11.2
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> h2. *Diagnosis*
> Currently, when restoring an {{InternalTimeServiceManager}}, we always attempt
> to read from the provided raw keyed state streams (using
> {{InternalTimerServiceSerializationProxy}}):
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117
> This is incorrect, since we only write with the
> {{InternalTimerServiceSerializationProxy}} if the timers require legacy
> synchronous snapshots:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
> (currently, this is only the case when users combine the RocksDB backend with heap timers).
> Therefore, the {{InternalTimeServiceManager}} can fail to be created on
> restore due to corrupt reads when:
> * a checkpoint was taken where {{useLegacySynchronousSnapshots}} is false
> (so the time service manager wrote nothing to, and does not use, the raw
> keyed stream)
> * the raw keyed stream is used elsewhere (e.g. in the Flink application's
> user code)
> * on restore from the checkpoint, {{InternalTimeServiceManagerImpl.create()}}
> attempts to read from the raw keyed stream with the
> {{InternalTimerServiceSerializationProxy}}.
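To make the write/restore asymmetry concrete, here is a minimal sketch that models one key group's raw keyed stream with plain {{java.io}} streams. The class {{RawKeyedStreamAsymmetry}}, its methods, and the string {{"timers"}} standing in for the proxy's output are all hypothetical; the real {{InternalTimerServiceSerializationProxy}} format is more involved, and no Flink classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, heavily simplified model of one key group's raw keyed state stream.
// None of these names exist in Flink; they only mirror the asymmetry described above.
public class RawKeyedStreamAsymmetry {

    // Snapshot side: timer data is written only when legacy synchronous snapshots are used.
    // otherUserValue (nullable) stands in for bytes written by another raw keyed state user.
    static byte[] snapshot(boolean useLegacySynchronousSnapshots, Integer otherUserValue) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (useLegacySynchronousSnapshots) {
                out.writeUTF("timers"); // stand-in for the serialization proxy's output
            }
            if (otherUserValue != null) {
                out.writeInt(otherUserValue);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Buggy restore side: reads "timers" from any non-empty stream, ignoring the flag.
    static String restoreTimers(byte[] rawKeyedStream) {
        if (rawKeyedStream.length == 0) {
            return "nothing to restore";
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(rawKeyedStream))) {
            return "restored timer service '" + in.readUTF() + "'";
        } catch (IOException e) {
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(restoreTimers(snapshot(true, null)));  // restored timer service 'timers'
        System.out.println(restoreTimers(snapshot(false, null))); // nothing to restore
        System.out.println(restoreTimers(snapshot(false, -1)));   // failed: EOFException
        System.out.println(restoreTimers(snapshot(false, 7)));    // restored timer service '' (corrupt)
    }
}
```

Depending on what the other writer put into the stream, the unconditional read either fails with an {{EOFException}} (as in the trace) or silently returns a wrong value.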
> Full error stack trace (with Flink 1.11.1):
> {code}
> 2020-10-21 13:16:51
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:197)
> at java.io.DataInputStream.readUTF(DataInputStream.java:609)
> at java.io.DataInputStream.readUTF(DataInputStream.java:564)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:110)
> at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:76)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:217)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:234)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
> ... 9 more
> {code}
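The bottom frames ({{readUTF}} calling {{readFully}}) can be explained with plain {{java.io}}: {{DataInputStream.readUTF}} first reads an unsigned, big-endian 2-byte length prefix and then reads exactly that many bytes. If those two bytes actually belong to another writer's data, the "length" can exceed what remains in the stream. The sketch below is illustrative only; {{ReadUtfEof}} and the sample bytes are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Sketch of the bottom frames of the trace, using only java.io (no Flink classes).
public class ReadUtfEof {

    // readUTF first reads an unsigned, big-endian 2-byte length prefix.
    static int declaredUtfLength(byte[] data) {
        if (data.length < 2) {
            throw new IllegalArgumentException("need at least the 2-byte length prefix");
        }
        return ((data[0] & 0xFF) << 8) | (data[1] & 0xFF);
    }

    // True if readUTF fails with EOFException, i.e. readFully runs out of bytes.
    static boolean readUtfHitsEof(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readUTF();
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            return false; // e.g. UTFDataFormatException: malformed but complete input
        }
    }

    public static void main(String[] args) {
        // Hypothetical foreign bytes: the int 0x7FFF0001 written by another raw keyed state user.
        byte[] foreign = {0x7F, (byte) 0xFF, 0x00, 0x01};
        System.out.println(declaredUtfLength(foreign)); // 32767
        System.out.println(readUtfHitsEof(foreign));    // true: only 2 bytes follow the prefix
    }
}
```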
> h2. *Reproducing*
> - Have an application with any operator that uses and writes to raw keyed state streams
> - Use the heap backend with any timer factory, or the RocksDB backend with RocksDB timers
> - Take a savepoint (or wait for a checkpoint), then restore from it
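The reproduction configurations are exactly the complement of the one combination that uses legacy synchronous snapshots (RocksDB backend with heap timers). The sketch below encodes that rule and the reproduction conditions as a small truth table; the enums and method names are hypothetical and are not Flink types or configuration keys.

```java
// Hypothetical encoding of the rule from the diagnosis; these enums are not Flink types.
public class ReproductionMatrix {

    enum Backend { HEAP, ROCKSDB }

    enum TimerFactory { HEAP, ROCKSDB }

    // Legacy synchronous timer snapshots are only used for RocksDB backend + heap timers.
    static boolean useLegacySynchronousSnapshots(Backend backend, TimerFactory timers) {
        return backend == Backend.ROCKSDB && timers == TimerFactory.HEAP;
    }

    // Conditions from the issue: an unflagged snapshot plus another raw keyed state writer.
    static boolean matchesReproductionConditions(
            Backend backend, TimerFactory timers, boolean otherRawKeyedStateWriter) {
        return otherRawKeyedStateWriter && !useLegacySynchronousSnapshots(backend, timers);
    }

    public static void main(String[] args) {
        for (Backend backend : Backend.values()) {
            for (TimerFactory timers : TimerFactory.values()) {
                System.out.println(backend + " backend + " + timers + " timers -> reproduces: "
                        + matchesReproductionConditions(backend, timers, true));
            }
        }
    }
}
```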
> h2. *Proposed Fix*
> The fix would be to also respect the {{useLegacySynchronousSnapshots}} flag
> on the restore path, in:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231
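A minimal sketch of the proposed guard, again modelled with plain {{java.io}} rather than Flink code: the restore side reads timers from the raw keyed stream only when {{useLegacySynchronousSnapshots}} is set, mirroring the write path. {{GuardedTimerRestore}} and its methods are hypothetical names for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the proposed fix: restore timers from the raw keyed stream only
// when useLegacySynchronousSnapshots is set, mirroring the write path. Not Flink code.
public class GuardedTimerRestore {

    // Stand-in for what the legacy write path would put into the stream.
    static byte[] legacyTimerSnapshot(String timerServiceName) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(timerServiceName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String restoreTimers(boolean useLegacySynchronousSnapshots, byte[] rawKeyedStream) {
        if (!useLegacySynchronousSnapshots) {
            return "skipped"; // timers never wrote here; any bytes belong to other users
        }
        if (rawKeyedStream.length == 0) {
            return "nothing to restore";
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(rawKeyedStream))) {
            return "restored timer service '" + in.readUTF() + "'";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Another user's writeInt(-1): would cause an EOFException if read as timer data.
        byte[] foreign = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
        System.out.println(restoreTimers(false, foreign));                      // skipped
        System.out.println(restoreTimers(true, legacyTimerSnapshot("timers"))); // restored timer service 'timers'
    }
}
```

Note that this simple guard assumes the flag has the same value at restore time as when the snapshot was taken; if the backend or timer factory changes between the two, a check based on the current configuration alone may not be sufficient.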
--
This message was sent by Atlassian Jira
(v8.3.4#803005)