Hi Shameet,

No, supplying the driver jar and setting the 'driver' option is not enough: you'll need to add support for the Snowflake dialect in Flink. You can find more information in the JdbcDialect documentation [1].
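
To illustrate why the driver jar alone does not help, here is a rough sketch of the lookup as I understand it. This is a simplified Python model, not Flink's actual code: the names `Dialect`, `find_dialect` and `validate` are made up for the example, and the dialect list assumes the three dialects (Derby, MySQL, PostgreSQL) that, as far as I know, ship with flink-connector-jdbc 1.13/1.14. The real connector throws a Java IllegalStateException; the sketch uses ValueError instead.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Dialect:
    """Hypothetical stand-in for a JDBC dialect; not Flink's JdbcDialect."""
    name: str
    url_prefix: str

    def can_handle(self, url: str) -> bool:
        # A dialect claims a URL purely by its scheme prefix.
        return url.startswith(self.url_prefix)


# Assumption: the dialects bundled with flink-connector-jdbc 1.13/1.14.
BUILT_IN: List[Dialect] = [
    Dialect("Derby", "jdbc:derby:"),
    Dialect("MySQL", "jdbc:mysql:"),
    Dialect("PostgreSQL", "jdbc:postgresql:"),
]


def find_dialect(url: str, dialects: Optional[List[Dialect]] = None) -> Optional[Dialect]:
    """Return the first dialect that accepts the URL, or None."""
    for dialect in (BUILT_IN if dialects is None else dialects):
        if dialect.can_handle(url):
            return dialect
    return None


def validate(url: str, driver: Optional[str] = None,
             dialects: Optional[List[Dialect]] = None) -> Dialect:
    """Model of the table option check that runs before any connection is opened."""
    # 'driver' is deliberately unused: the lookup depends only on the URL,
    # so naming a driver class cannot rescue an unknown URL scheme.
    dialect = find_dialect(url, dialects)
    if dialect is None:
        raise ValueError(f"Cannot handle such jdbc url: {url}")
    return dialect


if __name__ == "__main__":
    print(validate("jdbc:mysql://localhost:3306/mydatabase").name)
    try:
        validate("jdbc:snowflake://myaccount.snowflakecomputing.com:443/",
                 driver="net.snowflake.client.jdbc.SnowflakeDriver")
    except ValueError as err:
        print(err)
```

In this model the Snowflake URL is rejected before the driver class is ever consulted, which matches the error you are seeing. Passing an extra `Dialect("Snowflake", "jdbc:snowflake:")` in `dialects` makes the lookup succeed, and that is the role a real Snowflake dialect implementation would play in Flink.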
Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.html

On Tue, 5 Apr 2022 at 11:15, Shameet Doshi <shameet.do...@firstperformance.com> wrote:

> thanks Martijn
>
> So supplying the snowflake-jdbc jar as a dependency jar as i have done and
> mentioning the driver property doesn't help ?
>
> t_env.get_config().get_configuration().set_string("pipeline.jars",
>     "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.14.4.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.6.27.jar")
>
> 'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
>
> On Tue, Apr 5, 2022 at 3:21 AM Martijn Visser <martijnvis...@apache.org> wrote:
>
> > Hi Shameet,
> >
> > There's currently no open source Flink Snowflake connector/sink available.
> > As mentioned in the documentation [1] this requires the implementation of a
> > dialect.
> >
> > Best regards
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> > https://github.com/MartijnVisser
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/
> >
> > On Mon, 4 Apr 2022 at 19:41, Shameet Doshi <shameet.do...@firstperformance.com> wrote:
> >
> > > Hello,
> > >
> > > I have need to talk to snowflake from within pyflink preferably using the
> > > TABLE API
> > > We are running pyflink inside AWS Kinesis Data Analysis application.
> > > Installing any other python modules
> > >
> > > I was able to connect a mysql db but not able to connect to snowflake db
> > >
> > > import argparse
> > > import logging
> > > import sys
> > >
> > > from pyflink.common import Row
> > > from pyflink.table import (EnvironmentSettings, TableEnvironment,
> > >                            DataTypes)
> > > from pyflink.table.expressions import lit, col
> > > from pyflink.table.udf import udtf
> > >
> > > env_settings = (
> > >     EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > > )
> > > t_env = TableEnvironment.create(environment_settings=env_settings)
> > >
> > > # write all the data to one file
> > > #t_env.get_config().set("parallelism.default", "1")
> > >
> > > # define the source
> > >
> > > print("Executing word_count example with default input data set.")
> > > print("Use --input to specify file input.")
> > > tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
> > >                           DataTypes.ROW([DataTypes.FIELD('line',
> > >                                                          DataTypes.STRING())]))
> > >
> > > # define the sink
> > >
> > > t_env.execute_sql("""
> > >     CREATE TABLE MyUserTable (
> > >       id BIGINT,
> > >       name STRING,
> > >       age INT,
> > >       status BOOLEAN,
> > >       PRIMARY KEY (id) NOT ENFORCED
> > >     ) WITH (
> > >       'connector' = 'jdbc',
> > >       'url' = 'jdbc:snowflake://myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE',
> > >       'table-name' = 'users',
> > >       'username' = 'devops',
> > >       'password' = '50v*hrlM#SdK0euvnWR3',
> > >       'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
> > >     )
> > > """)
> > >
> > > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > >     "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")
> > >
> > > usersdata = t_env.execute_sql("SELECT name FROM MyUserTable where name='amy'")
> > >
> > > usersdata.print()
> > >
> > > ERROR on execution : Caused by: java.lang.IllegalStateException: Cannot
> > > handle such jdbc url:
> > > jdbc:snowflake://myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> > >
> > > Similar test works if I try to connect to MySQL
> > >
> > > t_env.execute_sql("""
> > >     CREATE TABLE MyUserTable (
> > >       id BIGINT,
> > >       name STRING,
> > >       age INT,
> > >       status BOOLEAN,
> > >       PRIMARY KEY (id) NOT ENFORCED
> > >     ) WITH (
> > >       'connector' = 'jdbc',
> > >       'url' = 'jdbc:mysql://localhost:3306/mydatabase',
> > >       'table-name' = 'users',
> > >       'username' = 'root',
> > >       'password' = 'rootroot'
> > >     )
> > > """)
> > >
> > > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > >     "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")
> > >
> > > usersdata = t_env.execute_sql("SELECT name FROM MyUserTable where name='amy'")
> > >
> > > --
> > > NOTICE: This communication may contain information which is confidential to
> > > First Performance Corporation (FPC). If you are not the intended recipient
> > > of this communication, please delete this email, destroy all copies, and
> > > alert the sender. If you are the intended recipient of this communication,
> > > you should not copy, disclose or distribute this communication without the
> > > authority of FPC. Any views expressed in this communication are those of
> > > the individual sender, except where the sender specifically states them to
> > > be the views of FPC.
> > > Except as required by law, FPC does not represent,
> > > warrant or guarantee that the integrity of this communication has been
> > > maintained nor that the communication is free of errors, harmful code,
> > > interception or interference.