[ 
https://issues.apache.org/jira/browse/FLINK-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900733#comment-16900733
 ] 

Yun Tang commented on FLINK-13159:
----------------------------------

Before Flink 1.8, {{PojoSerializer}} always used the
[1st constructor|https://github.com/apache/flink/blob/56c3e7cd653e4cb2ad0a76ca317aa9fa1d564dc2/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L119],
which assigns {{Thread.currentThread().getContextClassLoader()}} to the private
field {{ClassLoader cl}}. The
[2nd constructor|https://github.com/apache/flink/blob/56c3e7cd653e4cb2ad0a76ca317aa9fa1d564dc2/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L147],
which does not assign {{cl}}, was used only to ensure compatibility.

However, starting with Flink 1.8, the
[2nd constructor|https://github.com/apache/flink/blob/a0d236fba7c6abdabb461aa504b1e088a3982c31/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L142]
is used to create a restored serializer or a reconfigured serializer from a
{{PojoSerializerSnapshot}}. Unfortunately, the restored serializer does not
contain the class loader.
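
To illustrate why a missing class loader matters, here is a minimal standalone
sketch (not Flink code; {{ClassLoaderDemo}} and {{UserPojo}} are hypothetical
names). It assumes the unassigned {{cl}} field is {{null}} when it reaches
{{Class.forName}}; in that case the JDK resolves the name through the bootstrap
class loader, which cannot see user classes.

```java
// Hypothetical, self-contained demo; it does not use any Flink classes.
class ClassLoaderDemo {

    /** Stand-in for a user-defined POJO shipped with the job. */
    static class UserPojo {}

    /** Returns true if className can be resolved through the given loader. */
    static boolean canLoad(String className, ClassLoader cl) {
        try {
            // A null loader makes Class.forName use the bootstrap class loader.
            Class.forName(className, true, cl);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = UserPojo.class.getName();
        // Loader that actually defined the user class: lookup succeeds.
        System.out.println("user loader: "
            + canLoad(name, UserPojo.class.getClassLoader()));
        // Null loader: the bootstrap loader does not know user classes.
        System.out.println("null loader: " + canLoad(name, null));
    }
}
```

If that assumption holds, it would match the {{ClassNotFoundException}} thrown
from {{Class.forName}} in the reported stack trace.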

The following test, added to {{PojoSubclassSerializerTest}}, reproduces this.
{code:java}
@Test
public void testRestorePojoSerializer() throws IOException {
   TypeSerializer<TestUserClassBase> serializer = createSerializer();
   TestUserClass2 bar = new TestUserClass2(
      ThreadLocalRandom.current().nextInt(), "bar", ThreadLocalRandom.current().nextFloat());
   ByteArrayOutputStream out = new ByteArrayOutputStream();
   // Round trip with the original serializer.
   try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
      serializer.serialize(bar, outView);
      DataInputViewStreamWrapper inputViewStreamWrapper =
         new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray()));
      serializer.deserialize(inputViewStreamWrapper);
   }
   // Restore a serializer from the snapshot and read the same bytes with it;
   // this is the step that exercises the restored serializer.
   TypeSerializerSnapshot<TestUserClassBase> snapshot = serializer.snapshotConfiguration();
   TypeSerializer<TestUserClassBase> restoreSerializer = snapshot.restoreSerializer();
   try (DataInputViewStreamWrapper inputView =
          new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray()))) {
      restoreSerializer.deserialize(inputView);
   }
}
{code}

[~tzulitai], what do you think of this? If this is really a bug, I would be
glad to help resolve it.

> java.lang.ClassNotFoundException when restore job
> -------------------------------------------------
>
>                 Key: FLINK-13159
>                 URL: https://issues.apache.org/jira/browse/FLINK-13159
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>            Reporter: kring
>            Priority: Critical
>         Attachments: image-2019-08-05-17-29-40-351.png, 
> image-2019-08-05-17-32-44-988.png
>
>
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_b398b3dd4c544ddf2d47a0cc47d332f4_(1/6) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 5 common frames omitted
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:130)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:489)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 7 common frames omitted
> Caused by: java.lang.RuntimeException: Cannot instantiate class.
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:384)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
> at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)
> ... 11 common frames omitted
> Caused by: java.lang.ClassNotFoundException: xxx
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:382)
> ... 17 common frames omitted
> {code}
> I have run into a strange problem with Flink. After a task has been running
> properly for a period of time, any exception (such as an ask timeout or an ES
> request timeout) causes a restart that fails with the error above (xxx is a
> business model class). Ten subsequent retries also fail, but once the task is
> resubmitted it runs normally. Three other tasks are running at the same
> time, and none of them has this problem.
> My Flink version is 1.8.0.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
