[ 
https://issues.apache.org/jira/browse/BEAM-6892?focusedWorklogId=238822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-238822
 ]

ASF GitHub Bot logged work on BEAM-6892:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/May/19 21:32
            Start Date: 07/May/19 21:32
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #8102: 
[BEAM-6892] Adding support for side inputs on table & schema. Also adding 
additional params for BQ.
URL: https://github.com/apache/beam/pull/8102#discussion_r281837554
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery.py
 ##########
 @@ -78,15 +78,120 @@
 or a table. Pipeline construction will fail with a validation error if neither
 or both are specified.
 
-**Time partitioned tables**
+Writing Data to BigQuery
+========================
+
+The `WriteToBigQuery` transform is the recommended way of writing data to
+BigQuery. It supports a large set of parameters to customize how you'd like to
+write to BigQuery.
+
+Table References
+----------------
+
+This transform allows you to provide static `project`, `dataset` and `table`
+parameters which point to a specific BigQuery table to be created. The `table`
+parameter can also be a dynamic parameter (i.e. a callable), which receives an
+element to be written to BigQuery, and returns the table that that element
+should be sent to.
+
+You may also provide a tuple of PCollectionView elements to be passed as side
+inputs to your callable. For example, suppose that one wishes to send
+events of different types to different tables, and the table names are
+computed at pipeline runtime. One may then do something like so::
+
+    with beam.Pipeline() as p:
+      elements = (p | beam.Create([
+        {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
+        {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
+      ]))
+
+      table_names = (p | beam.Create([
+        ('error', 'my_project.dataset1.error_table_for_today'),
+        ('user_log', 'my_project.dataset1.query_table_for_today'),
+      ]))
+
+      table_names_dict = beam.pvalue.AsDict(table_names)
+
+      elements | beam.io.gcp.bigquery.WriteToBigQuery(
+        table=lambda row, table_dict: table_dict[row['type']],
+        table_side_inputs=(table_names_dict,))
+
+Schemas
+-------
+
+This transform also allows you to provide a static `schema` parameter, or a
+dynamic one (i.e. a callable).
+
+If providing a callable, this should take in a table reference (as returned by
+the `table` parameter), and return the corresponding schema for that table.
+This allows you to provide different schemas for different tables::
+
+    def compute_table_name(row):
+      ...
+
+    errors_schema = {'fields': [
+      {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+    queries_schema = {'fields': [
+      {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+    with beam.Pipeline() as p:
+      elements = (p | beam.Create([
+        {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
+        {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
+      ]))
+
+      elements | beam.io.gcp.bigquery.WriteToBigQuery(
+        table=compute_table_name,
+        schema=lambda table: (errors_schema
+                              if 'errors' in table
+                              else queries_schema))
+
+It may be the case that schemas are computed at pipeline runtime. In cases
+like these, one can also provide a `schema_side_inputs` parameter, which is
+a tuple of PCollectionViews to be passed to the schema callable (much like
+the `table_side_inputs` parameter).
+
+Additional Parameters for BigQuery Tables
+-----------------------------------------
+
+This sink is able to create tables in BigQuery if they don't already exist. It
+also relies on creating temporary tables when performing file loads.
+
+The WriteToBigQuery transform creates tables using the BigQuery API by
+inserting a load job (see the API reference [1]), or by inserting a new table
+(see the API reference for that [2][3]).
+
+When creating a new BigQuery table, there are a number of extra parameters
+that one may need to specify, such as clustering, partitioning, and data
+encoding. It is possible to provide these additional parameters by
+passing a Python dictionary as `additional_bq_parameters` to the transform.
 
 Review comment:
   LGTM. But please make sure that Java and Python APIs are consistent in this 
regard.
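
For reference when comparing the two SDKs, here is a minimal sketch of the
three pieces the docstring describes: a dynamic `table` callable, a dynamic
`schema` callable, and an `additional_bq_parameters` dictionary. It is not
taken from the PR. The helper names `table_for` and `schema_for` are
hypothetical, the dictionary keys are assumed to mirror fields of the BigQuery
table resource (`timePartitioning`, `clustering`), and the callables are
exercised in plain Python rather than through a Beam pipeline.

```python
# Plain-Python sketch: exercises the callables directly, without Beam.

# Stand-in for the side input; in a pipeline this mapping would be
# supplied through `table_side_inputs` (e.g. via beam.pvalue.AsDict).
table_names = {
    'error': 'my_project.dataset1.error_table_for_today',
    'user_log': 'my_project.dataset1.query_table_for_today',
}

errors_schema = {'fields': [
    {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
queries_schema = {'fields': [
    {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'}]}


def table_for(row, table_dict):
    """Hypothetical `table` callable: route a row by its 'type' field."""
    return table_dict[row['type']]


def schema_for(table):
    """Hypothetical `schema` callable: pick a schema from the table name."""
    return errors_schema if 'error_table' in table else queries_schema


# Assumption: keys follow the BigQuery table resource field names.
additional_bq_parameters = {
    'timePartitioning': {'type': 'DAY'},
    'clustering': {'fields': ['type']},
}

rows = [
    {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
    {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
]

# Resolve the destination table and schema for each row.
destinations = [table_for(row, table_names) for row in rows]
schemas = [schema_for(table) for table in destinations]
```

In a pipeline, `table_for` would be passed as `table=`, `schema_for` as
`schema=`, and the dictionary as `additional_bq_parameters=`. Whatever shape
the Java API accepts for the same settings is what this dictionary would need
to stay consistent with.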
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 238822)
    Time Spent: 6h 40m  (was: 6.5h)

> Use temp_location for BQ FILE_LOADS on DirectRunner, and autocreate it in GCS 
> if not specified by user.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-6892
>                 URL: https://issues.apache.org/jira/browse/BEAM-6892
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Pablo Estrada
>            Priority: Major
>             Fix For: 2.13.0
>
>          Time Spent: 6h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
