Hi Shameet, There's currently no open source Flink Snowflake connector/sink available. As mentioned in the documentation [1] this requires the implementation of a dialect.
Best regards Martijn Visser https://twitter.com/MartijnVisser82 https://github.com/MartijnVisser [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/ On Mon, 4 Apr 2022 at 19:41, Shameet Doshi < shameet.do...@firstperformance.com> wrote: > Hello, > > I have need to talk to snowflake from within pyflink preferably using the > TABLE API > We are running pyflink inside AWS Kinesis Data Analysis application. > Installing any other python modules > > I was able to connect a mysql db but not able to connect to snowflake db > > > > > > > import argparse > import logging > import sys > > from pyflink.common import Row > from pyflink.table import (EnvironmentSettings, TableEnvironment, > DataTypes) > from pyflink.table.expressions import lit, col > from pyflink.table.udf import udtf > > env_settings = ( > > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > ) > t_env = TableEnvironment.create(environment_settings=env_settings) > > # write all the data to one file > #t_env.get_config().set("parallelism.default", "1") > > # define the source > > print("Executing word_count example with default input data set.") > print("Use --input to specify file input.") > tab = t_env.from_elements(map(lambda i: (i,), word_count_data), > DataTypes.ROW([DataTypes.FIELD('line', > DataTypes.STRING())])) > > # define the sink > > > > t_env.execute_sql(""" > CREATE TABLE MyUserTable ( > id BIGINT, > name STRING, > age INT, > status BOOLEAN, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:snowflake:// > myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE > ', > 'table-name' = 'users', > 'username' = 'devops', > 'password' = '50v*hrlM#SdK0euvnWR3', > 'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver' > ) > """) > > t_env.get_config().get_configuration().set_string("pipeline.jars", > "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar") > > usersdata = t_env.execute_sql("SELECT name FROM MyUserTable where > name='amy'") > > usersdata.print() > > > > ERROR on execution : Caused by: java.lang.IllegalStateException: Cannot > handle such jdbc url: > jdbc:snowflake:// > myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE > > > > Similar test works if I try to connect to MySQL > > t_env.execute_sql(""" > CREATE TABLE MyUserTable ( > id BIGINT, > name STRING, > age INT, > status BOOLEAN, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/mydatabase > <mysql://localhost:3306/mydatabase>', > 'table-name' = 'users', > 'username' = 'root', > 'password' = 'rootroot' > ) > """) > > t_env.get_config().get_configuration().set_string("pipeline.jars", > "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar") > > usersdata = t_env.execute_sql("SELECT name FROM MyUserTable where > name='amy'") > > > -- > NOTICE: This communication may contain information which is confidential > to > First Performance Corporation (FPC). If you are not the intended recipient > of this communication, please delete this email, destroy all copies, and > alert the sender. If you are the intended recipient of this communication, > you should not copy, disclose or distribute this communication without the > authority of FPC. Any views expressed in this communication are those of > the individual sender, except where the sender specifically states them to > be the views of FPC. Except as required by law, FPC does not represent, > warrant or guarantee that the integrity of this communication has been > maintained nor that the communication is free of errors, harmful code, > interception or interference. >