Antti-Kaikkonen edited a comment on pull request #13773: URL: https://github.com/apache/flink/pull/13773#issuecomment-717293281
I tried to build this from source and got an error when trying to restore a stateful function from a savepoint:

1) Build the branch:
```
git clone https://github.com/tzulitai/flink.git
cd flink
git checkout FLINK-19748-backport_1.11
mvn clean package -DskipTests
```

2) Add to flink-conf.yaml:
```
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
# optionally use rocksdb
state.backend: rocksdb
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
```

3) Run https://github.com/Antti-Kaikkonen/FlinkStatefunCountTo1M with parallelism 2

4) Create a savepoint

5) Try to restore from the savepoint. The error is thrown in the **feedback-union -> functions** task:
```
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid stream header: 008E0A20
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:918)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:376)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
    at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.<init>(InstantiationUtil.java:227)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotReaderPreVersioned.restoreKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:308)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotReader.readTimersSnapshot(InternalTimersSnapshotReaderWriters.java:261)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:115)
    at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:76)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:217)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:252)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181)
    ... 9 more
```

I get the same error with both the default state backend and the rocksdb state backend. With the rocksdb backend and heap timers, I get a different error, and it already occurs when creating the savepoint.

**Edit:** Apparently I had accidentally built the FLINK-19741-backport_1.11 branch. I have now updated the above description to reflect this pull request (FLINK-19748-backport_1.11) and added the error that I got with FLINK-19741-backport_1.11 (pull request #13762) below:
```
Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:220)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:251)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:179)
    ... 9 more
Caused by: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:458)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:411)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:244)
    ... 10 more
```
This is the same error as in my original bug description: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-19692

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]
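
A side note on the `StreamCorruptedException` in the first stack trace: `invalid stream header: 008E0A20` is the message `java.io.ObjectInputStream` produces when the first four bytes of its input are not the Java serialization magic number and version (`ACED 0005`). The sketch below reproduces only that message from a hand-made four-byte buffer. It does not use Flink, and the class name `InvalidStreamHeaderDemo` and the helper `tryOpen` are made up for illustration. That the restore path is handing Java deserialization bytes written in some other format is an assumption here; the trace alone does not prove it.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;

public class InvalidStreamHeaderDemo {

    // Hypothetical helper: opens an ObjectInputStream over the given bytes.
    // Returns the exception message if the stream header is rejected,
    // or null if the header is accepted.
    public static String tryOpen(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The four bytes reported in the stack trace above.
        byte[] fromTrace = {0x00, (byte) 0x8E, 0x0A, 0x20};
        System.out.println(tryOpen(fromTrace)); // prints: invalid stream header: 008E0A20

        // A valid Java serialization header: magic 0xACED, version 0x0005.
        byte[] validHeader = {(byte) 0xAC, (byte) 0xED, 0x00, 0x05};
        System.out.println(tryOpen(validHeader)); // prints: null
    }
}
```

If that reading is right, the useful question for this trace is which writer produced the timer snapshot bytes that `InternalTimersSnapshotReaderPreVersioned` then tries to read as a Java-serialized serializer.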
