[ 
https://issues.apache.org/jira/browse/FLINK-18006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-18006:
----------------------------
    Priority: Blocker  (was: Critical)

> It will throw Invalid lambda deserialization Exception when writing to 
> elastic search with new format
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18006
>                 URL: https://issues.apache.org/jira/browse/FLINK-18006
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch, Table SQL / Client
>    Affects Versions: 1.11.0
>         Environment: ElasticSearch version is 7.6.0
>            Reporter: Shengkai Fang
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> My job follows:
> {code:java}
> // 
> create table csv( pageId VARCHAR, eventId VARCHAR, recvTime VARCHAR) with ( 
> 'connector' = 'filesystem',
>  'path' = 
> '/Users/ohmeatball/Work/flink-sql-etl/data-generator/src/main/resources/user3.csv',
>  'format' = 'csv'
>  )
> -----------------------------------------
> CREATE TABLE es_table (
>   aggId varchar ,
>   pageId varchar ,
>   ts varchar ,
>   expoCnt int ,
>   clkCnt int
> ) WITH (
> 'connector' = 'elasticsearch',
> 'hosts' = 'http://localhost:9200',
> 'index' = 'cli_test',
> 'document-id.key-delimiter' = '$',
> 'sink.bulk-flush.interval' = '1000',
> 'format' = 'json'
> )
> -----------------------------------------
> INSERT INTO es_table
> SELECT  pageId,eventId,cast(recvTime as varchar) as ts, 1, 1 from csv;
> {code}
> The full exception follows:
> {code:java}
> Sink(table=[default_catalog.default_database.es_table], fields=[aggId, 
> pageId, ts, expoCnt, clkCnt]) (1/1) (b51209fac96948c20e85b8df137287d3) 
> switched from RUNNING to FAILED on 
> org.apache.flink.runtime.jobmaster.slotpool.singlelogicals...@bb5ab41.sink(table=[default_catalog.default_database.es_table],
>  fields=[aggId, pageId, ts, expoCnt, clkCnt]) (1/1) 
> (b51209fac96948c20e85b8df137287d3) switched from RUNNING to FAILED on 
> org.apache.flink.runtime.jobmaster.slotpool.singlelogicals...@bb5ab41.org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>  Cannot instantiate user function. at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:518)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:720) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:545) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_151]Caused by: 
> java.io.IOException: unexpected exception type at 
> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682) 
> ~[?:1.8.0_151] at 
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) 
> ~[?:1.8.0_151] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more
> Caused by: java.lang.reflect.InvocationTargetExceptionCaused by: 
> java.lang.reflect.InvocationTargetException at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_151] at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_151] at 
> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) 
> ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_151] at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_151] at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_151] at 
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) 
> ~[?:1.8.0_151] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 moreCaused by: 
> java.lang.IllegalArgumentException: Invalid lambda deserialization at 
> org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] 
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] 
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_151] at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_151] at 
> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) 
> ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_151] at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_151] at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) 
> ~[?:1.8.0_151] at 
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) 
> ~[?:1.8.0_151]
>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) 
> ~[?:1.8.0_151] at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) 
> ~[?:1.8.0_151] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more
> {code}
> Notice: everything works fine with former connector grammer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to