[ 
https://issues.apache.org/jira/browse/FLINK-30718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andriy Redko updated FLINK-30718:
---------------------------------
    Description: 
When using OpenSearchSink programmatically
{noformat}
 
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createRemoteEnvironment("localhost", 8081);

        final Collection<Tuple4<String, String, Long, Long>> users = new 
ArrayList<>();
        users.add(Tuple4.of("u1", "admin", 100L, 200L));

        final DataStream<Tuple4<String, String, Long, Long>> source = 
env.fromCollection(users);
        final OpensearchSink<Tuple4<String, String, Long, Long>> sink =
            new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>()
                .setHosts(new HttpHost("localhost", 9200, "https"))
                .setEmitter( (element, ctx, indexer) -> {
                    indexer.add(
                        Requests
                            .indexRequest()
                            .index("users")
                            .id(element.f0)
                            .source(Map.ofEntries(
                                Map.entry("user_id", element.f0),
                                Map.entry("user_name", element.f1),
                                Map.entry("uv", element.f2),
                                Map.entry("pv", element.f3)
                            )));
                        })
                .setConnectionUsername("admin")
                .setConnectionPassword("admin")
                .setAllowInsecure(true)
                .setBulkFlushMaxActions(1)
                .build();

        source.sinkTo(sink);
        env.execute("Opensearch end to end sink test example");

{noformat}
the stream processing fails with the exception{color:#000000}
{color}
{noformat}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
        at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type 
org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of 
org.apache.flink.connector.opensearch.sink.OpensearchSink
        at 
java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
        at 
java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
        at 
java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
        at 
java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
        at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
        at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at 
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
        at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
        at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
        ... 9 more


 {noformat}

  was:
When using OpenSearchSink programmatically
{noformat}
 
final StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createRemoteEnvironment("localhost", 8081);

        final Collection<Tuple4<String, String, Long, Long>> users = new 
ArrayList<>();
        users.add(Tuple4.of("u1", "admin", 100L, 200L));

        final DataStream<Tuple4<String, String, Long, Long>> source = 
env.fromCollection(users);
        final OpensearchSink<Tuple4<String, String, Long, Long>> sink =
            new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>()
                .setHosts(new HttpHost("localhost", 9200, "https"))
                .setEmitter( (element, ctx, indexer) -> {
                    indexer.add(
                        Requests
                            .indexRequest()
                            .index("users")
                            .id(element.f0)
                            .source(Map.ofEntries(
                                Map.entry("user_id", element.f0),
                                Map.entry("user_name", element.f1),
                                Map.entry("uv", element.f2),
                                Map.entry("pv", element.f3)
                            )));
                        })
                .setConnectionUsername("admin")
                .setConnectionPassword("admin")
                .setAllowInsecure(true)
                .setBulkFlushMaxActions(1)
                .build();

        source.sinkTo(sink);
        env.execute("Opensearch end to end sink test example");

{noformat}
the stream processing fails with the exception{color:#000000}

{color}
{noformat}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
        at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
        at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type 
org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of 
org.apache.flink.connector.opensearch.sink.OpensearchSink
        at 
java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
        at 
java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
        at 
java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
        at 
java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
        at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
        at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at 
java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at 
java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at 
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at 
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
        at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
        at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
        at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
        ... 9 more


 {noformat}


> Cannot assign instance of java.lang.invoke.SerializedLambda to field 
> org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter 
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30718
>                 URL: https://issues.apache.org/jira/browse/FLINK-30718
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Opensearch
>    Affects Versions: 1.16.0
>            Reporter: Andriy Redko
>            Priority: Major
>
> When using OpenSearchSink programmatically
> {noformat}
>  
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment
>             .createRemoteEnvironment("localhost", 8081);
>         final Collection<Tuple4<String, String, Long, Long>> users = new 
> ArrayList<>();
>         users.add(Tuple4.of("u1", "admin", 100L, 200L));
>         final DataStream<Tuple4<String, String, Long, Long>> source = 
> env.fromCollection(users);
>         final OpensearchSink<Tuple4<String, String, Long, Long>> sink =
>             new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>()
>                 .setHosts(new HttpHost("localhost", 9200, "https"))
>                 .setEmitter( (element, ctx, indexer) -> {
>                     indexer.add(
>                         Requests
>                             .indexRequest()
>                             .index("users")
>                             .id(element.f0)
>                             .source(Map.ofEntries(
>                                 Map.entry("user_id", element.f0),
>                                 Map.entry("user_name", element.f1),
>                                 Map.entry("uv", element.f2),
>                                 Map.entry("pv", element.f3)
>                             )));
>                         })
>                 .setConnectionUsername("admin")
>                 .setConnectionPassword("admin")
>                 .setAllowInsecure(true)
>                 .setBulkFlushMaxActions(1)
>                 .build();
>         source.sinkTo(sink);
>         env.execute("Opensearch end to end sink test example");
> {noformat}
> the stream processing fails with the exception{color:#000000}
> {color}
> {noformat}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot instantiate user function.
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type 
> org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of 
> org.apache.flink.connector.opensearch.sink.OpensearchSink
>       at 
> java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
>       at 
> java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
>       at 
> java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
>       at 
> java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
>       at 
> java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
>       at 
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>       at 
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>       at 
> java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
>       at 
> java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
>       at 
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
>       at 
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
>       at 
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
>       at 
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>       at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
>       ... 9 more
>  {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to