Dian Fu created FLINK-17946:
-------------------------------

             Summary: The config option "pipeline.jars" doesn't work if the job is executed via TableEnvironment.execute_sql or StatementSet.execute
                 Key: FLINK-17946
                 URL: https://issues.apache.org/jira/browse/FLINK-17946
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Dian Fu
             Fix For: 1.11.0


For the following job:
{code}
import logging
import sys
import tempfile

from pyflink.table import BatchTableEnvironment, EnvironmentSettings


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one " \
              "line or more contributor license agreements See the NOTICE file 
" \
              "line distributed with this work for additional information " \
              "line regarding copyright ownership The ASF licenses this file " \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"

    environment_settings = EnvironmentSettings.new_instance().in_batch_mode().\
        use_blink_planner().build()
    t_env = BatchTableEnvironment.create(environment_settings=environment_settings)
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///Users/dianfu/workspace/wordcount_python/flink-csv-1.11.0-sql-jar.jar")

    # register Results table in table environment
    tmp_dir = tempfile.gettempdir()
    result_path = tmp_dir + '/result'

    logging.info("Results directory: %s", result_path)

    sink_ddl = """
        create table Results(
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '{}'
        )
        """.format(result_path)
    t_env.execute_sql(sink_ddl)

    elements = [(word, 1) for word in content.split(" ")]
    table = t_env.from_elements(elements, ["word", "count"]) \
        .group_by("word") \
        .select("word, count(1) as count")

    statement_set = t_env.create_statement_set()
    statement_set.add_insert("Results", table, overwrite=True)
    statement_set.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    word_count()
{code}

It throws the following exception:
{code}
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.table.filesystem.FileSystemOutputFormat.formatFactory of type org.apache.flink.table.filesystem.OutputFormatFactory in instance of org.apache.flink.table.filesystem.FileSystemOutputFormat
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at java.util.HashMap.readObject(HashMap.java:1404)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
        at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
        at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
        ... 23 more
{code}

The reason is that the "pipeline.jars" option is not handled properly in 
StatementSet.execute and this issue also exists in TableEnvironment.execute_sql
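
For reference, a minimal sketch of the affected vs. unaffected entry points (assumptions: it reuses t_env, table and the "Results" sink from the job above, and the legacy Table.insert_into/TableEnvironment.execute path is presumed to still pick up the configured jars):
{code}
# Affected: the jars configured via "pipeline.jars" are not shipped.
statement_set = t_env.create_statement_set()
statement_set.add_insert("Results", table)
statement_set.execute()

# An INSERT submitted through TableEnvironment.execute_sql is affected in
# the same way, e.g. t_env.execute_sql("INSERT INTO Results SELECT ...").

# Legacy path, for comparison (deprecated in 1.11): presumably unaffected,
# as TableEnvironment.execute forwards the configured jars.
table.insert_into("Results")
t_env.execute("word_count")
{code}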



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
