[ 
https://issues.apache.org/jira/browse/FLINK-30718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andriy Redko resolved FLINK-30718.
----------------------------------
    Resolution: Won't Fix

> Cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30718
>                 URL: https://issues.apache.org/jira/browse/FLINK-30718
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Opensearch
>    Affects Versions: 1.16.0
>            Reporter: Andriy Redko
>            Priority: Major
>         Attachments: example-opensearch.zip
>
>
> When using OpensearchSink (Apache Flink OpenSearch Connector 1.0.0) programmatically:
> {noformat}
>  
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment
>             .createRemoteEnvironment("localhost", 8081);
>         final Collection<Tuple4<String, String, Long, Long>> users = new ArrayList<>();
>         users.add(Tuple4.of("u1", "admin", 100L, 200L));
>         final DataStream<Tuple4<String, String, Long, Long>> source = env.fromCollection(users);
>         final OpensearchSink<Tuple4<String, String, Long, Long>> sink =
>             new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>()
>                 .setHosts(new HttpHost("localhost", 9200, "https"))
>                 .setEmitter((element, ctx, indexer) -> {
>                     indexer.add(
>                         Requests
>                             .indexRequest()
>                             .index("users")
>                             .id(element.f0)
>                             .source(Map.ofEntries(
>                                 Map.entry("user_id", element.f0),
>                                 Map.entry("user_name", element.f1),
>                                 Map.entry("uv", element.f2),
>                                 Map.entry("pv", element.f3)
>                             )));
>                 })
>                 .setConnectionUsername("admin")
>                 .setConnectionPassword("admin")
>                 .setAllowInsecure(true)
>                 .setBulkFlushMaxActions(1)
>                 .build();
>         source.sinkTo(sink);
>         env.execute("Opensearch end to end sink test example");
> {noformat}
> the stream processing fails with the following exception:
> {noformat}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>       at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of org.apache.flink.connector.opensearch.sink.OpensearchSink
>       at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
>       at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
>       at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
>       at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
>       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
>       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>       at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
>       at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
>       at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>       at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>       at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
>       at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>       at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>       at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
>       ... 9 more
> {noformat}
> Reproducer project attached.
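
For background on the exception text: Java serialization does not write a serializable lambda as an instance of its own class. It writes a java.lang.invoke.SerializedLambda placeholder, which is normally turned back into the functional interface while the stream is read. The ClassCastException above reports that such a placeholder reached the emitter field without having been converted. The sketch below uses only the JDK to show the two serialized forms. The Emitter interface and UpperCaseEmitter class are hypothetical stand-ins, not the connector's OpensearchEmitter, and the sketch does not reproduce or diagnose the failure itself.

```java
import java.io.Serializable;
import java.lang.reflect.Method;

public class SerializedLambdaDemo {

    /** Hypothetical stand-in for a serializable callback interface. */
    public interface Emitter extends Serializable {
        String emit(String element);
    }

    /** Named implementation: serialization writes an instance of this class. */
    public static class UpperCaseEmitter implements Emitter {
        private static final long serialVersionUID = 1L;

        @Override
        public String emit(String element) {
            return element.toUpperCase();
        }
    }

    /**
     * Returns the class name of the object that Java serialization would
     * write for the given emitter. Serializable lambdas carry a generated
     * private writeReplace() method that returns the placeholder; classes
     * without such a method are written as themselves.
     */
    public static String serializedFormOf(Emitter emitter) {
        try {
            Method writeReplace = emitter.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return writeReplace.invoke(emitter).getClass().getName();
        } catch (NoSuchMethodException e) {
            // No replacement object: the instance itself is serialized.
            return emitter.getClass().getName();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not inspect serialized form", e);
        }
    }

    public static void main(String[] args) {
        Emitter lambda = element -> element.toUpperCase();
        // Lambda: the serialized form is the SerializedLambda placeholder.
        System.out.println(serializedFormOf(lambda));
        // Named class: the serialized form is the class itself.
        System.out.println(serializedFormOf(new UpperCaseEmitter()));
    }
}
```

Passing a named class (or an anonymous inner class) to setEmitter instead of a lambda keeps SerializedLambda out of the serialized sink. Whether that avoids the failure in the remote-environment setup described here is an assumption that the issue does not confirm.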



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
