[ https://issues.apache.org/jira/browse/FLINK-18006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-18006: ---------------------------- Priority: Blocker (was: Critical) > It will throw Invalid lambda deserialization Exception when writing to > elastic search with new format > ----------------------------------------------------------------------------------------------------- > > Key: FLINK-18006 > URL: https://issues.apache.org/jira/browse/FLINK-18006 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Table SQL / Client > Affects Versions: 1.11.0 > Environment: ElasticSearch version is 7.6.0 > Reporter: Shengkai Fang > Priority: Blocker > Fix For: 1.11.0 > > > My job follows: > {code:java} > // > create table csv( pageId VARCHAR, eventId VARCHAR, recvTime VARCHAR) with ( > 'connector' = 'filesystem', > 'path' = > '/Users/ohmeatball/Work/flink-sql-etl/data-generator/src/main/resources/user3.csv', > 'format' = 'csv' > ) > ----------------------------------------- > CREATE TABLE es_table ( > aggId varchar , > pageId varchar , > ts varchar , > expoCnt int , > clkCnt int > ) WITH ( > 'connector' = 'elasticsearch', > 'hosts' = 'http://localhost:9200', > 'index' = 'cli_test', > 'document-id.key-delimiter' = '$', > 'sink.bulk-flush.interval' = '1000', > 'format' = 'json' > ) > ----------------------------------------- > INSERT INTO es_table > SELECT pageId,eventId,cast(recvTime as varchar) as ts, 1, 1 from csv; > {code} > The full exception follows: > {code:java} > Sink(table=[default_catalog.default_database.es_table], fields=[aggId, > pageId, ts, expoCnt, clkCnt]) (1/1) (b51209fac96948c20e85b8df137287d3) > switched from RUNNING to FAILED on > org.apache.flink.runtime.jobmaster.slotpool.singlelogicals...@bb5ab41.sink(table=[default_catalog.default_database.es_table], > fields=[aggId, pageId, ts, expoCnt, clkCnt]) (1/1) > (b51209fac96948c20e85b8df137287d3) switched from RUNNING to FAILED on > org.apache.flink.runtime.jobmaster.slotpool.singlelogicals...@bb5ab41.org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Cannot instantiate user function. at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:518) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:720) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:545) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_151]Caused by: > java.io.IOException: unexpected exception type at > java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682) > ~[?:1.8.0_151] at > java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > ~[?:1.8.0_151] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more > Caused by: java.lang.reflect.InvocationTargetExceptionCaused by: > java.lang.reflect.InvocationTargetException at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_151] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_151] at > java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) > ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_151] at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_151] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_151] at > java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > ~[?:1.8.0_151] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 moreCaused by: > java.lang.IllegalArgumentException: Invalid lambda deserialization at > org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) > ~[flink-sql-connector-elasticsearch7_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_151] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_151] at > java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) > ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_151] at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_151] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_151] at > java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > ~[?:1.8.0_151] > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > ~[?:1.8.0_151] at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > ~[?:1.8.0_151] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) > ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more > {code} > Notice: everything works fine with former connector grammer. -- This message was sent by Atlassian Jira (v8.3.4#803005)