Antti-Kaikkonen edited a comment on pull request #13773: URL: https://github.com/apache/flink/pull/13773#issuecomment-717293281
I tried to build this from source and got an error when trying to restore a stateful function from a savepoint: 1) ``` git clone https://github.com/tzulitai/flink.git cd flink git checkout FLINK-19748-backport_1.11 mvn clean package -DskipTests ``` 2) add to flink-conf.yaml: ``` classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf #optionally use rocksdb state.backend: rocksdb taskmanager.numberOfTaskSlots: 2 parallelism.default: 2 ``` 2) Run https://github.com/Antti-Kaikkonen/FlinkStatefunCountTo1M with parallelism 2 3) create a savepoint 4) try to restore from the savepoint and the error is thrown in the **feedback-union -> functions** task: ``` java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.StreamCorruptedException: invalid stream header: 008E0A20 at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:918) at java.io.ObjectInputStream.<init>(ObjectInputStream.java:376) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69) at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.<init>(InstantiationUtil.java:227) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572) at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotReaderPreVersioned.restoreKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:308) at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotReader.readTimersSnapshot(InternalTimersSnapshotReaderWriters.java:261) at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:115) at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:76) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:217) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:252) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:181) ... 9 more ``` I'm getting the same error with the default state backend and the rocksdb state backend. When I tried with rocksdb backend and heap timers I get a different error already when creating a savepoint. **Edit: I realized that I should have probably built from https://github.com/apache/flink/pull/13761 instead of this pull request. I'm testing it next** **Edit2:** Apparently my previous attempt was already with FLINK-19741-backport_1.11. I have now tested both FLINK-19741-backport_1.11 and FLINK-19748-backport_1.11 and both of them throw a different error in the **feedback-union -> functions** task. The error with FLINK-19748-backport_1.11 is now is now updated in the description and the error with FLINK-19741-backport_1.11 is below: ``` Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:220) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: java.io.IOException: position out of bounds at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:251) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:179) ... 9 more Caused by: java.io.IOException: position out of bounds at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:458) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:411) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:244) ... 10 more ``` which is the same error as in my original bug description https://issues.apache.org/jira/projects/FLINK/issues/FLINK-19692 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
