[ 
https://issues.apache.org/jira/browse/FLINK-17946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-17946:
----------------------------
    Affects Version/s: 1.11.0

> The config option "pipeline.jars" doesn't work if the job was executed via 
> TableEnvironment.execute_sql and StatementSet.execute
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17946
>                 URL: https://issues.apache.org/jira/browse/FLINK-17946
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.11.0
>            Reporter: Dian Fu
>            Priority: Major
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import sys
> import tempfile
> from pyflink.table import BatchTableEnvironment, EnvironmentSettings
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     environment_settings = EnvironmentSettings.new_instance().in_batch_mode().\
>         use_blink_planner().build()
>     t_env = BatchTableEnvironment.create(environment_settings=environment_settings)
>     t_env.get_config().get_configuration().set_string(
>         "pipeline.jars",
>         "file:///Users/dianfu/workspace/wordcount_python/flink-csv-1.11.0-sql-jar.jar")
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'filesystem',
>             'format' = 'csv',
>             'path' = '{}'
>         )
>         """.format(result_path)
>     t_env.execute_sql(sink_ddl)
>     elements = [(word, 1) for word in content.split(" ")]
>     table = t_env.from_elements(elements, ["word", "count"]) \
>         .group_by("word") \
>         .select("word, count(1) as count")
>     statement_set = t_env.create_statement_set()
>     statement_set.add_insert("Results", table, overwrite=True)
>     statement_set.execute()
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It throws the following exception:
> {code}
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.table.filesystem.FileSystemOutputFormat.formatFactory of type org.apache.flink.table.filesystem.OutputFormatFactory in instance of org.apache.flink.table.filesystem.FileSystemOutputFormat
>       at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>       at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>       at java.util.HashMap.readObject(HashMap.java:1404)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>       at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>       at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>       at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>       at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
>       ... 23 more
> {code}
> The reason is that the "pipeline.jars" option is not handled properly in
> StatementSet.execute, so the jar specified via "pipeline.jars" cannot be
> found at runtime. The same issue exists in TableEnvironment.execute_sql.
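
When reproducing this, it may help to first rule out a plain path mistake in the "pipeline.jars" value. The following is a minimal sketch, not part of Flink or PyFlink: the helper name missing_pipeline_jars is hypothetical, and it assumes the option holds a semicolon-separated list of file:// URLs pointing at local files. It only checks that the files exist and does not work around the bug described above.

```python
import os
from urllib.parse import urlparse
from urllib.request import url2pathname


def missing_pipeline_jars(pipeline_jars):
    """Return the entries of a "pipeline.jars" value that are not existing local files.

    Hypothetical helper: assumes a semicolon-separated list of file:// URLs.
    Entries with any other scheme are reported as missing as well.
    """
    missing = []
    for url in pipeline_jars.split(";"):
        url = url.strip()
        if not url:
            # Ignore empty entries such as a trailing semicolon.
            continue
        parsed = urlparse(url)
        local_path = url2pathname(parsed.path)
        if parsed.scheme != "file" or not os.path.isfile(local_path):
            missing.append(url)
    return missing
```

If the helper returns an empty list for the configured value and the ClassCastException still occurs, the jar path itself is unlikely to be the problem.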



