[ 
https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-11083:
----------------------------------
    Fix Version/s: 1.7.1
                   1.8.0

> CRowSerializerConfigSnapshot is not instantiable
> ------------------------------------------------
>
>                 Key: FLINK-11083
>                 URL: https://issues.apache.org/jira/browse/FLINK-11083
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL, Type Serialization System
>            Reporter: boshu Zheng
>            Assignee: boshu Zheng
>            Priority: Major
>             Fix For: 1.8.0, 1.7.1
>
>
> An exception was encountered when restarting a job with savepoint in our 
> production env,
> {code:java}
> 2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task 
>                   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
> to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
> from any of the 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>       ... 5 more
> Caused by: java.lang.RuntimeException: The class 
> 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
>  is not instantiable: The class has no (implicit) public nullary constructor, 
> i.e. a constructor without arguments.
>       at 
> org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
>       at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>       at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>       at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
>       at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>       ... 7 more
> {code}
> I add tests to CRowSerializerTest to make sure this is definitely a bug,
> {code:java}
>   @Test
>   def testDefaultConstructor(): Unit = {
>     new CRowSerializer.CRowSerializerConfigSnapshot()
>     /////// This would fail the test
>     val serializerConfigSnapshotClass =
>      
> Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
>     InstantiationUtil.instantiate(serializerConfigSnapshotClass)
>   }
>   @Test
>   def testStateRestore(): Unit = {
>     class IKeyedProcessFunction extends KeyedProcessFunction[Integer, 
> Integer, Integer] {
>       var state: ListState[CRow] = _
>       override def open(parameters: Configuration): Unit = {
>         val stateDesc = new ListStateDescriptor[CRow]("CRow",
>           new CRowTypeInfo(new RowTypeInfo(Types.INT)))
>         state = getRuntimeContext.getListState(stateDesc)
>       }
>       override def processElement(value: Integer,
>           ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
>           out: Collector[Integer]): Unit = {
>         state.add(new CRow(Row.of(value), true))
>       }
>     }
>     val operator = new KeyedProcessOperator[Integer, Integer, Integer](new 
> IKeyedProcessFunction)
>     var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, 
> Integer, Integer](
>       operator,
>       new KeySelector[Integer, Integer] {
>         override def getKey(value: Integer): Integer= -1
>       },
>       Types.INT, 1, 1, 0)
>     testHarness.setup()
>     testHarness.open()
>     testHarness.processElement(new StreamRecord[Integer](1, 1L))
>     testHarness.processElement(new StreamRecord[Integer](2, 1L))
>     testHarness.processElement(new StreamRecord[Integer](3, 1L))
>     assertEquals(1, numKeyedStateEntries(operator))
>     val snapshot = testHarness.snapshot(0L, 0L)
>     testHarness.close()
>     testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, 
> Integer, Integer](
>       operator,
>       new KeySelector[Integer, Integer] {
>         override def getKey(value: Integer): Integer= -1
>       },
>       Types.INT, 1, 1, 0)
>     testHarness.setup()
>     /////// This would throw the same exception as our production app do.
>     testHarness.initializeState(snapshot)
>     testHarness.open()
>     assertEquals(1, numKeyedStateEntries(operator))
>     testHarness.close()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to