Andrey Starostin created FLINK-38054:
----------------------------------------
Summary: Kryo Deserialization Error in ElasticSearch 8 Sink 3.1.0 With Flink 1.20.0
Key: FLINK-38054
URL: https://issues.apache.org/jira/browse/FLINK-38054
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: 1.20.0, elasticsearch-3.1.0
Environment: The issue occurs on AWS Managed Flink 1.20.0 and when the Job runs on a local Flink 1.20.0 cluster. It does not occur when the Job runs on a MiniCluster.
Reporter: Andrey Starostin
When a Flink Job that uses the ElasticSearch 8 Sink is submitted to a Flink cluster as a fat JAR, and the Job takes a checkpoint while ElasticSearch operations are still buffered in the sink and then fails, it cannot recover from that checkpoint.
The following error prevents recovery:
```
com.esotericsoftware.kryo.KryoException: Unable to find class: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
Serialization trace:
bulkOperationVariant (org.apache.flink.connector.elasticsearch.sink.Operation)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.connector.elasticsearch.sink.OperationSerializer.deserialize(OperationSerializer.java:51)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:39)
    at org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkSerializer.deserializeRequestFromStream(Elasticsearch8AsyncSinkSerializer.java:30)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81)
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39)
    at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
    at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
    at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: co.elastic.clients.elasticsearch.core.bulk.UpdateOperation
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 29 more
```
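The `Caused by` frames show Kryo's `DefaultClassResolver.readName` going through `Class.forName`, and the lookup being answered by `ClassLoaders$AppClassLoader`. A class can be loadable through one class loader and invisible through another, so the outcome depends entirely on which loader the lookup is given. The stdlib-only sketch below mimics that situation; it is an illustration, not connector code. `LoaderVisibilityDemo`, `Payload`, and `BOOTSTRAP_ONLY` are hypothetical names: `Payload` stands in for a job class such as `UpdateOperation`, and `BOOTSTRAP_ONLY` stands in for a loader that cannot see the job's classes.

```java
public class LoaderVisibilityDemo {

    // Hypothetical stand-in for a job class such as UpdateOperation.
    public static class Payload {}

    // Loader whose parent is the bootstrap loader: it can load JDK classes
    // (java.lang.String etc.) but nothing from the application classpath.
    static final ClassLoader BOOTSTRAP_ONLY = new ClassLoader(null) {};

    /** Returns true if {@code name} can be resolved through {@code loader}. */
    static boolean isVisible(String name, ClassLoader loader) {
        try {
            // Explicit-loader lookup without running static initializers.
            Class.forName(name, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = Payload.class.getName();
        System.out.println(isVisible(name, Payload.class.getClassLoader())); // true
        System.out.println(isVisible(name, BOOTSTRAP_ONLY));                 // false
        System.out.println(isVisible("java.lang.String", BOOTSTRAP_ONLY));   // true
    }
}
```

The class file never moves between the two lookups; only the loader passed to `Class.forName` changes, which is the same shape as "the class exists in the JAR but cannot be found".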
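The `UpdateOperation` class that fails to load is present in the fat JAR.
Digging deeper, it appears that this class is not visible from the class loader used by the ElasticSearch connector's `OperationSerializer`: the `Caused by` frames show the lookup being answered by `ClassLoaders$AppClassLoader`, the JVM application class loader, rather than by Flink's user-code class loader, which is where classes from a submitted fat JAR are loaded.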
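One possible direction for a fix, offered as a hedged suggestion rather than a confirmed one: have the deserializer resolve class names through an explicitly chosen class loader instead of relying on its default. For Kryo that would presumably mean calling `Kryo#setClassLoader` with Flink's user-code class loader (Flink normally installs it as the task thread's context class loader), but whether the connector should do exactly that is an assumption. The self-contained sketch below swaps in plain Java serialization for Kryo so it runs without dependencies. `LoaderAwareRoundTrip`, `BufferedOp`, and `LoaderAwareObjectInputStream` are hypothetical names, and the `ObjectInputStream#resolveClass` override plays the role of Kryo's class resolver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class LoaderAwareRoundTrip {

    // Hypothetical buffered entry standing in for a sink Operation.
    public static class BufferedOp implements Serializable {
        private static final long serialVersionUID = 1L;
        final String index;
        BufferedOp(String index) { this.index = index; }
    }

    /** ObjectInputStream that resolves class names through a caller-chosen loader. */
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Look the name up in the injected loader, not the stream's default choice.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Checkpoint" the buffered entry.
        byte[] state = serialize(new BufferedOp("my-index"));

        // Restoring through the loader that holds the job's classes works.
        ClassLoader userLoader = BufferedOp.class.getClassLoader();
        BufferedOp restored = (BufferedOp) deserialize(state, userLoader);
        System.out.println(restored.index); // my-index

        // Restoring through a loader that cannot see them fails, as in the report.
        try {
            deserialize(state, new ClassLoader(null) {});
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

The same bytes restore or fail depending only on the loader handed to `deserialize`, which is why making the loader an explicit input, rather than an implicit default, is the part of this sketch that matters.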
A minimal reproduction is available in the GitHub repository:
[https://github.com/mayorandrew/flink-elasticsearch-deserialization-bug].