piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494788891
##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########

@@ -204,14 +370,121 @@
Then:

**Note**: SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+**Optional** for batching:
+- `.withQuotationMark()`
+  - Default value: `'` (single quotation mark).
+  - Accepts a one-character String that will surround all text (String) fields saved to CSV. It should be one of the characters accepted by Snowflake's [FIELD_OPTIONALLY_ENCLOSED_BY](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) parameter (double quotation mark, single quotation mark or none).
+  - Example: `.withQuotationMark("'")`
+
+### Streaming write (from unbounded source)
+
+It is required to create a [SnowPipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html) in the Snowflake console. The SnowPipe should use the same integration and the same bucket as specified by the `.withStagingBucketName` and `.withStorageIntegrationName` methods. The write operation might look as follows:
+
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<type>write()
+      .withStagingBucketName("BUCKET NAME")
+      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+      .withDataSourceConfiguration(dc)
+      .withUserDataMapper(mapper)
+      .withSnowPipe("MY_SNOW_PIPE")
+      .withFlushTimeLimit(Duration.millis(time))
+      .withFlushRowLimit(rowsNumber)
+      .withShardsNumber(shardsNumber)
+)
+{{< /highlight >}}
+
+#### Parameters
+
+**Required** for streaming:
+
+- `.withDataSourceConfiguration()`
+  - Accepts a DataSourceConfiguration object.
+
+- `.toTable()`
+  - Accepts the target Snowflake table name.
+  - Example: `.toTable("MY_TABLE")`
+
+- `.withStagingBucketName()`
+  - Accepts a cloud bucket path ending with a slash.
+  - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()`
+  - Accepts the name of a Snowflake storage integration object created according to the Snowflake documentation.
+  - Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName("test_integration")
+{{< /highlight >}}
+
+- `.withSnowPipe()`
+  - Accepts the exact name of the target SnowPipe.
+  - Example:
+{{< highlight >}}
+CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
+AS COPY INTO stream_table from @streamstage;
+{{< /highlight >}}
+  - Then:
+{{< highlight >}}
+.withSnowPipe("test_gcs_pipe")
+{{< /highlight >}}
+
+**Note**: It is important to provide the **schema** and **database** names.
+
+- `.withUserDataMapper()`
+  - Accepts the [UserDataMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#userdatamapper-function) function that will map a user's PCollection to an array of String values (`String[]`); see the sketch after the note below.
+
+**Note**: SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
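
For reference, a minimal mapper might look like the following sketch. It assumes a `PCollection<Long>` written as single-column CSV rows; the function name is illustrative, in the style of the UserDataMapper section linked above.

{{< highlight java >}}
// Illustrative sketch: map each Long element of the PCollection to a
// single-column CSV row, returned as a String[].
public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
  return (SnowflakeIO.UserDataMapper<Long>)
      recordLine -> new String[] {recordLine.toString()};
}
{{< /highlight >}}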
+
+**Optional** for streaming:
+
+- `.withFlushTimeLimit()`
+  - Default value: 30 seconds.
+  - Accepts a Duration object; the streaming write will be repeated after each such interval.
+  - Example: `.withFlushTimeLimit(Duration.millis(180000))`
+
+- `.withFlushRowLimit()`
+  - Default value: 10,000 rows.
+  - Limit of rows written to each staged file.
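
Putting the streaming parameters together, an end-to-end write might look like the following sketch. The Pub/Sub topic, credentials, bucket, integration, and pipe names are illustrative assumptions, not part of this page; note that streaming writes accept only key pair authentication.

{{< highlight java >}}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.joda.time.Duration;

public class SnowflakeStreamingWriteExample {
  public static void main(String[] args) {
    // Illustrative values throughout; replace with your own account details.
    SnowflakeIO.DataSourceConfiguration dc =
        SnowflakeIO.DataSourceConfiguration.create()
            // Streaming writes accept key pair authentication only.
            .withKeyPairPathAuth("USERNAME", "/path/to/rsa_key.p8", "PASSPHRASE")
            .withServerName("account.snowflakecomputing.com")
            .withDatabase("TEST_DATABASE")
            .withSchema("PUBLIC");

    Pipeline p = Pipeline.create();
    p.apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
        .apply(
            SnowflakeIO.<String>write()
                .withDataSourceConfiguration(dc)
                .withStagingBucketName("gs://mybucket/my/dir/")
                .withStorageIntegrationName("test_integration")
                .withSnowPipe("test_gcs_pipe")
                // Map each Pub/Sub message to a single-column CSV row.
                .withUserDataMapper(
                    (SnowflakeIO.UserDataMapper<String>) line -> new String[] {line})
                // Defaults shown explicitly: flush every 30 seconds or every
                // 10,000 rows, whichever comes first.
                .withFlushTimeLimit(Duration.standardSeconds(30))
                .withFlushRowLimit(10000));
    p.run().waitUntilFinish();
  }
}
{{< /highlight >}}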
Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]