Repository: beam
Updated Branches:
  refs/heads/master b6347d02c -> 3cc4ff6d7
[BEAM-2405] Override to sink interface in the batch dataflow BQ Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e641997a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e641997a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e641997a Branch: refs/heads/master Commit: e641997affc378ec0337d5ac19d8677cba0d0933 Parents: b6347d0 Author: Sourabh Bajaj <[email protected]> Authored: Tue Jun 6 19:49:54 2017 -0700 Committer: [email protected] <[email protected]> Committed: Tue Jun 6 22:17:05 2017 -0700 ---------------------------------------------------------------------- .../examples/cookbook/bigquery_tornadoes.py | 11 +++++------ sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- .../runners/dataflow/dataflow_runner.py | 18 ++++++++++++++++++ 3 files changed, 24 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py index d3b216e..1ca49c5 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py @@ -83,12 +83,11 @@ def run(argv=None): # Write the output using a "Write" transform that has side effects. 
# pylint: disable=expression-not-assigned - counts | 'write' >> beam.io.Write( - beam.io.BigQuerySink( - known_args.output, - schema='month:INTEGER, tornado_count:INTEGER', - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) + counts | 'Write' >> beam.io.WriteToBigQuery( + known_args.output, + schema='month:INTEGER, tornado_count:INTEGER', + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE) # Run the pipeline (all operations are deferred until run() is called). http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/io/gcp/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 9069f73..da8be68 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1299,7 +1299,7 @@ class WriteToBigQuery(PTransform): create_disposition=self.create_disposition, write_disposition=self.write_disposition, client=self.test_client) - return pcoll | 'Write to BigQuery' >> ParDo(bigquery_write_fn) + return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn) def display_data(self): res = {} http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 62cea33..3fc8983 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -27,6 +27,7 @@ import time import traceback import urllib +import apache_beam as beam from apache_beam import error from apache_beam import coders from 
apache_beam import pvalue @@ -378,6 +379,23 @@ class DataflowRunner(PipelineRunner): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + def apply_WriteToBigQuery(self, transform, pcoll): + standard_options = pcoll.pipeline._options.view_as(StandardOptions) + if standard_options.streaming: + if (transform.write_disposition == + beam.io.BigQueryDisposition.WRITE_TRUNCATE): + raise RuntimeError('Can not use write truncation mode in streaming') + return self.apply_PTransform(transform, pcoll) + else: + return pcoll | 'WriteToBigQuery' >> beam.io.Write( + beam.io.BigQuerySink( + transform.table_reference.tableId, + transform.table_reference.datasetId, + transform.table_reference.projectId, + transform.schema, + transform.create_disposition, + transform.write_disposition)) + def apply_GroupByKey(self, transform, pcoll): # Infer coder of parent. #
