piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494788041
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object
to array of strings. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash.
Bucket will be used to save CSV files which will end up in Snowflake. Those CSV
files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which maps data from a PCollection
to an array of String values before the write operation saves the data to
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+ return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the
target table does not exist. The following values are supported:
+ - CREATE_IF_NEEDED - default behaviour. The write operation checks whether
the specified target table exists; if it does not, the write operation attempts
to create the table Specify the schema for the target table using the
table_schema parameter.
+ - CREATE_NEVER - The write operation fails if the target table does not
exist.
+
+- `write_disposition` Defines the write behaviour based on the table where
data will be written to. The following values are supported:
+ - APPEND - Default behaviour. Written data is added to the existing rows in
the table,
+ - EMPTY - The target table must be empty; otherwise, the write operation
fails,
+ - TRUNCATE - The write operation deletes all rows from the target table
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is as JSON with the following
structure:
Review comment:
Done. I added a link to
https://docs.snowflake.com/en/sql-reference/data-types.html
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]