Hi Shameet,

There's currently no open source Flink Snowflake connector/sink available.
As mentioned in the documentation [1] this requires the implementation of a
dialect.

Best regards

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/


On Mon, 4 Apr 2022 at 19:41, Shameet Doshi <
shameet.do...@firstperformance.com> wrote:

> Hello,
>
> I have need to talk to snowflake from within pyflink preferably using the
> TABLE API
> We are running pyflink inside AWS Kinesis Data Analysis application.
> Installing any other python modules
>
> I was able to connect a mysql db but not able to connect to snowflake db
>
>
>
>
>
>
>     import argparse
>     import logging
>     import sys
>
>     from pyflink.common import Row
>     from pyflink.table import (EnvironmentSettings, TableEnvironment,
>                            DataTypes)
>     from pyflink.table.expressions import lit, col
>     from pyflink.table.udf import udtf
>
>     env_settings = (
>
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>     )
>     t_env  = TableEnvironment.create(environment_settings=env_settings)
>
>     # write all the data to one file
>     #t_env.get_config().set("parallelism.default", "1")
>
>     # define the source
>
>     print("Executing word_count example with default input data set.")
>     print("Use --input to specify file input.")
>     tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
>                                 DataTypes.ROW([DataTypes.FIELD('line',
> DataTypes.STRING())]))
>
>     # define the sink
>
>
>
>     t_env.execute_sql("""
>     CREATE TABLE MyUserTable (
>   id BIGINT,
>   name STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:snowflake://
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> ',
>    'table-name' = 'users',
>    'username' = 'devops',
>    'password' = '50v*hrlM#SdK0euvnWR3',
>    'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
> )
> """)
>
>     t_env.get_config().get_configuration().set_string("pipeline.jars",
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")
>
>     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> name='amy'")
>
>     usersdata.print()
>
>
>
> ERROR on execution : Caused by: java.lang.IllegalStateException: Cannot
> handle such jdbc url:
> jdbc:snowflake://
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
>
>
>
> Similar test works if I try to connect to MySQL
>
> t_env.execute_sql("""
>     CREATE TABLE MyUserTable (
>   id BIGINT,
>   name STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://localhost:3306/mydatabase
> <mysql://localhost:3306/mydatabase>',
>    'table-name' = 'users',
>    'username' = 'root',
>    'password' = 'rootroot'
> )
> """)
>
>     t_env.get_config().get_configuration().set_string("pipeline.jars",
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")
>
>     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> name='amy'")
>
>
> --
> NOTICE: This communication may contain information which is confidential
> to
> First Performance Corporation (FPC). If you are not the intended recipient
> of this communication, please delete this email, destroy all copies, and
> alert the sender. If you are the intended recipient of this communication,
> you should not copy, disclose or distribute this communication without the
> authority of FPC. Any views expressed in this communication are those of
> the individual sender, except where the sender specifically states them to
> be the views of FPC. Except as required by law, FPC does not represent,
> warrant or guarantee that the integrity of this communication has been
> maintained nor that the communication is free of errors, harmful code,
> interception or interference.
>

Reply via email to