[ https://issues.apache.org/jira/browse/FLINK-30718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682691#comment-17682691 ]
Andriy Redko commented on FLINK-30718:
--------------------------------------
The issue is caused by the Java lambda serialization mechanism: the classpath on
the Flink side must contain the emitter implementation (for a lambda, the user
class that declares it); otherwise the `SerializedLambda` is never resolved
back into an `OpensearchEmitter` instance.
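
A minimal JDK-only sketch of that mechanism, with no Flink involved. Emitter, Sink and UserJob are hypothetical stand-ins for OpensearchEmitter, OpensearchSink and the user's job class, and the overridden resolveClass only simulates a Flink classpath that lacks the user JAR. When the class that declares the lambda cannot be resolved, ObjectInputStream skips SerializedLambda.readResolve, so the raw SerializedLambda reaches the Emitter-typed field and the assignment fails with the same kind of ClassCastException as in the report.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.Set;

public class LambdaSerializationDemo {

    // Hypothetical stand-in for OpensearchEmitter: a Serializable functional
    // interface, so lambdas targeting it are compiled as serializable lambdas.
    @FunctionalInterface
    interface Emitter extends Serializable {
        String emit(String element);
    }

    // Hypothetical stand-in for OpensearchSink: holds the emitter in a typed field.
    static final class Sink implements Serializable {
        private final Emitter emitter;

        Sink(Emitter emitter) {
            this.emitter = emitter;
        }

        String emit(String element) {
            return emitter.emit(element);
        }
    }

    // Hypothetical stand-in for the user's job class. The lambda is declared here,
    // so this is the capturing class recorded in the SerializedLambda; it also
    // holds the synthetic $deserializeLambda$ method that readResolve calls.
    static final class UserJob {
        static Sink buildSink() {
            return new Sink(element -> "indexed:" + element);
        }
    }

    // Plain Java serialization; the lambda is written as a SerializedLambda
    // through its compiler-generated writeReplace method.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Deserializes, pretending that the named classes are absent from the classpath.
    static Object deserialize(byte[] bytes, Set<String> missingClasses)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (missingClasses.contains(desc.getName())) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(UserJob.buildSink());

        // Capturing class resolvable: readResolve rebuilds the lambda.
        Sink restored = (Sink) deserialize(bytes, Set.of());
        System.out.println(restored.emit("u1"));

        // Capturing class missing: readResolve is skipped, and assigning the raw
        // SerializedLambda to the Emitter-typed field throws ClassCastException.
        try {
            deserialize(bytes, Set.of(UserJob.class.getName()));
        } catch (ClassCastException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the declaring class resolvable, the first round trip prints indexed:u1. For the reproducer, which calls createRemoteEnvironment("localhost", 8081) without listing any JAR, a plausible remedy is to ship the job JAR, for example through the String... jarFiles varargs of StreamExecutionEnvironment.createRemoteEnvironment, or to submit the packaged job with flink run.
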
> Cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30718
> URL: https://issues.apache.org/jira/browse/FLINK-30718
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Opensearch
> Affects Versions: 1.16.0
> Reporter: Andriy Redko
> Priority: Major
> Attachments: example-opensearch.zip
>
>
> When using OpensearchSink (Apache Flink OpenSearch Connector 1.0.0) programmatically
> {noformat}
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081);
> final Collection<Tuple4<String, String, Long, Long>> users = new ArrayList<>();
> users.add(Tuple4.of("u1", "admin", 100L, 200L));
> final DataStream<Tuple4<String, String, Long, Long>> source = env.fromCollection(users);
> final OpensearchSink<Tuple4<String, String, Long, Long>> sink =
>     new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>()
>         .setHosts(new HttpHost("localhost", 9200, "https"))
>         .setEmitter((element, ctx, indexer) -> {
>             indexer.add(
>                 Requests.indexRequest()
>                     .index("users")
>                     .id(element.f0)
>                     .source(Map.ofEntries(
>                         Map.entry("user_id", element.f0),
>                         Map.entry("user_name", element.f1),
>                         Map.entry("uv", element.f2),
>                         Map.entry("pv", element.f3))));
>         })
>         .setConnectionUsername("admin")
>         .setConnectionPassword("admin")
>         .setAllowInsecure(true)
>         .setBulkFlushMaxActions(1)
>         .build();
> source.sinkTo(sink);
> env.execute("Opensearch end to end sink test example");
> {noformat}
> the stream processing fails with the following exception:
> {noformat}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> 	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 	at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of org.apache.flink.connector.opensearch.sink.OpensearchSink
> 	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
> 	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
> 	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
> 	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
> 	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
> 	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
> 	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
> 	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
> 	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
> 	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
> 	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
> 	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
> 	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
> 	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
> 	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
> 	... 9 more
> {noformat}
> Reproducer project attached.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)