[ 
https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716906#comment-16716906
 ] 

ASF GitHub Bot commented on FLINK-11083:
----------------------------------------

kisimple commented on a change in pull request #7267: [FLINK-11083][Table&SQL] 
CRowSerializerConfigSnapshot is not instantiable
URL: https://github.com/apache/flink/pull/7267#discussion_r240500235
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala
 ##########
 @@ -29,6 +41,72 @@ class CRowSerializerTest extends TestLogger {
   @Test
   def testDefaultConstructor(): Unit = {
     new CRowSerializer.CRowSerializerConfigSnapshot()
+
+    val serializerConfigSnapshotClass = Class.forName(
+      
"org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
 
 Review comment:
   Thanks for the review. Have updated the PR to address your comments.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CRowSerializerConfigSnapshot is not instantiable
> ------------------------------------------------
>
>                 Key: FLINK-11083
>                 URL: https://issues.apache.org/jira/browse/FLINK-11083
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL, Type Serialization System
>            Reporter: boshu Zheng
>            Assignee: boshu Zheng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0, 1.7.1
>
>
> An exception was encountered when restarting a job with savepoint in our 
> production env,
> {code:java}
> 2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task 
>                   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
> to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
> from any of the 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>       ... 5 more
> Caused by: java.lang.RuntimeException: The class 
> 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
>  is not instantiable: The class has no (implicit) public nullary constructor, 
> i.e. a constructor without arguments.
>       at 
> org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
>       at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>       at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>       at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
>       at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>       ... 7 more
> {code}
> I add tests to CRowSerializerTest to make sure this is definitely a bug,
> {code:java}
>   @Test
>   def testDefaultConstructor(): Unit = {
>     new CRowSerializer.CRowSerializerConfigSnapshot()
>     /////// This would fail the test
>     val serializerConfigSnapshotClass =
>      
> Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
>     InstantiationUtil.instantiate(serializerConfigSnapshotClass)
>   }
>   @Test
>   def testStateRestore(): Unit = {
>     class IKeyedProcessFunction extends KeyedProcessFunction[Integer, 
> Integer, Integer] {
>       var state: ListState[CRow] = _
>       override def open(parameters: Configuration): Unit = {
>         val stateDesc = new ListStateDescriptor[CRow]("CRow",
>           new CRowTypeInfo(new RowTypeInfo(Types.INT)))
>         state = getRuntimeContext.getListState(stateDesc)
>       }
>       override def processElement(value: Integer,
>           ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
>           out: Collector[Integer]): Unit = {
>         state.add(new CRow(Row.of(value), true))
>       }
>     }
>     val operator = new KeyedProcessOperator[Integer, Integer, Integer](new 
> IKeyedProcessFunction)
>     var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, 
> Integer, Integer](
>       operator,
>       new KeySelector[Integer, Integer] {
>         override def getKey(value: Integer): Integer= -1
>       },
>       Types.INT, 1, 1, 0)
>     testHarness.setup()
>     testHarness.open()
>     testHarness.processElement(new StreamRecord[Integer](1, 1L))
>     testHarness.processElement(new StreamRecord[Integer](2, 1L))
>     testHarness.processElement(new StreamRecord[Integer](3, 1L))
>     assertEquals(1, numKeyedStateEntries(operator))
>     val snapshot = testHarness.snapshot(0L, 0L)
>     testHarness.close()
>     testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, 
> Integer, Integer](
>       operator,
>       new KeySelector[Integer, Integer] {
>         override def getKey(value: Integer): Integer= -1
>       },
>       Types.INT, 1, 1, 0)
>     testHarness.setup()
>     /////// This would throw the same exception as our production app do.
>     testHarness.initializeState(snapshot)
>     testHarness.open()
>     assertEquals(1, numKeyedStateEntries(operator))
>     testHarness.close()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to