[
https://issues.apache.org/jira/browse/FLINK-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900733#comment-16900733
]
Yun Tang commented on FLINK-13159:
----------------------------------
Before Flink-1.8, {{PojoSerializer}} would always use the [1st
constructor|https://github.com/apache/flink/blob/56c3e7cd653e4cb2ad0a76ca317aa9fa1d564dc2/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L119]
which assign {{Thread.currentThread().getContextClassLoader()}} to private
field {{ClassLoader cl}}. The [2nd
constructor|https://github.com/apache/flink/blob/56c3e7cd653e4cb2ad0a76ca317aa9fa1d564dc2/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L147]
without {{cl}} assigned would only be used to ensure the compatibility.
However, after Flink-1.8, we would use the [2nd
constructor|https://github.com/apache/flink/blob/a0d236fba7c6abdabb461aa504b1e088a3982c31/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L142]
to create a restore serializer or a reconfigured serializer from a
{{PojoSerializerSnapshot}}. Unfortunately, the restored serializer does not
contain the class loader.
I could use below code within {{PojoSubclassSerializerTest}} to reproduce this.
{code:java}
@Test
public void testRestorePojoSerializer() throws IOException {
TypeSerializer<TestUserClassBase> serializer = createSerializer();
TestUserClass2 bar = new TestUserClass2(
ThreadLocalRandom.current().nextInt(), "bar",
ThreadLocalRandom.current().nextFloat());
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (DataOutputViewStreamWrapper outView = new
DataOutputViewStreamWrapper(out)) {
serializer.serialize(bar, outView);
DataInputViewStreamWrapper inputViewStreamWrapper =
new DataInputViewStreamWrapper(new
ByteArrayInputStream(out.toByteArray()));
serializer.deserialize(inputViewStreamWrapper);
}
TypeSerializerSnapshot<TestUserClassBase> snapshot =
serializer.snapshotConfiguration();
TypeSerializer<TestUserClassBase> restoreSerializer =
snapshot.restoreSerializer();
try (DataInputViewStreamWrapper inputView =
new DataInputViewStreamWrapper(new
ByteArrayInputStream(out.toByteArray()))) {
restoreSerializer.deserialize(inputView);
}
}
{code}
[~tzulitai], what do you think of this? If this is really a bug, I am glad to
help to resolve this issue.
> java.lang.ClassNotFoundException when restore job
> -------------------------------------------------
>
> Key: FLINK-13159
> URL: https://issues.apache.org/jira/browse/FLINK-13159
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Reporter: kring
> Priority: Critical
> Attachments: image-2019-08-05-17-29-40-351.png,
> image-2019-08-05-17-32-44-988.png
>
>
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_b398b3dd4c544ddf2d47a0cc47d332f4_(1/6) from
> any of the 1 prov
> ided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 5 common frames omitted
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:130)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:489)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 7 common frames omitted
> Caused by: java.lang.RuntimeException: Cannot instantiate class.
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:384)
> at
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
> at
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)
> ... 11 common frames omitted
> Caused by: java.lang.ClassNotFoundException: xxx
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:382)
> ... 17 common frames omitted
> {code}
> A strange problem with Flink is that after a task has been running properly
> for a period of time, if any exception (such as ask timeout or ES request
> timeout) is thrown, the task restart will report the above error (xxx is a
> business model), and ten subsequent retries will not succeed, but the task
> will be resubmitted. Then it can run normally. In addition, there are three
> other tasks running at the same time, none of which has the problem.
> My flink version is 1.8.0.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)