[
https://issues.apache.org/jira/browse/FLINK-19741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai updated FLINK-19741:
----------------------------------------
Description:
h2. *Diagnosis*
Currently, when restoring an {{InternalTimeServiceManager}}, we always attempt
to read from the provided raw keyed state streams (using
{{InternalTimerServiceSerializationProxy}}):
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117
This is incorrect, since we don't write with the
{{InternalTimerServiceSerializationProxy}} if the timers do not require legacy
synchronous snapshots:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
(legacy synchronous snapshots are currently only required when users combine the
RocksDB backend with heap timers).
Therefore, the {{InternalTimeServiceManager}} can fail to be created on restore
due to corrupt reads when all of the following hold:
* a checkpoint was taken where {{useLegacySynchronousSnapshots}} is false
(hence nothing was written, and the time service manager does not use the raw
keyed stream)
* the raw keyed stream is used elsewhere (e.g. in the Flink application's user
code)
* on restore from the checkpoint, {{InternalTimeServiceManagerImpl.create()}}
attempts to read from the raw keyed stream with the
{{InternalTimerServiceSerializationProxy}}.
Full error stack trace (with Flink 1.11.1):
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
    ... 9 more
Caused by: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
    ... 10 more
{code}
h2. *Reproducing*
- Have an application with any operator that uses and writes to raw keyed state
streams
- Use heap backend + any timer factory or RocksDB backend + RocksDB timers
- Take a savepoint or wait for a checkpoint, and trigger a restore
h2. *Proposed Fix*
The fix would be to also respect the {{useLegacySynchronousSnapshots}} flag on restore, in:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231
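To make the idea concrete, here is a minimal sketch of such a guard in plain Java. It is not Flink code: the class {{TimerRestoreSketch}}, its methods, and the {{TIMER_MAGIC}} header are hypothetical stand-ins for the serialization proxy, and only the flag name is taken from the description above. It assumes the restore side can learn whether the snapshot was written with legacy synchronous snapshots enabled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimerRestoreSketch {

    // Hypothetical header; stands in for whatever the timer serialization proxy writes first.
    static final int TIMER_MAGIC = 0x54494D52;

    /** Models the timer service writing its timers into the raw keyed stream. */
    static byte[] writeTimers(long[] timers) {
        ByteBuffer out = ByteBuffer.allocate(8 + 8 * timers.length);
        out.putInt(TIMER_MAGIC);
        out.putInt(timers.length);
        for (long timer : timers) {
            out.putLong(timer);
        }
        return out.array();
    }

    /** Models another user of the raw keyed stream (e.g. user code) writing its own bytes. */
    static byte[] writeUserState(String userData) {
        return userData.getBytes(StandardCharsets.UTF_8);
    }

    /** Current behaviour in this model: always parse the stream as timer data. */
    static long[] restoreUnguarded(byte[] raw) {
        ByteBuffer in = ByteBuffer.wrap(raw);
        if (in.remaining() < 8 || in.getInt() != TIMER_MAGIC) {
            throw new IllegalStateException("corrupt read: stream does not contain timer data");
        }
        long[] timers = new long[in.getInt()];
        for (int i = 0; i < timers.length; i++) {
            timers[i] = in.getLong();
        }
        return timers;
    }

    /** Proposed behaviour in this model: only read the stream if timers were written to it. */
    static long[] restore(byte[] raw, boolean useLegacySynchronousSnapshots) {
        if (!useLegacySynchronousSnapshots) {
            // Nothing was written by the timer service, so the bytes belong to someone else.
            return new long[0];
        }
        return restoreUnguarded(raw);
    }

    public static void main(String[] args) {
        byte[] userOnly = writeUserState("user-owned raw keyed state");

        // Guarded restore skips the stream and restores no timers.
        System.out.println("guarded restore, timers: " + restore(userOnly, false).length);

        // Unguarded restore misinterprets the user's bytes and fails.
        try {
            restoreUnguarded(userOnly);
        } catch (IllegalStateException e) {
            System.out.println("unguarded restore failed: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only the symmetry: the read path is conditioned on the same flag as the write path, so bytes written by other users of the raw keyed stream are never parsed as timer data.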
> InternalTimeServiceManager fails to restore due to corrupt reads if there are
> other users of raw keyed state streams
> --------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19741
> URL: https://issues.apache.org/jira/browse/FLINK-19741
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.9.3, 1.10.2, 1.11.2
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
>