TheNeuralBit commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r493908171
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object
to array of strings. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
Review comment:
```suggestion
- `csv_mapper` Specifies a function which must translate user-defined object
to array of strings. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
```
All of the javadoc and pydoc links should refer to `current` instead of a
specific version number.
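A minimal sketch of how the version-pinned links could be found and rewritten in bulk, assuming the page source is available as a string. The regex and the helper name `pin_to_current` are illustrative assumptions, not part of Beam's website tooling:

```python
import re

# Matches Beam release doc links pinned to a version number,
# e.g. https://beam.apache.org/releases/pydoc/2.20.0/...
VERSIONED_DOC_LINK = re.compile(
    r"(https://beam\.apache\.org/releases/(?:javadoc|pydoc)/)\d+\.\d+\.\d+(/)"
)


def pin_to_current(markdown: str) -> str:
    """Replace the version segment of javadoc/pydoc links with `current`."""
    return VERSIONED_DOC_LINK.sub(r"\1current\2", markdown)
```

Links that already point at `current` do not match the pattern and are left unchanged.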
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
Review comment:
nit: please make sure only the argument names are code formatted when
you are referencing two different args, e.g.:
```suggestion
- `table` or `query` Specifies a Snowflake table name or custom SQL query
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
Review comment:
```suggestion
{{< highlight py >}}
```
Please specify `py` or `java` in all the relevant code blocks so that syntax
is properly highlighted.
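A small sketch for locating the remaining untagged blocks, assuming the page source is read into a string. The pattern and the helper name `find_bare_highlights` are hypothetical and only illustrate the check:

```python
import re
from typing import List

# Matches an opening highlight shortcode that carries no language argument.
BARE_HIGHLIGHT = re.compile(r"\{\{<\s*highlight\s*>\}\}")


def find_bare_highlights(markdown: str) -> List[int]:
    """Return 1-based line numbers of highlight shortcodes with no language."""
    return [
        number
        for number, line in enumerate(markdown.splitlines(), start=1)
        if BARE_HIGHLIGHT.search(line)
    ]
```

Closing shortcodes and openings such as `{{< highlight py >}}` are not reported, so the result lists only the blocks that still need a language.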
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
Review comment:
These examples can drop the pipeline options and instead just focus on
applying the transform to the pipeline, e.g.:
```py
(p | <SOURCE>
 | WriteToSnowflake(...))
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash.
Bucket will be used to save CSV files which will end up in Snowflake. Those CSV
files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which maps data from a PCollection
to an array of String values before the write operation saves the data to
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+ return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the
target table does not exist. The following values are supported:
+ - CREATE_IF_NEEDED - default behaviour. The write operation checks whether
the specified target table exists; if it does not, the write operation attempts
to create the table. Specify the schema for the target table using the
table_schema parameter.
+ - CREATE_NEVER - The write operation fails if the target table does not
exist.
+
+- `write_disposition` Defines the write behaviour based on the table where
data will be written to. The following values are supported:
+ - APPEND - Default behaviour. Written data is added to the existing rows in
the table,
+ - EMPTY - The target table must be empty; otherwise, the write operation
fails,
+ - TRUNCATE - The write operation deletes all rows from the target table
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is given as JSON with the following
structure:
Review comment:
Is there documentation we can refer to about how all of these types are
encoded in CSV?
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object
to array of strings. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash.
Bucket will be used to save CSV files which will end up in Snowflake. Those CSV
files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which maps data from a PCollection
to an array of String values before the write operation saves the data to
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+ return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the
target table does not exist. The following values are supported:
+ - CREATE_IF_NEEDED - default behaviour. The write operation checks whether
the specified target table exists; if it does not, the write operation attempts
to create the table Specify the schema for the target table using the
table_schema parameter.
+ - CREATE_NEVER - The write operation fails if the target table does not
exist.
+
+- `write_disposition` Defines the write behaviour based on the table where
data will be written to. The following values are supported:
+ - APPEND - Default behaviour. Written data is added to the existing rows in
the table,
+ - EMPTY - The target table must be empty; otherwise, the write operation
fails,
+ - TRUNCATE - The write operation deletes all rows from the target table
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is as JSON with the following
structure:
+{{< highlight >}}
+{"schema":[
+ {
+ "dataType":{"type":"<COLUMN DATA TYPE>"},
+ "name":"<COLUMN NAME> ",
+ "nullable": <NULLABLE>
+ },
+ ...
+ ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]
Review comment:
Does the fact that all of these have `"nullable":false` indicate nulls
aren't supported? This could be more concise if you just include the "dataType"
object, e.g.:
```suggestion
{"type":"varchar","length":100}
```
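For illustration, a minimal sketch of how a Python user might assemble the `table_schema` JSON in the structure quoted above (a `schema` list whose entries carry `dataType`, `name` and `nullable`). The helper names `column` and `build_table_schema` and the column names are hypothetical and not part of SnowflakeIO, and the sketch assumes `table_schema` is passed as a JSON string, which the quoted text does not state explicitly.

```python
import json


def column(name, data_type, nullable=False, **type_args):
    """Build one schema entry; extra keyword args (e.g. length, precision)
    are placed inside the "dataType" object."""
    return {
        "dataType": {"type": data_type, **type_args},
        "name": name,
        "nullable": nullable,
    }


def build_table_schema(columns):
    """Serialize column entries into the {"schema": [...]} JSON shape."""
    return json.dumps({"schema": list(columns)})


# Hypothetical three-column table, including one nullable column.
table_schema = build_table_schema([
    column("id", "integer", precision=38, scale=0),
    column("name", "varchar", length=100),
    column("payload", "variant", nullable=True),
])
print(table_schema)
```

Building the entries from a helper keeps the per-type options (`length`, `precision`, `scale`) next to the type name, so the docs would only need to list the `dataType` objects.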
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
Review comment:
```suggestion
One of the functions of SnowflakeIO is reading Snowflake tables - either
full tables via table name or custom data via query. Output of the read
transform is a
[PCollection](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object
to array of strings. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
Review comment:
```suggestion
- `warehouse` specifies Snowflake warehouse name. If not specified the
user's default will be used.
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
Review comment:
```suggestion
```
I don't think this is necessary
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
Review comment:
```suggestion
table=<SNOWFLAKE TABLE>,
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object
to array of strings. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
Review comment:
```suggestion
- `role` specifies Snowflake role. If not specified the user's default will
be used.
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
Review comment:
This page shouldn't state that Flink is the only runner that supports
cross-language; instead, please refer users to
https://beam.apache.org/roadmap/connectors-multi-sdk/#cross-language-transforms-api-and-expansion-service
for information about which runners support it.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object
to array of strings. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
Review comment:
This is optional, right? In other cross-language transforms, we download the
appropriate expansion service jar and start it for you if a URL isn't
specified.
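A minimal sketch of the behaviour described in this comment, assuming an optional `expansion_service` argument. The names `resolve_expansion_service` and `start_default_service`, and the placeholder address, are hypothetical and are not Beam APIs; the sketch only shows the fallback when no URL is passed.

```python
from typing import Callable, Optional


def resolve_expansion_service(
    expansion_service: Optional[str] = None,
    # Hypothetical stand-in for "download the jar and start it for you";
    # the address below is a placeholder, not a real Beam default.
    start_default_service: Callable[[], str] = lambda: "localhost:8097",
) -> str:
    """Return the expansion service address a transform should use."""
    if expansion_service is not None:
        # The caller supplied a URL explicitly, so use it unchanged.
        return expansion_service
    # No URL was given: fall back to a service started on the caller's behalf.
    return start_default_service()
```

With this shape, `expansion_service` would belong under the additional parameters rather than the required ones.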
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -204,14 +370,121 @@ Then:
**Note**:
SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to
table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)).
StagingBucketName will be used to save CSV files which will end up in
Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+**Optional** for batching:
+- `.withQuotationMark()`
+ - Default value: `‘` (single quotation mark).
+ - Accepts String with one character. It will surround all text (String)
fields saved to CSV. It should be one of the accepted characters by
[Snowflake’s](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html)
[FIELD_OPTIONALLY_ENCLOSED_BY](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html)
parameter (double quotation mark, single quotation mark or none).
+ - Example: `.withQuotationMark("'")`
+### Streaming write (from unbounded source)
+It is required to create a
[SnowPipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html) in
the Snowflake console. SnowPipe should use the same integration and the same
bucket as specified by .withStagingBucketName and .withStorageIntegrationName
methods. The write operation might look as follows:
+{{< highlight java >}}
+data.apply(
+ SnowflakeIO.<type>write()
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withDataSourceConfiguration(dc)
+ .withUserDataMapper(mapper)
+ .withSnowPipe("MY_SNOW_PIPE")
+ .withFlushTimeLimit(Duration.millis(time))
+ .withFlushRowLimit(rowsNumber)
+ .withShardsNumber(shardsNumber)
+)
+{{< /highlight >}}
+#### Parameters
+**Required** for streaming:
+
+- ` .withDataSourceConfiguration()`
+ - Accepts a DatasourceConfiguration object.
+
+- `.toTable()`
+ - Accepts the target Snowflake table name.
+ - Example: `.toTable("MY_TABLE")`
+
+- `.withStagingBucketName()`
+ - Accepts a cloud bucket path ended with slash.
+ - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()`
+ - Accepts a name of a Snowflake storage integration object created according
to Snowflake documentation.
+ - Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withSnowPipe()`
+ - Accepts the target SnowPipe name. `.withSnowPipe()` accepts the exact name
of snowpipe.
+Example:
+{{< highlight >}}
+CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
+AS COPY INTO stream_table from @streamstage;
+{{< /highlight >}}
+
+ - Then:
+{{< highlight >}}
+.withSnowPipe(test_gcs_pipe)
+{{< /highlight >}}
+
+**Note**: this is important to provide **schema** and **database** names.
+- `.withUserDataMapper()`
+ - Accepts the
[UserDataMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#userdatamapper-function)
function that will map a user's PCollection to an array of String values
`(String[]).`
+
+**Note**:
+
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to
table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)).
StagingBucketName will be used to save CSV files which will end up in
Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+**Optional** for streaming:
+- `.withFlushTimeLimit()`
+ - Default value: 30 seconds
+ - Accepts Duration objects with the specified time after each the streaming
write will be repeated
+ - Example: `.withFlushTimeLimit(Duration.millis(180000))`
+
+- `.withFlushRowLimit()`
+ - Default value: 10,000 rows
+ - Limit of rows written to each file staged file
Review comment:
```suggestion
- Limit of rows written to each staged file
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings
to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
Review comment:
```suggestion
table=<SNOWFLAKE TABLE>,
```
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide
full interoperability
+across the Beam ecosystem. From a developer perspective it means the
possibility of combining transforms written in different
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are
to support all runners.
+For more information about cross-language please see [multi sdk
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings
to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash.
Bucket will be used to save CSV files which will end up in Snowflake. Those CSV
files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which maps data from a PCollection
to an array of String values before the write operation saves the data to
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+ return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the
target table does not exist. The following values are supported:
+ - CREATE_IF_NEEDED - default behaviour. The write operation checks whether
the specified target table exists; if it does not, the write operation attempts
to create the table Specify the schema for the target table using the
table_schema parameter.
+ - CREATE_NEVER - The write operation fails if the target table does not
exist.
+
+- `write_disposition` Defines the write behaviour based on the table where
data will be written to. The following values are supported:
+ - APPEND - Default behaviour. Written data is added to the existing rows in
the table,
+ - EMPTY - The target table must be empty; otherwise, the write operation
fails,
+ - TRUNCATE - The write operation deletes all rows from the target table
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is as JSON with the following
structure:
Review comment:
```suggestion
- `table_schema` When the `create_disposition` parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is a JSON array with the following
structure:
```