[
https://issues.apache.org/jira/browse/FLINK-10483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Elias Levy closed FLINK-10483.
------------------------------
Resolution: Invalid
> Can't restore from a savepoint even with Allow Non Restored State enabled
> -------------------------------------------------------------------------
>
> Key: FLINK-10483
> URL: https://issues.apache.org/jira/browse/FLINK-10483
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing, Type Serialization System
> Affects Versions: 1.4.2
> Reporter: Elias Levy
> Priority: Major
>
> A trimmed streaming job fails a restore from a savepoint with an Unloadable
> class for type serializer error, even though the case class in question has
> been eliminated from the job and Allow Non Restored State is enabled.
> We have a job running on a Flink 1.4.2 cluster with two Kafka input streams,
> one of the streams is processed by an async function, and the output of the
> async function and the other original stream are consumed by a
> CoProcessOperator, that intern emits Scala case class instances, that go into
> a stateful ProcessFunction filter, and then into a sink. I.e.
> {code:java}
> source 1 -> async function --\
> |---> co process --> process
> --> sink
> source 2 --------------------------/
> {code}
> I eliminated most of the DAG, leaving only the source 1 --> async function
> portion of it. This removed the case class in question from the processing
> graph. When I try to restore from the savepoint, even if Allow Non Restored
> State is selected, the job fails to restore with the error "Deserialization
> of serializer erroed".
> This is the error being generated:
> {noformat}
> WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> at
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.io.ObjectInputStream.readObject(Unknown Source)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> ... 14 more
> Caused by: java.lang.ClassNotFoundException:
> com.somewhere.TestJob$$anon$13$$anon$3
> at java.net.URLClassLoader.findClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Unknown Source)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
> at
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
> ... 24 more
> WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> at
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
> at
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.io.ObjectInputStream.readObject(Unknown Source)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> ... 18 more
> Caused by: java.lang.ClassNotFoundException:
> com.somewhere.TestJob$$anon$13$$anon$3
> at java.net.URLClassLoader.findClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Unknown Source)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
> at
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
> ... 24 more
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error
> during disposal of stream operator.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Unknown Source)
> INFO org.apache.flink.runtime.taskmanager.Task - Source:
> Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f)
> switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: Unable to restore operator state
> [_async_wait_operator_state_]. The previous serializer of the operator state
> must be present; the serializer could have been removed from the classpath,
> or its implementation have changed and could not be loaded. This is a
> temporary restriction that will be fixed in future versions.
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 more
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)