This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aba546d  [BEAM-6658] Add kms_key to BigQuery transforms, pass to Dataflow (#7824)
aba546d is described below

commit aba546d2d774e22aaeb9d8a24d37bc12ad0241f4
Author: Udi Meiri <u...@users.noreply.github.com>
AuthorDate: Tue Feb 12 17:42:10 2019 -0800

    [BEAM-6658] Add kms_key to BigQuery transforms, pass to Dataflow (#7824)
    
    * [BEAM-6658] Add kms_key to BigQuery transforms, pass to Dataflow
    
    * Disable IT until Dataflow supports the new property.
---
 .../io/gcp/big_query_query_to_table_it_test.py     | 27 ++++++++++++++++++++++
 .../io/gcp/big_query_query_to_table_pipeline.py    |  9 ++++++--
 sdks/python/apache_beam/io/gcp/bigquery.py         | 24 +++++++++++++++----
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 11 +++++++--
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 21 +++++++++++++----
 .../runners/dataflow/dataflow_runner.py            |  9 +++++++-
 .../apache_beam/runners/dataflow/internal/names.py |  9 ++++----
 7 files changed, 91 insertions(+), 19 deletions(-)

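For orientation, here is a minimal sketch (not part of the commit) of how the new
kms_key argument is intended to be used from a pipeline. Every project, dataset,
table, and key name below is a placeholder, not a value taken from this change.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder CMEK resource name; substitute your own key.
    KMS_KEY = ('projects/my-project/locations/global/keyRings/my-ring/'
               'cryptoKeys/my-key')

    with beam.Pipeline(options=PipelineOptions()) as p:
      (p
       | 'read' >> beam.io.Read(beam.io.BigQuerySource(
           query='SELECT fruit FROM `my-project.my_dataset.fruit`',
           use_standard_sql=True,
           kms_key=KMS_KEY))
       | 'write' >> beam.io.Write(beam.io.BigQuerySink(
           'my-project:my_dataset.output_table',
           schema='fruit:STRING',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
           # Per the docstrings added below, the key is applied when new
           # tables are created on the user's behalf.
           kms_key=KMS_KEY)))
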
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index fe862e3..43db185 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -64,6 +64,8 @@ NEW_TYPES_QUERY = (
 DIALECT_OUTPUT_SCHEMA = ('{"fields": [{"name": "fruit","type": "STRING"}]}')
 DIALECT_OUTPUT_VERIFY_QUERY = ('SELECT fruit from `%s`;')
 DIALECT_OUTPUT_EXPECTED = [(u'apple',), (u'orange',)]
+KMS_KEY = 'projects/apache-beam-testing/locations/global/keyRings/beam-it/' \
+          'cryptoKeys/test'
 
 
 class BigQueryQueryToTableIT(unittest.TestCase):
@@ -150,6 +152,31 @@ class BigQueryQueryToTableIT(unittest.TestCase):
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
+  # TODO(BEAM-6660): Enable this test when ready.
+  @unittest.skip('This test requires BQ Dataflow native source support for ' +
+                 'KMS, which is not available yet.')
+  @attr('IT')
+  def test_big_query_standard_sql_kms_key(self):
+    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
+    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
+    extra_opts = {'query': STANDARD_QUERY,
+                  'output': self.output_table,
+                  'output_schema': DIALECT_OUTPUT_SCHEMA,
+                  'use_standard_sql': True,
+                  'on_success_matcher': all_of(*pipeline_verifiers),
+                  'kms_key': KMS_KEY
+                 }
+    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
+    big_query_query_to_table_pipeline.run_bq_pipeline(options)
+
+    table = self.bigquery_client.get_table(
+        self.project, self.dataset_id, 'output_table')
+    self.assertEqual(KMS_KEY, table.encryptionConfiguration.kmsKeyName)
+
   @attr('IT')
   def test_big_query_new_types(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
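
The skipped test above checks the destination table's CMEK setting once the job
finishes. Outside the test harness the same check can be sketched roughly as
follows, using the BigQueryWrapper method made public in this commit (resource
names are placeholders):

    from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper

    wrapper = BigQueryWrapper()
    table = wrapper.get_table('my-project', 'my_dataset', 'output_table')
    # encryptionConfiguration is only populated for CMEK-protected tables.
    if table.encryptionConfiguration is not None:
      print(table.encryptionConfiguration.kmsKeyName)
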
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index d3ec694..26b418a 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -48,21 +48,26 @@ def run_bq_pipeline(argv=None):
   parser.add_argument('--use_standard_sql', action='store_true',
                       dest='use_standard_sql',
                       help='Output BQ table to write results to.')
+  parser.add_argument('--kms_key', default=None,
+                      help='Use this Cloud KMS key with BigQuery.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   table_schema = parse_table_schema_from_json(known_args.output_schema)
+  kms_key = known_args.kms_key
 
   p = TestPipeline(options=PipelineOptions(pipeline_args))
 
   # pylint: disable=expression-not-assigned
   # pylint: disable=bad-continuation
   (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
-      query=known_args.query, use_standard_sql=known_args.use_standard_sql))
+      query=known_args.query, use_standard_sql=known_args.use_standard_sql,
+      kms_key=kms_key))
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
            known_args.output,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)))
+           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+           kms_key=known_args.kms_key)))
 
   result = p.run()
   result.wait_until_finish()
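
Illustration only: this is roughly how the integration test drives the helper
above, now with the extra --kms_key option. All resource names below are
placeholders.

    from apache_beam.io.gcp import big_query_query_to_table_pipeline

    args = [
        '--query', 'SELECT fruit FROM `my-project.my_dataset.fruit`',
        '--output', 'my-project:my_dataset.output_table',
        '--output_schema',
        '{"fields": [{"name": "fruit","type": "STRING"}]}',
        '--use_standard_sql',
        '--kms_key',
        'projects/my-project/locations/global/keyRings/my-ring/cryptoKeys/my-key',
    ]
    big_query_query_to_table_pipeline.run_bq_pipeline(args)
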
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index c3a5814..8f3011b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -264,7 +264,7 @@ class BigQuerySource(dataflow_io.NativeSource):
 
   def __init__(self, table=None, dataset=None, project=None, query=None,
                validate=False, coder=None, use_standard_sql=False,
-               flatten_results=True):
+               flatten_results=True, kms_key=None):
     """Initialize a :class:`BigQuerySource`.
 
     Args:
@@ -301,6 +301,8 @@ class BigQuerySource(dataflow_io.NativeSource):
         This parameter is ignored for table inputs.
       flatten_results (bool): Flattens all nested and repeated fields in the
         query results. The default value is :data:`True`.
+      kms_key (str): Experimental. Optional Cloud KMS key name for use when
+        creating new tables.
 
     Raises:
       ~exceptions.ValueError: if any of the following is true:
@@ -338,6 +340,7 @@ class BigQuerySource(dataflow_io.NativeSource):
     self.validate = validate
     self.flatten_results = flatten_results
     self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+    self.kms_key = kms_key
 
   def display_data(self):
     if self.query is not None:
@@ -366,7 +369,8 @@ class BigQuerySource(dataflow_io.NativeSource):
         source=self,
         test_bigquery_client=test_bigquery_client,
         use_legacy_sql=self.use_legacy_sql,
-        flatten_results=self.flatten_results)
+        flatten_results=self.flatten_results,
+        kms_key=self.kms_key)
 
 
 class BigQuerySink(dataflow_io.NativeSink):
@@ -381,7 +385,7 @@ class BigQuerySink(dataflow_io.NativeSink):
   def __init__(self, table, dataset=None, project=None, schema=None,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_EMPTY,
-               validate=False, coder=None):
+               validate=False, coder=None, kms_key=None):
     """Initialize a BigQuerySink.
 
     Args:
@@ -434,6 +438,8 @@ bigquery_v2_messages.TableSchema` object or a single string of the form
         that will be JSON serialized as a line in a file. This argument needs a
         value only in special cases when writing table rows as dictionaries is
         not desirable.
+      kms_key (str): Experimental. Optional Cloud KMS key name for use when
+        creating new tables.
 
     Raises:
       ~exceptions.TypeError: if the schema argument is not a :class:`str` or a
@@ -481,6 +487,7 @@ bigquery_v2_messages.TableSchema` object.
         write_disposition)
     self.validate = validate
     self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+    self.kms_key = kms_key
 
   def display_data(self):
     res = {}
@@ -531,7 +538,7 @@ class BigQueryWriteFn(DoFn):
   """
 
   def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
-               create_disposition, write_disposition, test_client):
+               create_disposition, write_disposition, kms_key, test_client):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -561,6 +568,8 @@ class BigQueryWriteFn(DoFn):
         -  BigQueryDisposition.WRITE_APPEND: add to existing rows.
         -  BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
         For streaming pipelines WriteTruncate can not be used.
+      kms_key: Experimental. Optional Cloud KMS key name for use when creating
+        new tables.
       test_client: Override the default bigquery client used for testing.
     """
     self.table_id = table_id
@@ -573,6 +582,7 @@ class BigQueryWriteFn(DoFn):
     self._rows_buffer = []
     # The default batch size is 500
     self._max_batch_size = batch_size or 500
+    self.kms_key = kms_key
 
   def display_data(self):
     return {'table_id': self.table_id,
@@ -639,7 +649,7 @@ class WriteToBigQuery(PTransform):
   def __init__(self, table, dataset=None, project=None, schema=None,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
-               batch_size=None, test_client=None):
+               batch_size=None, kms_key=None, test_client=None):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -684,6 +694,8 @@ bigquery_v2_messages.TableSchema`
 
       batch_size (int): Number of rows to be written to BQ per streaming API
         insert.
+      kms_key (str): Experimental. Optional Cloud KMS key name for use when
+        creating new tables.
       test_client: Override the default bigquery client used for testing.
     """
     self.table_reference = bigquery_tools.parse_table_reference(
@@ -694,6 +706,7 @@ bigquery_v2_messages.TableSchema`
         write_disposition)
     self.schema = schema
     self.batch_size = batch_size
+    self.kms_key = kms_key
     self.test_client = test_client
 
   @staticmethod
@@ -786,6 +799,7 @@ bigquery_v2_messages.TableSchema):
         schema=self.get_dict_table_schema(self.schema),
         create_disposition=self.create_disposition,
         write_disposition=self.write_disposition,
+        kms_key=self.kms_key,
         test_client=self.test_client)
     return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index e011d74..725a684 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -292,6 +292,7 @@ class WriteToBigQuery(unittest.TestCase):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
+        kms_key=None,
         test_client=client)
 
     fn.start_bundle()
@@ -299,11 +300,12 @@ class WriteToBigQuery(unittest.TestCase):
 
   def test_dofn_client_start_bundle_create_called(self):
     client = mock.Mock()
-    client.tables.Get.return_value = None
+    client.tables.Get.side_effect = HttpError(
+        response={'status': 404}, content=None, url=None)
     client.tables.Insert.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
     schema = {'fields': [
         {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
@@ -316,6 +318,7 @@ class WriteToBigQuery(unittest.TestCase):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
+        kms_key='kms_key',
         test_client=client)
 
     fn.start_bundle()
@@ -342,6 +345,7 @@ class WriteToBigQuery(unittest.TestCase):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
+        kms_key=None,
         test_client=client)
 
     fn.start_bundle()
@@ -371,6 +375,7 @@ class WriteToBigQuery(unittest.TestCase):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
+        kms_key=None,
         test_client=client)
 
     fn.start_bundle()
@@ -400,6 +405,7 @@ class WriteToBigQuery(unittest.TestCase):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
+        kms_key=None,
         test_client=client)
 
     fn.start_bundle()
@@ -433,6 +439,7 @@ class WriteToBigQuery(unittest.TestCase):
         schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
+        kms_key=None,
         test_client=client)
 
     fn.start_bundle()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index a145ed3..93d5299 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -328,11 +328,21 @@ class BigQueryWrapper(object):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _get_table(self, project_id, dataset_id, table_id):
+  def get_table(self, project_id, dataset_id, table_id):
+    """Lookup a table's metadata object.
+
+    Args:
+      client: bigquery.BigqueryV2 instance
+      project_id, dataset_id, table_id: table lookup parameters
+
+    Returns:
+      bigquery.Table instance
+    Raises:
+      HttpError if lookup failed.
+    """
     request = bigquery.BigqueryTablesGetRequest(
         projectId=project_id, datasetId=dataset_id, tableId=table_id)
     response = self.client.tables.Get(request)
-    # The response is a bigquery.Table instance.
     return response
 
   def _create_table(self, project_id, dataset_id, table_id, schema):
@@ -419,7 +429,7 @@ class BigQueryWrapper(object):
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def get_table_location(self, project_id, dataset_id, table_id):
-    table = self._get_table(project_id, dataset_id, table_id)
+    table = self.get_table(project_id, dataset_id, table_id)
     return table.location
 
   @retry.with_exponential_backoff(
@@ -495,7 +505,7 @@ class BigQueryWrapper(object):
 
     found_table = None
     try:
-      found_table = self._get_table(project_id, dataset_id, table_id)
+      found_table = self.get_table(project_id, dataset_id, table_id)
     except HttpError as exn:
       if exn.status_code == 404:
         if create_disposition == BigQueryDisposition.CREATE_NEVER:
@@ -696,7 +706,7 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
   """A reader for a BigQuery source."""
 
   def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True,
-               flatten_results=True):
+               flatten_results=True, kms_key=None):
     self.source = source
     self.test_bigquery_client = test_bigquery_client
     if auth.is_running_in_gce:
@@ -720,6 +730,7 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
     self.schema = None
     self.use_legacy_sql = use_legacy_sql
     self.flatten_results = flatten_results
+    self.kms_key = kms_key
 
     if self.source.table_reference is not None:
       # If table schema did not define a project we default to executing
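
A hedged sketch of the now-public get_table() contract used above: a missing
table surfaces as an HttpError with status 404 rather than a None return, which
is what the create-if-needed path keys off. Names below are placeholders.

    from apitools.base.py.exceptions import HttpError

    from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper

    wrapper = BigQueryWrapper()
    try:
      table = wrapper.get_table('my-project', 'my_dataset', 'maybe_missing')
    except HttpError as exn:
      if exn.status_code != 404:
        raise
      table = None  # caller decides whether the table should be created
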
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1d24d10..a1c6b91 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -603,7 +603,8 @@ class DataflowRunner(PipelineRunner):
               transform.table_reference.projectId,
               transform.schema,
               transform.create_disposition,
-              transform.write_disposition))
+              transform.write_disposition,
+              kms_key=transform.kms_key))
 
   def apply_GroupByKey(self, transform, pcoll, options):
     # Infer coder of parent.
@@ -905,6 +906,9 @@ class DataflowRunner(PipelineRunner):
       else:
         raise ValueError('BigQuery source %r must specify either a table or'
                          ' a query' % transform.source)
+      if transform.source.kms_key is not None:
+        step.add_property(
+            PropertyNames.BIGQUERY_KMS_KEY, transform.source.kms_key)
     elif transform.source.format == 'pubsub':
       if not standard_options.streaming:
         raise ValueError('Cloud Pub/Sub is currently available for use '
@@ -1003,6 +1007,9 @@ class DataflowRunner(PipelineRunner):
       if transform.sink.table_schema is not None:
         step.add_property(
             PropertyNames.BIGQUERY_SCHEMA, transform.sink.schema_as_json())
+      if transform.sink.kms_key is not None:
+        step.add_property(
+            PropertyNames.BIGQUERY_KMS_KEY, transform.sink.kms_key)
     elif transform.sink.format == 'pubsub':
       standard_options = options.view_as(StandardOptions)
       if not standard_options.streaming:
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 17897cc..5670d6f 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -78,13 +78,14 @@ class PropertyNames(object):
   Property strings as they are expected in the CloudWorkflow protos."""
   BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
   BIGQUERY_DATASET = 'dataset'
-  BIGQUERY_QUERY = 'bigquery_query'
-  BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
-  BIGQUERY_FLATTEN_RESULTS = 'bigquery_flatten_results'
   BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format'
-  BIGQUERY_TABLE = 'table'
+  BIGQUERY_FLATTEN_RESULTS = 'bigquery_flatten_results'
+  BIGQUERY_KMS_KEY = 'bigquery_kms_key'
   BIGQUERY_PROJECT = 'project'
+  BIGQUERY_QUERY = 'bigquery_query'
   BIGQUERY_SCHEMA = 'schema'
+  BIGQUERY_TABLE = 'table'
+  BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
   BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
   DISPLAY_DATA = 'display_data'
   ELEMENT = 'element'
