This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e4126c8 Fixing Python tornadoes test for Google (#7959)
e4126c8 is described below
commit e4126c8485a95a44412ffa52b8d146db0f18abc4
Author: Pablo <[email protected]>
AuthorDate: Wed Feb 27 16:12:23 2019 -0800
Fixing Python tornadoes test for Google (#7959)
* Fixing tornadoes test for Google
* improving check
---
sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py | 7 ++++++-
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 4 ++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 9411df1..9db0f73 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -87,6 +87,11 @@ def run(argv=None):
rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
counts = count_tornadoes(rows)
+ if 'temp_location' in p.options.get_all_options():
+ location = p.options.get_all_options()['temp_location']
+ else:
+ location = known_args.gcs_location
+
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
counts | 'Write' >> beam.io.WriteToBigQuery(
@@ -94,7 +99,7 @@ def run(argv=None):
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
- gs_location=known_args.gcs_location)
+ gs_location=location)
# Run the pipeline (all operations are deferred until run() is called).
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 2f4fd52..2af9bf5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -619,6 +619,10 @@ class BigQueryWrapper(object):
dataset_id=dataset_id,
table_id=table_id,
schema=schema or found_table.schema)
+ logging.info('Created table %s.%s.%s with schema %s. Result: %s.',
+ project_id, dataset_id, table_id,
+ schema or found_table.schema,
+ created_table)
# if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
# the table before this point.
if write_disposition == BigQueryDisposition.WRITE_TRUNCATE: