[
https://issues.apache.org/jira/browse/BEAM-6892?focusedWorklogId=238203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-238203
]
ASF GitHub Bot logged work on BEAM-6892:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/19 23:36
Start Date: 06/May/19 23:36
Worklog Time Spent: 10m
Work Description: pabloem commented on pull request #8102: [BEAM-6892]
Adding support for side inputs on table & schema. Also adding additional params
for BQ.
URL: https://github.com/apache/beam/pull/8102#discussion_r281404198
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -78,15 +78,120 @@
or a table. Pipeline construction will fail with a validation error if neither
or both are specified.
-**Time partitioned tables**
+Writing Data to BigQuery
+========================
+
+The `WriteToBigQuery` transform is the recommended way of writing data to
+BigQuery. It supports a large set of parameters to customize how you'd like to
+write to BigQuery.
+
+Table References
+----------------
+
+This transform allows you to provide static `project`, `dataset` and `table`
+parameters that point to a specific BigQuery table to be created. The `table`
+parameter can also be a dynamic parameter (i.e. a callable) that receives an
+element to be written to BigQuery and returns the table to which that element
+should be written.
+
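+For example, one may write all elements of a PCollection to a single,
+statically-named table (a minimal sketch; the project, dataset, table and
+schema below are illustrative)::
+
+  quotes | beam.io.gcp.bigquery.WriteToBigQuery(
+      table='my_table',
+      dataset='my_dataset',
+      project='my_project',
+      schema='source:STRING, quote:STRING')
+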
+You may also provide a tuple of PCollectionView elements to be passed as side
+inputs to your callable. For example, suppose that one wishes to send events
+of different types to different tables, and the table names are computed at
+pipeline execution time; one may do something like the following::
+
+  with beam.Pipeline() as p:
+    elements = (p | beam.Create([
+        {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
+        {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
+    ]))
+
+    table_names = (p | beam.Create([
+        ('error', 'my_project.dataset1.error_table_for_today'),
+        ('user_log', 'my_project.dataset1.query_table_for_today'),
+    ]))
+
+    table_names_dict = beam.pvalue.AsDict(table_names)
+
+    elements | beam.io.gcp.bigquery.WriteToBigQuery(
+        table=lambda row, table_dict: table_dict[row['type']],
+        table_side_inputs=(table_names_dict,))
+
+Schemas
+-------
+
+This transform also allows you to provide a static or dynamic `schema`
+parameter (i.e. a callable).
+
+If a callable is provided, it should take in a table reference (as returned by
+the `table` parameter) and return the corresponding schema for that table.
+This makes it possible to provide different schemas for different tables::
+
+  def compute_table_name(row):
+    ...
+
+  errors_schema = {'fields': [
+      {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+  queries_schema = {'fields': [
+      {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+  with beam.Pipeline() as p:
+    elements = (p | beam.Create([
+        {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
+        {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
+    ]))
+
+    elements | beam.io.gcp.bigquery.WriteToBigQuery(
+        table=compute_table_name,
+        schema=lambda table: (errors_schema
+                              if 'errors' in table
+                              else queries_schema))
+
+It may be the case that schemas are computed at pipeline runtime. In cases
+like these, one can also provide a `schema_side_inputs` parameter, which is
+a tuple of PCollectionViews to be passed to the schema callable (much like
+the `table_side_inputs` parameter).
+
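+For instance, continuing the schema example above, the schemas themselves
+could be provided through a side input (a minimal sketch; it assumes
+`compute_table_name` returns one of the table names used as keys below)::
+
+  schemas_per_table = (p | "MakeSchemas" >> beam.Create([
+      ('my_project.dataset1.error_table_for_today', errors_schema),
+      ('my_project.dataset1.query_table_for_today', queries_schema),
+  ]))
+
+  elements | beam.io.gcp.bigquery.WriteToBigQuery(
+      table=compute_table_name,
+      schema=lambda table, schema_dict: schema_dict[table],
+      schema_side_inputs=(beam.pvalue.AsDict(schemas_per_table),))
+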
+Additional Parameters for BigQuery Tables
+-----------------------------------------
+
+This sink is able to create tables in BigQuery if they don't already exist. It
+also relies on creating temporary tables when performing file loads.
+
+The WriteToBigQuery transform creates tables using the BigQuery API by
+inserting a load job (see the API reference [1]), or by inserting a new table
+(see the API reference for that [2][3]).
+
+When creating a new BigQuery table, there are a number of extra parameters
+that one may need to specify, such as clustering, partitioning, and data
+encoding. It is possible to provide these additional parameters by
+passing a Python dictionary as `additional_bq_parameters` to the transform.
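+For example, to write to time-partitioned and clustered destination tables
+(a minimal sketch; the table, schema and additional parameters shown are
+illustrative)::
+
+  quotes | beam.io.gcp.bigquery.WriteToBigQuery(
+      table='my_project:dataset1.quotes',
+      schema='source:STRING, quote:STRING, published:TIMESTAMP',
+      additional_bq_parameters={
+          'timePartitioning': {'type': 'DAY', 'field': 'published'},
+          'clustering': {'fields': ['source']}})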
Review comment:
Pasting my chat response here:
Hey Cham, so for BQ, the reason that I'm accepting a variable set of
properties is that in Java, the BQ sink API did not adequately let users
provide parameters for clustering, which is a newer feature.
So when BQ added it, and users wanted to rely on it, they couldn't. In fact,
their pipelines would fail whenever we needed to run multiple loads for the
same table with temporary tables and the tables were clustered.
So I decided to accept a dictionary, which makes it easy to accept new
elements.
But I agree with you that it's risky. We could have a whitelist of properties
that we admit. WDYT?
Issue Time Tracking
-------------------
Worklog Id: (was: 238203)
Time Spent: 5h 50m (was: 5h 40m)
> Use temp_location for BQ FILE_LOADS on DirectRunner, and autocreate it in GCS
> if not specified by user.
> -------------------------------------------------------------------------------------------------------
>
> Key: BEAM-6892
> URL: https://issues.apache.org/jira/browse/BEAM-6892
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Valentyn Tymofieiev
> Assignee: Pablo Estrada
> Priority: Major
> Fix For: 2.13.0
>
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>