TheNeuralBit commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r493908171



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.

Review comment:
       ```suggestion
   - `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.
   ```
   
   All of the javadoc and pydoc links should refer to `current` instead of a 
specific version number
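    
    e.g. (just a sketch of the URL pattern — the anchors stay the same):
    ```
    [PCollection](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
    [FileIO](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/FileIO.html)
    ```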

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query

Review comment:
       nit: please make sure only the argument names are code-formatted when you are referencing two different args, e.g.:
   
   ```suggestion
   - `table` or `query` Specifies a Snowflake table name or custom SQL query
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This 
transformation enables you to finish the Beam pipeline with an output operation 
that sends the user's 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 to your Snowflake database.
+#### General usage
+{{< highlight >}}

Review comment:
       ```suggestion
   {{< highlight py >}}
   ```
   
   Please specify `py` or `java` in all the relevant code blocks so that syntax 
is properly highlighted
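    
    e.g. the csv_mapper snippet could then read (just a sketch):
    ```
    {{< highlight py >}}
    def csv_mapper(strings_array):
        return User(strings_array[0], int(strings_array[1]))
    {{< /highlight >}}
    ```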

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This 
transformation enables you to finish the Beam pipeline with an output operation 
that sends the user's 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}

Review comment:
       These examples can drop the pipeline options and instead just focus on applying the transform to the pipeline, e.g.:
   ```py
   (p | <SOURCE>
      | WriteToSnowflake(...))
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This 
transformation enables you to finish the Beam pipeline with an output operation 
that sends the user's 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. 
Bucket will be used to save CSV files which will end up in Snowflake. Those CSV 
files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection 
to an array of String values before the write operation saves the data to 
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for 
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the 
target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether 
the specified target table exists; if it does not, the write operation attempts 
to create the table Specify the schema for the target table using the 
table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not 
exist.
+
+- `write_disposition` Defines the write behaviour based on the table where 
data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in 
the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation 
fails,
+  - TRUNCATE - The write operation deletes all rows from the target table 
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to 
CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema 
for the created target table. A table schema is as JSON with the following 
structure:

Review comment:
       Is there documentation we can refer to about how all of these types are 
encoded in CSV?

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This 
transformation enables you to finish the Beam pipeline with an output operation 
that sends the user's 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. 
Bucket will be used to save CSV files which will end up in Snowflake. Those CSV 
files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection 
to an array of String values before the write operation saves the data to 
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for 
authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the 
target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether 
the specified target table exists; if it does not, the write operation attempts 
to create the table Specify the schema for the target table using the 
table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not 
exist.
+
+- `write_disposition` Defines the write behaviour based on the table where 
data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in 
the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation 
fails,
+  - TRUNCATE - The write operation deletes all rows from the target table 
before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to 
CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema 
for the created target table. A table schema is as JSON with the following 
structure:
+{{< highlight >}}
+{"schema":[
+    {
+      "dataType":{"type":"<COLUMN DATA TYPE>"},
+      "name":"<COLUMN  NAME> ",
+      "nullable": <NULLABLE>
+    },
+        ...
+  ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]

Review comment:
       Does the fact that all of these have `"nullable":false` indicate nulls 
aren't supported? This could be more concise if you just include the "dataType" 
object, e.g.:
   ```suggestion
   {"type":"varchar","length":100}
   ```
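    
    i.e. the whole list could collapse to something like this (a sketch based on the entries above):
    ```
    {"type":"date"},
    {"type":"boolean"},
    {"type":"decimal","precision":38,"scale":1},
    {"type":"varchar","length":100}
    ```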

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.

Review comment:
       ```suggestion
   One of the functions of SnowflakeIO is reading Snowflake tables - either 
full tables via table name or custom data via query. Output of the read 
transform is a 
[PCollection](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object 
to array of strings. SnowflakeIO uses a [COPY INTO 
<location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html)
 statement to move data from a Snowflake table to Google Cloud Storage as CSV 
files. These files are then downloaded via 
[FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html)
 and processed line by line. Each line is split into an array of Strings using 
the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function 
job is to give the user the possibility to convert the array of Strings to a 
user-defined type, ie. GenericRecord for Avro or Parquet files, or custom 
objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.

Review comment:
       ```suggestion
   - `warehouse` specifies Snowflake warehouse name. If not specified the 
user's default will be used.
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+

Review comment:
       ```suggestion
   ```
   
   I don't think this is necessary

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing 
operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework 
Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide 
full interoperability
+across the Beam ecosystem. From a developer perspective it means the 
possibility of combining transforms written in different 
languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       ```suggestion
              table=<SNOWFLAKE TABLE>,
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means the possibility of combining transforms written in different languages (Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>,
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate each array of strings read from a CSV line into a user-defined object. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The job of the csv_mapper function is to give the user the possibility to convert the array of Strings to a user-defined type, i.e. GenericRecord for Avro or Parquet files, or a custom object.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.

Review comment:
       ```suggestion
   - `role` specifies Snowflake role. If not specified the user's default will 
be used.
   ```
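
   While in this section: the `csv_mapper` example quoted above might be easier to follow with the user type spelled out. A small sketch - `User` here is a hypothetical record type for illustration, not anything defined by the connector, and the column order is simply whatever the table or query returns:

```python
from collections import namedtuple

# Hypothetical record type; any user-defined type (e.g. a GenericRecord
# factory for Avro/Parquet) works just as well.
User = namedtuple("User", ["name", "age"])

def csv_mapper(strings_array):
    # strings_array is one CSV line already split into strings by OpenCSV.
    return User(name=strings_array[0], age=int(strings_array[1]))
```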

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means the possibility of combining transforms written in different languages (Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.

Review comment:
       This page shouldn't state Flink is the only runner that supports cross-language; instead, please refer users to https://beam.apache.org/roadmap/connectors-multi-sdk/#cross-language-transforms-api-and-expansion-service for information about which runners support it.
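
   Relatedly, the runner choice only surfaces in the pipeline options, so the examples stay the same wherever cross-language is supported. A hypothetical variant of the options list for another portable runner could be shown alongside the Flink one - the flag values below are placeholders, and actual runner support is what the roadmap link above tracks:

```python
# Hypothetical options for a different portable runner; only the options list
# changes, the ReadFromSnowflake/WriteToSnowflake calls stay the same.
OPTIONS = [
    "--runner=DataflowRunner",
    "--project=<GCP PROJECT>",
    "--region=<GCP REGION>",
    "--temp_location=gs://<BUCKET>/tmp",
]
```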

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means the possibility of combining transforms written in different languages (Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>,
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate each array of strings read from a CSV line into a user-defined object. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The job of the csv_mapper function is to give the user the possibility to convert the array of Strings to a user-defined type, i.e. GenericRecord for Avro or Parquet files, or a custom object.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.

Review comment:
       This is optional right? In other cross-language transforms we will 
download the appropriate expansion service jar and start it for you if a URL 
isn't specified.
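
   If it is indeed optional, it might be worth showing that directly, e.g. a sketch along these lines (the placeholders and the import path are assumptions, not the connector's confirmed API):

```python
# Sketch: same read as in the usage example, with expansion_service omitted.
# Assumes: from apache_beam.io.snowflake import ReadFromSnowflake
# (adjust the import to your SDK version) and csv_mapper defined as above.
ReadFromSnowflake(
    server_name="<SNOWFLAKE SERVER NAME>",
    username="<SNOWFLAKE USERNAME>",
    password="<SNOWFLAKE PASSWORD>",
    schema="<SNOWFLAKE SCHEMA>",
    database="<SNOWFLAKE DATABASE>",
    staging_bucket_name="<GCS BUCKET NAME>",
    storage_integration_name="<SNOWFLAKE STORAGE INTEGRATION NAME>",
    csv_mapper=csv_mapper,
    table="<SNOWFLAKE TABLE>",
    # expansion_service not set: if this connector behaves like other
    # cross-language transforms, a bundled expansion service is started
    # automatically (assumption based on the question above).
)
```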

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -204,14 +370,121 @@ Then:
 **Note**:
 SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to 
table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)).
 StagingBucketName will be used to save CSV files which will end up in 
Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
 
+**Optional** for batching:
+- `.withQuotationMark()`
+  - Default value: `‘` (single quotation mark).
+  - Accepts a String with one character. It will surround all text (String) fields saved to CSV. It should be one of the characters accepted by [Snowflake's](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) [FIELD_OPTIONALLY_ENCLOSED_BY](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) parameter (double quotation mark, single quotation mark or none).
+  - Example: `.withQuotationMark("'")`
+### Streaming write (from unbounded source)
+It is required to create a 
[SnowPipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html) in 
the Snowflake console. SnowPipe should use the same integration and the same 
bucket as specified by .withStagingBucketName and .withStorageIntegrationName 
methods. The write operation might look as follows:
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<type>write()
+      .withStagingBucketName("BUCKET NAME")
+      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+      .withDataSourceConfiguration(dc)
+      .withUserDataMapper(mapper)
+      .withSnowPipe("MY_SNOW_PIPE")
+      .withFlushTimeLimit(Duration.millis(time))
+      .withFlushRowLimit(rowsNumber)
+      .withShardsNumber(shardsNumber)
+);
+{{< /highlight >}}
+#### Parameters
+**Required** for streaming:
+
+- `.withDataSourceConfiguration()`
+  - Accepts a DatasourceConfiguration object.
+
+- `.toTable()`
+  - Accepts the target Snowflake table name.
+  - Example: `.toTable("MY_TABLE")`
+
+- `.withStagingBucketName()`
+  - Accepts a cloud bucket path ending with a slash.
+  - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()`
+  - Accepts the name of a Snowflake storage integration object created according to Snowflake documentation.
+  - Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withSnowPipe()`
+  - Accepts the exact name of the target SnowPipe.
+Example:
+{{< highlight >}}
+CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
+AS COPY INTO stream_table from @streamstage;
+{{< /highlight >}}
+
+   - Then:
+{{< highlight >}}
+.withSnowPipe(test_gcs_pipe)
+{{< /highlight >}}
+
+**Note**: it is important to provide **schema** and **database** names.
+- `.withUserDataMapper()`
+  - Accepts the 
[UserDataMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#userdatamapper-function)
 function that will map a user's PCollection to an array of String values 
`(String[]).`
+
+**Note**:
+
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to 
table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)).
 StagingBucketName will be used to save CSV files which will end up in 
Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+**Optional** for streaming:
+- `.withFlushTimeLimit()`
+  - Default value: 30 seconds
+  - Accepts Duration objects with the specified time after which the streaming write will be repeated
+  - Example: `.withFlushTimeLimit(Duration.millis(180000))`
+
+- `.withFlushRowLimit()`
+  - Default value: 10,000 rows
+  - Limit of rows written to each file staged file

Review comment:
       ```suggestion
     - Limit of rows written to each staged file
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means the possibility of combining transforms written in different languages (Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>,
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate each array of strings read from a CSV line into a user-defined object. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The job of the csv_mapper function is to give the user the possibility to convert the array of Strings to a user-defined type, i.e. GenericRecord for Avro or Parquet files, or a custom object.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This 
transformation enables you to finish the Beam pipeline with an output operation 
that sends the user's 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>,
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       ```suggestion
              table=<SNOWFLAKE TABLE>,
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> 
getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+The Snowflake cross-language implementation supports both reading and writing operations for the Python programming language, thanks to
+cross-language transforms, which are part of the [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) aiming to provide full interoperability
+across the Beam ecosystem. From a developer perspective, this means the possibility of combining transforms written in different languages (Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache 
Flink](https://flink.apache.org/) as a runner in a stable manner but plans are 
to support all runners.
+For more information about cross-language please see [multi sdk 
efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of 
Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html)
 articles.
+
+### Set up
+Please see [Apache Beam with Flink 
runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full 
tables via table name or custom data via query. Output of the read transform is 
a 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>,
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will 
be used as a temporary location for storing CSV files. Those temporary 
directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be 
removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate each array of strings read from a CSV line into a user-defined object. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The job of the csv_mapper function is to give the user the possibility to convert the array of Strings to a user-defined type, i.e. GenericRecord for Avro or Parquet files, or a custom object.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1]))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters 
for authentication:
+- `username and password` Specifies username and password for 
username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This 
transformation enables you to finish the Beam pipeline with an output operation 
that sends the user's 
[PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection)
 to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>,
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to the Google Cloud Storage bucket, ending with a slash. The bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration 
object created according to [Snowflake 
documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection 
to an array of String values before the write operation saves the data to 
temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private 
key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and 
passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default 
will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the 
user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the 
target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table. Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not 
exist.
+
+- `write_disposition` Defines the write behaviour for the table that data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table.
+  - EMPTY - The target table must be empty; otherwise, the write operation fails.
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to 
CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema 
for the created target table. A table schema is as JSON with the following 
structure:

Review comment:
       ```suggestion
   - `table_schema` When the `create_disposition` parameter is set to 
CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema 
for the created target table. A table schema is a JSON array with the following 
structure:
   ```
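
   To make the dispositions and `table_schema` concrete, a short sketch of how they might be passed together could help readers here. This assumes the dispositions are given as the string values listed above and that `user_data_mapper` and `table_schema` are defined as in the surrounding docs; the remaining placeholders follow the usage example:

```python
# Sketch: create the table from table_schema if it does not exist, then append.
WriteToSnowflake(
    server_name="<SNOWFLAKE SERVER NAME>",
    username="<SNOWFLAKE USERNAME>",
    password="<SNOWFLAKE PASSWORD>",
    schema="<SNOWFLAKE SCHEMA>",
    database="<SNOWFLAKE DATABASE>",
    staging_bucket_name="<GCS BUCKET NAME>",
    storage_integration_name="<SNOWFLAKE STORAGE INTEGRATION NAME>",
    user_data_mapper=user_data_mapper,
    table="<SNOWFLAKE TABLE>",
    table_schema=table_schema,              # JSON table schema, see above
    create_disposition="CREATE_IF_NEEDED",  # assumed string form of the value
    write_disposition="APPEND",
)
```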




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

