Hello,
I need to talk to Snowflake from within PyFlink, preferably using the Table
API.
We are running PyFlink inside an AWS Kinesis Data Analytics application. Installing
any other python modules
I was able to connect to a MySQL database, but I am not able to connect to a Snowflake database.
import argparse
import logging
import sys
from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment,
                           DataTypes)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf
env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
t_env = TableEnvironment.create(environment_settings=env_settings)
# write all the data to one file
#t_env.get_config().set("parallelism.default", "1")
# define the source
# (left over from the word_count example: word_count_data is not defined in
# this snippet, and tab is not used by the JDBC test below)
print("Executing word_count example with default input data set.")
print("Use --input to specify file input.")
tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                          DataTypes.ROW([DataTypes.FIELD('line',
                                                         DataTypes.STRING())]))
# define the sink
t_env.execute_sql("""
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' =
'jdbc:snowflake://myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE',
'table-name' = 'users',
'username' = 'devops',
'password' = '50v*hrlM#SdK0euvnWR3',
'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
)
""")
t_env.get_config().get_configuration().set_string("pipeline.jars",
"file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")
usersdata = t_env.execute_sql("SELECT name FROM MyUserTable WHERE name='amy'")
usersdata.print()
Error on execution:
Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url:
jdbc:snowflake://myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
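One possible reading of this error, stated as an assumption rather than a confirmed diagnosis: the JDBC connector seems to choose a SQL dialect from the URL prefix and to reject any URL whose prefix it does not recognize, whether or not the driver jar is on the classpath. The plain-Python sketch below only illustrates that idea. It is not Flink's code; `KNOWN_PREFIXES` and `find_dialect` are hypothetical names, and the set of prefixes listed is my assumption about what the 1.13 connector supports.

```python
# Simplified illustration only; this is NOT Flink's actual implementation.
# Assumption: the connector maps a JDBC URL prefix to a built-in dialect.
KNOWN_PREFIXES = {
    "jdbc:mysql:": "MySQL",
    "jdbc:postgresql:": "PostgreSQL",
    "jdbc:derby:": "Derby",
}


def find_dialect(url):
    """Return the dialect name for a JDBC URL, or None if no prefix matches."""
    for prefix, name in KNOWN_PREFIXES.items():
        if url.startswith(prefix):
            return name
    return None


def validate_url(url):
    """Raise an error shaped like the one in the trace when no dialect matches."""
    dialect = find_dialect(url)
    if dialect is None:
        raise ValueError("Cannot handle such jdbc url: " + url)
    return dialect
```

If that reading is right, a `jdbc:mysql:` URL passes the check while a `jdbc:snowflake:` URL fails before the Snowflake driver is ever consulted, which would match the behaviour I am seeing.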
A similar test works when I connect to MySQL:
t_env.execute_sql("""
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users',
'username' = 'root',
'password' = 'rootroot'
)
""")
t_env.get_config().get_configuration().set_string("pipeline.jars",
"file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")
usersdata = t_env.execute_sql("SELECT name FROM MyUserTable WHERE name='amy'")
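Both snippets pass `pipeline.jars` as one long semicolon-separated string of `file://` URLs. As a minimal sketch of how that string could be built from a list of jar paths, here is a small helper. `build_pipeline_jars` is a hypothetical name, and the `/opt/jars` directory is a placeholder rather than my real path; the helper assumes absolute POSIX paths.

```python
from pathlib import PurePosixPath


def build_pipeline_jars(jar_paths):
    """Join absolute POSIX jar paths into a semicolon-separated file:// URL list."""
    # as_uri() raises ValueError for relative paths, which catches typos early.
    return ";".join(PurePosixPath(p).as_uri() for p in jar_paths)


# Placeholder directory; the jar file names are the ones used above.
jars = build_pipeline_jars([
    "/opt/jars/flink-connector-jdbc_2.12-1.13.3.jar",
    "/opt/jars/snowflake-jdbc-3.13.9.jar",
])

# The result would then be passed in the same way as in the snippets above:
# t_env.get_config().get_configuration().set_string("pipeline.jars", jars)
```

This only changes how the string is assembled; it does not affect which JDBC URLs the connector accepts.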