piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494764556
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
Review comment:
Good to know that! Done.
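
A minimal self-contained sketch of the mapper contract described in the hunk above; the `User` class and the two-column (name, age) layout are assumptions for illustration, not part of the connector API:

{{< highlight >}}
# Illustrative only: `User` is an assumed example type, not something SnowflakeIO provides.
class User:
    def __init__(self, name, age):
        self.name = name
        self.age = age

def csv_mapper(strings_array):
    # OpenCSV hands each CSV line to the mapper as a list of strings;
    # the mapper turns that list into the user-defined type.
    return User(strings_array[0], int(strings_array[1]))

# Example: a CSV line "Alice,42" arrives as ["Alice", "42"].
user = csv_mapper(["Alice", "42"])
{{< /highlight >}}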
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
Review comment:
Done.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
Review comment:
Of course it is. Done for both read and write.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
Review comment:
Done for this and the rest of such cases.
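
As a hedged illustration of the table-versus-query choice described in the hunk above, the sketch below constructs two read transforms; the import path, server name, bucket, and integration name are placeholders (in some Beam releases the transform lives under apache_beam.io.external.snowflake instead):

{{< highlight >}}
# Assumed import path; adjust to your Beam version if needed.
from apache_beam.io.snowflake import ReadFromSnowflake

COMMON = dict(
    server_name="account.region.gcp.snowflakecomputing.com",  # placeholder
    username="user", password="***",                          # placeholders
    schema="PUBLIC", database="TEST_DB",
    staging_bucket_name="my-temp-bucket",                     # placeholder
    storage_integration_name="my_integration",                # placeholder
    csv_mapper=lambda strings_array: strings_array)           # identity mapper

# Read a whole table by name ...
read_table = ReadFromSnowflake(table="USERS", **COMMON)

# ... or the result of a custom SQL query (use one of table/query, not both).
read_query = ReadFromSnowflake(
    query="SELECT name, age FROM USERS WHERE age > 18", **COMMON)
{{< /highlight >}}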
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
Review comment:
Done.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
Review comment:
Done.
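
For the key/pair options in the hunk above, a hedged sketch of what the two variants look like in practice; the key path, passphrase, server name, bucket, integration name, and the import path are assumptions:

{{< highlight >}}
# Assumed import path; may differ between Beam releases.
from apache_beam.io.snowflake import ReadFromSnowflake

# Variant 1: point at an encrypted PKCS#8 private key file on disk.
read_with_key_file = ReadFromSnowflake(
    server_name="account.region.gcp.snowflakecomputing.com",  # placeholder
    private_key_path="/secrets/rsa_key.p8",                   # placeholder path
    private_key_passphrase="key-passphrase",                  # placeholder
    schema="PUBLIC", database="TEST_DB",
    staging_bucket_name="my-temp-bucket",
    storage_integration_name="my_integration",
    csv_mapper=lambda strings_array: strings_array,
    table="USERS")

# Variant 2: pass the key material itself, e.g. fetched from a secret manager.
with open("/secrets/rsa_key.p8") as key_file:
    raw_key = key_file.read()
read_with_raw_key = ReadFromSnowflake(
    server_name="account.region.gcp.snowflakecomputing.com",
    raw_private_key=raw_key,
    private_key_passphrase="key-passphrase",
    schema="PUBLIC", database="TEST_DB",
    staging_bucket_name="my-temp-bucket",
    storage_integration_name="my_integration",
    csv_mapper=lambda strings_array: strings_array,
    table="USERS")
{{< /highlight >}}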
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
Review comment:
Done.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
Review comment:
Done.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
Review comment:
Yeah, it used to be when the expansion services weren't picked up
automatically. Removed.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
Review comment:
Done.
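
A filled-in variant of the write template quoted above may help; every concrete value below (server, bucket, integration, table, disposition strings) is an assumed placeholder, and the import path may differ between Beam releases:

{{< highlight >}}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Assumed import path; older releases expose it as apache_beam.io.external.snowflake.
from apache_beam.io.snowflake import WriteToSnowflake


def user_data_mapper(user):
    # Flatten each element into a list of strings for the temporary CSV files.
    return [user[0], str(user[1])]


options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.10",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK",
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create([("Alice", 42), ("Bob", 23)])
     | WriteToSnowflake(
         server_name="account.region.gcp.snowflakecomputing.com",  # placeholder
         username="user", password="***",                          # placeholders
         schema="PUBLIC", database="TEST_DB",
         staging_bucket_name="my-temp-bucket",                     # placeholder
         storage_integration_name="my_integration",                # placeholder
         create_disposition="CREATE_NEVER",   # assumes the target table already exists
         write_disposition="APPEND",
         user_data_mapper=user_data_mapper,
         table="USERS"))
{{< /highlight >}}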
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
Review comment:
Done.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFALKE TABLE>,
Review comment:
Done.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations from the Python SDK, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) that aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means transforms written in different languages (Java/Python/Go) can be combined.
+
+Currently, cross-language supports only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan is to support all runners.
+For more information about cross-language, please see the [multi-SDK efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. Output of the read transform is
a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function
job is to give the user the possibility to convert the array of Strings to a
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to the Google Cloud Storage bucket, ending with a slash.
The bucket will be used to save CSV files which will end up in Snowflake. Those CSV
files will be saved under the “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration
object created according to [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which maps data from a PCollection
to an array of String values before the write operation saves the data to
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+ return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the
target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether
the specified target table exists; if it does not, the write operation attempts
to create the table. Specify the schema for the target table using the
table_schema parameter.
+ - CREATE_NEVER - The write operation fails if the target table does not
exist.
+
+- `write_disposition` Defines the write behaviour based on the table where
data will be written to. The following values are supported:
+ - APPEND - Default behaviour. Written data is added to the existing rows in
the table,
+ - EMPTY - The target table must be empty; otherwise, the write operation
fails,
+ - TRUNCATE - The write operation deletes all rows from the target table
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is a JSON object with the following
structure:
Review comment:
Done. I added a link to
https://docs.snowflake.com/en/sql-reference/data-types.html
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing
operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide
full interoperability
+across the Beam ecosystem. From a developer perspective, this means the
possibility of combining transforms written in different languages
(Java/Python/Go).
+
+Currently, cross-language supports only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan
is to support all runners.
+For more information about cross-language, please see the [multi-SDK
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for setup
instructions.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. The output of the read transform
is a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of a user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
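+For instance, a minimal concrete read pipeline could look like the sketch below.
The account name, bucket, storage integration, and the `User` type with its
`user_csv_mapper` function are hypothetical placeholders, the `OPTIONS` list is
reused from the snippet above, and the import path is assumed to be
`apache_beam.io.external.snowflake` (it may differ between Beam versions):
+{{< highlight >}}
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+# Assumed import path for the cross-language Snowflake transforms.
+from apache_beam.io.external.snowflake import ReadFromSnowflake
+
+
+class User:
+    def __init__(self, name, age):
+        self.name = name
+        self.age = age
+
+
+def user_csv_mapper(strings_array):
+    # Each CSV line arrives as an array of strings; build a User from it.
+    return User(strings_array[0], int(strings_array[1]))
+
+
+with beam.Pipeline(options=PipelineOptions(OPTIONS)) as p:
+    (p
+     | ReadFromSnowflake(
+         server_name='account.eu-west-1.snowflakecomputing.com',
+         username='user',
+         password='password',
+         schema='PUBLIC',
+         database='TEST_DATABASE',
+         staging_bucket_name='gs://my-beam-bucket/',
+         storage_integration_name='test_integration',
+         csv_mapper=user_csv_mapper,
+         table='USERS')
+     | beam.Map(lambda user: '%s is %d years old' % (user.name, user.age))
+     | beam.Map(print))
+{{< /highlight >}}
+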
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. The bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once the Read operation finishes.
+
+- `storage_integration_name` Name of a Snowflake storage integration
object created according to the [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to
a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The job of the
csv_mapper function is to give the user the possibility to convert the array of
Strings to a user-defined type, i.e. GenericRecord for Avro or Parquet files, or
custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or a custom SQL query.
+
+- `expansion_service` Specifies the URL of the expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication (a key/pair example is sketched after this list):
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
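+For example, a key/pair authentication sketch could look as follows - the key
path and passphrase values are hypothetical, and the remaining parameters are
the same as in the general usage snippet above (username, password, and
o_auth_token are simply omitted):
+{{< highlight >}}
+ReadFromSnowflake(
+    server_name='account.eu-west-1.snowflakecomputing.com',
+    private_key_path='/path/to/rsa_key.p8',
+    private_key_passphrase='key-passphrase',
+    schema='PUBLIC',
+    database='TEST_DATABASE',
+    staging_bucket_name='gs://my-beam-bucket/',
+    storage_integration_name='test_integration',
+    csv_mapper=csv_mapper,
+    table='USERS')
+{{< /highlight >}}
+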
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
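+As a concrete illustration, the following sketch writes simple (name, age)
records to an existing table. The account, bucket, and integration values are
hypothetical, the `OPTIONS` list is reused from the snippet above, the import
path is assumed to be `apache_beam.io.external.snowflake`, and depending on the
Beam version the `create_disposition`, `write_disposition`, and `table_schema`
parameters described below may also need to be supplied explicitly:
+{{< highlight >}}
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+# Assumed import path for the cross-language Snowflake transforms.
+from apache_beam.io.external.snowflake import WriteToSnowflake
+
+
+def user_data_mapper(user):
+    # Each element becomes one CSV row: [NAME, AGE], all values as strings.
+    return [user[0], str(user[1])]
+
+
+with beam.Pipeline(options=PipelineOptions(OPTIONS)) as p:
+    (p
+     | beam.Create([('Alice', 30), ('Bob', 25)])
+     | WriteToSnowflake(
+         server_name='account.eu-west-1.snowflakecomputing.com',
+         username='user',
+         password='password',
+         schema='PUBLIC',
+         database='TEST_DATABASE',
+         staging_bucket_name='gs://my-beam-bucket/',
+         storage_integration_name='test_integration',
+         user_data_mapper=user_data_mapper,
+         table='USERS'))
+{{< /highlight >}}
+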
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to the Google Cloud Storage bucket, ending with a
slash. The bucket will be used to save CSV files which will end up in Snowflake.
Those CSV files will be saved under the “staging_bucket_name” path.
+
+- `storage_integration_name` Name of a Snowflake storage integration
object created according to the [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which maps data from a PCollection
to an array of String values before the write operation saves the data to
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+ return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or a custom SQL query.
+
+- `expansion_service` Specifies the URL of the expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the
target table does not exist. The following values are supported:
+ - CREATE_IF_NEEDED - Default behaviour. The write operation checks whether
the specified target table exists; if it does not, the write operation attempts
to create the table. Specify the schema for the target table using the
table_schema parameter.
+ - CREATE_NEVER - The write operation fails if the target table does not
exist.
+
+- `write_disposition` Defines the write behaviour for the table where data
will be written. The following values are supported:
+ - APPEND - Default behaviour. Written data is added to the existing rows in
the table.
+ - EMPTY - The target table must be empty; otherwise, the write operation
fails.
+ - TRUNCATE - The write operation deletes all rows from the target table
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is a JSON object with the following
structure:
+{{< highlight >}}
+{"schema":[
+ {
+ "dataType":{"type":"<COLUMN DATA TYPE>"},
+ "name":"<COLUMN NAME>",
+ "nullable": <NULLABLE>
+ },
+ ...
+ ]}
+{{< /highlight >}}
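+For example, a schema for a two-column table (a nullable text NAME column and
an integer AGE column) could be passed as shown below - a sketch with
hypothetical column names, assuming the schema is supplied to the table_schema
parameter as a JSON string:
+{{< highlight >}}
+# Passed to WriteToSnowflake, e.g.
+# WriteToSnowflake(..., create_disposition='CREATE_IF_NEEDED',
+#                  table_schema=table_schema, ...)
+table_schema = """
+{"schema": [
+  {"dataType": {"type": "text", "length": null}, "name": "name", "nullable": true},
+  {"dataType": {"type": "integer", "precision": 38, "scale": 0}, "name": "age", "nullable": false}
+]}
+"""
+{{< /highlight >}}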
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]
Review comment:
I agree. I found 2 of them to be true, but the overall JSON schema is
written above, so just the dataType JSON object will be sufficient and much
clearer.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing
operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide
full interoperability
+across the Beam ecosystem. From a developer perspective, this means the
possibility of combining transforms written in different languages
(Java/Python/Go).
+
+Currently, cross-language supports only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan
is to support all runners.
+For more information about cross-language, please see the [multi-SDK
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for setup
instructions.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. The output of the read transform
is a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of a user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. The bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once the Read operation finishes.
+
+- `storage_integration_name` Name of a Snowflake storage integration
object created according to the [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to
a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The job of the
csv_mapper function is to give the user the possibility to convert the array of
Strings to a user-defined type, i.e. GenericRecord for Avro or Parquet files, or
custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or a custom SQL query.
+
+- `expansion_service` Specifies the URL of the expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters
for authentication:
+- `username and password` Specifies username and password for
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This
transformation enables you to finish the Beam pipeline with an output operation
that sends the user's
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | <SOURCE OF DATA>
+ | WriteToSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ create_disposition=<CREATE DISPOSITION>,
+ write_disposition=<WRITE DISPOSITION>,
+ table_schema=<SNOWFLAKE TABLE SCHEMA>,
+ user_data_mapper=<USER DATA MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to the Google Cloud Storage bucket, ending with a
slash. The bucket will be used to save CSV files which will end up in Snowflake.
Those CSV files will be saved under the “staging_bucket_name” path.
+
+- `storage_integration_name` Name of a Snowflake storage integration
object created according to the [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which maps data from a PCollection
to an array of String values before the write operation saves the data to
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+ return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or a custom SQL query.
+
+- `expansion_service` Specifies the URL of the expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the
target table does not exist. The following values are supported:
+ - CREATE_IF_NEEDED - Default behaviour. The write operation checks whether
the specified target table exists; if it does not, the write operation attempts
to create the table. Specify the schema for the target table using the
table_schema parameter.
+ - CREATE_NEVER - The write operation fails if the target table does not
exist.
+
+- `write_disposition` Defines the write behaviour for the table where data
will be written. The following values are supported:
+ - APPEND - Default behaviour. Written data is added to the existing rows in
the table.
+ - EMPTY - The target table must be empty; otherwise, the write operation
fails.
+ - TRUNCATE - The write operation deletes all rows from the target table
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to
CREATE_IF_NEEDED, the table_schema parameter enables specifying the schema
for the created target table. A table schema is a JSON object with the following
structure:
+{{< highlight >}}
+{"schema":[
+ {
+ "dataType":{"type":"<COLUMN DATA TYPE>"},
+ "name":"<COLUMN NAME>",
+ "nullable": <NULLABLE>
+ },
+ ...
+ ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]
Review comment:
I agree. I found 2 of the nullables to be true, but the overall JSON schema
is written above, so just the dataType JSON object will be sufficient and much
clearer.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -204,14 +370,121 @@ Then:
**Note**:
SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to
table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)).
StagingBucketName will be used to save CSV files which will end up in
Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+**Optional** for batching:
+- `.withQuotationMark()`
+ - Default value: `'` (single quotation mark).
+ - Accepts a String with one character. It will surround all text (String)
fields saved to CSV. It should be one of the characters accepted by
[Snowflake’s](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html)
[FIELD_OPTIONALLY_ENCLOSED_BY](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html)
parameter (double quotation mark, single quotation mark, or none).
+ - Example: `.withQuotationMark("'")`
+### Streaming write (from unbounded source)
+It is required to create a
[SnowPipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html) in
the Snowflake console. The SnowPipe should use the same integration and the same
bucket as specified by the .withStagingBucketName and .withStorageIntegrationName
methods. The write operation might look as follows:
+{{< highlight java >}}
+data.apply(
+ SnowflakeIO.<type>write()
+ .withStagingBucketName("BUCKET NAME")
+ .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+ .withDataSourceConfiguration(dc)
+ .withUserDataMapper(mapper)
+ .withSnowPipe("MY_SNOW_PIPE")
+ .withFlushTimeLimit(Duration.millis(time))
+ .withFlushRowLimit(rowsNumber)
+ .withShardsNumber(shardsNumber)
+)
+{{< /highlight >}}
+#### Parameters
+**Required** for streaming:
+
+- ` .withDataSourceConfiguration()`
+ - Accepts a DatasourceConfiguration object.
+
+- `.toTable()`
+ - Accepts the target Snowflake table name.
+ - Example: `.toTable("MY_TABLE")`
+
+- `.withStagingBucketName()`
+ - Accepts a cloud bucket path ending with a slash.
+ - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()`
+ - Accepts the name of a Snowflake storage integration object created according
to the Snowflake documentation.
+ - Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withSnowPipe()`
+ - Accepts the target SnowPipe name. `.withSnowPipe()` accepts the exact name
of the SnowPipe.
+Example:
+{{< highlight >}}
+CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
+AS COPY INTO stream_table from @streamstage;
+{{< /highlight >}}
+
+ - Then:
+{{< highlight >}}
+.withSnowPipe(test_gcs_pipe)
+{{< /highlight >}}
+
+**Note**: it is important to provide the **schema** and **database** names.
+- `.withUserDataMapper()`
+ - Accepts the
[UserDataMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#userdatamapper-function)
function that will map a user's PCollection to an array of String values
`(String[]).`
+
+**Note**:
+
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to
table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)).
StagingBucketName will be used to save CSV files which will end up in
Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+**Optional** for streaming:
+- `.withFlushTimeLimit()`
+ - Default value: 30 seconds
+ - Accepts a Duration object specifying the time after which the streaming
write will be repeated.
+ - Example: `.withFlushTimeLimit(Duration.millis(180000))`
+
+- `.withFlushRowLimit()`
+ - Default value: 10,000 rows
+ - Limit of rows written to each staged file.
Review comment:
Done.
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord>
getCsvMapper() {
};
}
{{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing
operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework
Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide
full interoperability
+across the Beam ecosystem. From a developer perspective, this means the
possibility of combining transforms written in different languages
(Java/Python/Go).
+
+Currently, cross-language supports only [Apache
Flink](https://flink.apache.org/) as a runner in a stable manner, but the plan
is to support all runners.
+For more information about cross-language, please see the [multi-SDK
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
articles.
+
+### Set up
+Please see [Apache Beam with Flink
runner](https://beam.apache.org/documentation/runners/flink/) for setup
instructions.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full
tables via table name or custom data via query. The output of the read transform
is a
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
of a user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+ "--runner=FlinkRunner",
+ "--flink_version=1.10",
+ "--flink_master=localhost:8081",
+ "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+ (p
+ | ReadFromSnowflake(
+ server_name=<SNOWFLAKE SERVER NAME>,
+ username=<SNOWFLAKE USERNAME>,
+ password=<SNOWFLAKE PASSWORD>,
+ o_auth_token=<OAUTH TOKEN>,
+ private_key_path=<PATH TO P8 FILE>,
+ raw_private_key=<PRIVATE_KEY>,
+ private_key_passphrase=<PASSWORD FOR KEY>,
+ schema=<SNOWFLAKE SCHEMA>,
+ database=<SNOWFLAKE DATABASE>,
+ staging_bucket_name=<GCS BUCKET NAME>,
+ storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+ csv_mapper=<CSV MAPPER FUNCTION>,
+ table=<SNOWFLAKE TABLE>,
+ query=<IF NOT TABLE THEN QUERY>,
+ role=<SNOWFLAKE ROLE>,
+ warehouse=<SNOWFLAKE WAREHOUSE>,
+ expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. The bucket will
be used as a temporary location for storing CSV files. Those temporary
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be
removed automatically once the Read operation finishes.
+
+- `storage_integration_name` Name of a Snowflake storage integration
object created according to the [Snowflake
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate an array of strings to
a user-defined object. SnowflakeIO uses a [COPY INTO
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
statement to move data from a Snowflake table to Google Cloud Storage as CSV
files. These files are then downloaded via
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
and processed line by line. Each line is split into an array of Strings using
the [OpenCSV](http://opencsv.sourceforge.net/) library. The job of the
csv_mapper function is to give the user the possibility to convert the array of
Strings to a user-defined type, i.e. GenericRecord for Avro or Parquet files, or
custom objects.
Review comment:
I think it was considered, but at the time of writing this connector, Rows
and Schemas were not well known and the CSV + user mapper were already
implemented. It would definitely be a nice feature to add in the future.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]