This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new aba546d  [BEAM-6658] Add kms_key to BigQuery transforms, pass to Dataflow (#7824)
aba546d is described below

commit aba546d2d774e22aaeb9d8a24d37bc12ad0241f4
Author: Udi Meiri <u...@users.noreply.github.com>
AuthorDate: Tue Feb 12 17:42:10 2019 -0800

    [BEAM-6658] Add kms_key to BigQuery transforms, pass to Dataflow (#7824)

    * [BEAM-6658] Add kms_key to BigQuery transforms, pass to Dataflow

    * Disable IT until Dataflow supports the new property.
---
 .../io/gcp/big_query_query_to_table_it_test.py     | 27 ++++++++++++++++++++++
 .../io/gcp/big_query_query_to_table_pipeline.py    |  9 ++++++--
 sdks/python/apache_beam/io/gcp/bigquery.py         | 24 +++++++++++++++----
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 11 +++++++--
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 21 +++++++++++++----
 .../runners/dataflow/dataflow_runner.py            |  9 +++++++-
 .../apache_beam/runners/dataflow/internal/names.py |  9 ++++----
 7 files changed, 91 insertions(+), 19 deletions(-)
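As a rough usage sketch of the new parameter (project, dataset, table, and key
names below are placeholders, not part of this commit):

    import apache_beam as beam

    # Experimental: tables created by the BigQuery source/sink are encrypted
    # with the given Cloud KMS key once Dataflow honors the new property.
    KMS_KEY = ('projects/my-project/locations/global/'
               'keyRings/my-ring/cryptoKeys/my-key')

    with beam.Pipeline() as p:
      rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
          query='SELECT fruit FROM `my-project.my_dataset.fruits`',
          use_standard_sql=True,
          kms_key=KMS_KEY))
      rows | 'write' >> beam.io.Write(beam.io.BigQuerySink(
          'my-project:my_dataset.output_table',
          schema='fruit:STRING',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
          kms_key=KMS_KEY))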
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index fe862e3..43db185 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -64,6 +64,8 @@ NEW_TYPES_QUERY = (
 DIALECT_OUTPUT_SCHEMA = ('{"fields": [{"name": "fruit","type": "STRING"}]}')
 DIALECT_OUTPUT_VERIFY_QUERY = ('SELECT fruit from `%s`;')
 DIALECT_OUTPUT_EXPECTED = [(u'apple',), (u'orange',)]
+KMS_KEY = 'projects/apache-beam-testing/locations/global/keyRings/beam-it/' \
+          'cryptoKeys/test'
 
 
 class BigQueryQueryToTableIT(unittest.TestCase):
@@ -150,6 +152,31 @@ class BigQueryQueryToTableIT(unittest.TestCase):
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
+  # TODO(BEAM-6660): Enable this test when ready.
+  @unittest.skip('This test requires BQ Dataflow native source support for '
+                 'KMS, which is not available yet.')
+  @attr('IT')
+  def test_big_query_standard_sql_kms_key(self):
+    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
+    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
+    extra_opts = {'query': STANDARD_QUERY,
+                  'output': self.output_table,
+                  'output_schema': DIALECT_OUTPUT_SCHEMA,
+                  'use_standard_sql': True,
+                  'on_success_matcher': all_of(*pipeline_verifiers),
+                  'kms_key': KMS_KEY
+                 }
+    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
+    big_query_query_to_table_pipeline.run_bq_pipeline(options)
+
+    table = self.bigquery_client.get_table(
+        self.project, self.dataset_id, 'output_table')
+    self.assertEqual(KMS_KEY, table.encryptionConfiguration.kmsKeyName)
+
   @attr('IT')
   def test_big_query_new_types(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index d3ec694..26b418a 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -48,21 +48,26 @@ def run_bq_pipeline(argv=None):
   parser.add_argument('--use_standard_sql', action='store_true',
                       dest='use_standard_sql',
                       help='Output BQ table to write results to.')
+  parser.add_argument('--kms_key', default=None,
+                      help='Use this Cloud KMS key with BigQuery.')
   known_args, pipeline_args = parser.parse_known_args(argv)
   table_schema = parse_table_schema_from_json(known_args.output_schema)
+  kms_key = known_args.kms_key
 
   p = TestPipeline(options=PipelineOptions(pipeline_args))
 
   # pylint: disable=expression-not-assigned
   # pylint: disable=bad-continuation
   (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
-      query=known_args.query, use_standard_sql=known_args.use_standard_sql))
+      query=known_args.query, use_standard_sql=known_args.use_standard_sql,
+      kms_key=kms_key))
   | 'write' >> beam.io.Write(beam.io.BigQuerySink(
       known_args.output,
       schema=table_schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-      write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)))
+      write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+      kms_key=known_args.kms_key)))
 
   result = p.run()
   result.wait_until_finish()
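For reference, a sketch of invoking the test pipeline above with the new flag
(argument values are placeholders; the usual runner/project pipeline options
are assumed to be passed through as well):

    from apache_beam.io.gcp import big_query_query_to_table_pipeline

    big_query_query_to_table_pipeline.run_bq_pipeline([
        '--query', 'SELECT fruit FROM `my-project.my_dataset.fruits`',
        '--output', 'my-project:my_dataset.output_table',
        '--output_schema', '{"fields": [{"name": "fruit","type": "STRING"}]}',
        '--use_standard_sql',
        '--kms_key',
        'projects/my-project/locations/global/keyRings/my-ring/cryptoKeys/my-key',
    ])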
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index c3a5814..8f3011b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -264,7 +264,7 @@ class BigQuerySource(dataflow_io.NativeSource):
 
   def __init__(self, table=None, dataset=None, project=None, query=None,
                validate=False, coder=None, use_standard_sql=False,
-               flatten_results=True):
+               flatten_results=True, kms_key=None):
     """Initialize a :class:`BigQuerySource`.
 
     Args:
@@ -301,6 +301,8 @@ class BigQuerySource(dataflow_io.NativeSource):
         This parameter is ignored for table inputs.
       flatten_results (bool): Flattens all nested and repeated fields in the
         query results. The default value is :data:`True`.
+      kms_key (str): Experimental. Optional Cloud KMS key name for use when
+        creating new tables.
 
     Raises:
       ~exceptions.ValueError: if any of the following is true:
@@ -338,6 +340,7 @@ class BigQuerySource(dataflow_io.NativeSource):
     self.validate = validate
     self.flatten_results = flatten_results
     self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+    self.kms_key = kms_key
 
   def display_data(self):
     if self.query is not None:
@@ -366,7 +369,8 @@ class BigQuerySource(dataflow_io.NativeSource):
         source=self,
         test_bigquery_client=test_bigquery_client,
         use_legacy_sql=self.use_legacy_sql,
-        flatten_results=self.flatten_results)
+        flatten_results=self.flatten_results,
+        kms_key=self.kms_key)
 
 
 class BigQuerySink(dataflow_io.NativeSink):
@@ -381,7 +385,7 @@ class BigQuerySink(dataflow_io.NativeSink):
   def __init__(self, table, dataset=None, project=None, schema=None,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_EMPTY,
-               validate=False, coder=None):
+               validate=False, coder=None, kms_key=None):
     """Initialize a BigQuerySink.
 
     Args:
@@ -434,6 +438,8 @@ bigquery_v2_messages.TableSchema` object or a single string of the form
         that will be JSON serialized as a line in a file. This argument needs a
         value only in special cases when writing table rows as dictionaries is
         not desirable.
+      kms_key (str): Experimental. Optional Cloud KMS key name for use when
+        creating new tables.
 
     Raises:
       ~exceptions.TypeError: if the schema argument is not a :class:`str` or a
@@ -481,6 +487,7 @@ bigquery_v2_messages.TableSchema` object.
         write_disposition)
     self.validate = validate
     self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
+    self.kms_key = kms_key
 
   def display_data(self):
     res = {}
@@ -531,7 +538,7 @@ class BigQueryWriteFn(DoFn):
   """
 
   def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
-               create_disposition, write_disposition, test_client):
+               create_disposition, write_disposition, kms_key, test_client):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -561,6 +568,8 @@ class BigQueryWriteFn(DoFn):
         - BigQueryDisposition.WRITE_APPEND: add to existing rows.
         - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
         For streaming pipelines WriteTruncate can not be used.
+      kms_key: Experimental. Optional Cloud KMS key name for use when creating
+        new tables.
       test_client: Override the default bigquery client used for testing.
     """
     self.table_id = table_id
@@ -573,6 +582,7 @@ class BigQueryWriteFn(DoFn):
     self._rows_buffer = []
     # The default batch size is 500
     self._max_batch_size = batch_size or 500
+    self.kms_key = kms_key
 
   def display_data(self):
     return {'table_id': self.table_id,
@@ -639,7 +649,7 @@ class WriteToBigQuery(PTransform):
   def __init__(self, table, dataset=None, project=None, schema=None,
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
-               batch_size=None, test_client=None):
+               batch_size=None, kms_key=None, test_client=None):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -684,6 +694,8 @@ bigquery_v2_messages.TableSchema`
       batch_size (int): Number of rows to be written to BQ
         per streaming API insert.
+      kms_key (str): Experimental. Optional Cloud KMS key name for use when
+        creating new tables.
       test_client: Override the default bigquery client used for testing.
     """
     self.table_reference = bigquery_tools.parse_table_reference(
@@ -694,6 +706,7 @@ bigquery_v2_messages.TableSchema`
         write_disposition)
     self.schema = schema
     self.batch_size = batch_size
+    self.kms_key = kms_key
     self.test_client = test_client
 
   @staticmethod
@@ -786,6 +799,7 @@ bigquery_v2_messages.TableSchema):
         schema=self.get_dict_table_schema(self.schema),
         create_disposition=self.create_disposition,
         write_disposition=self.write_disposition,
+        kms_key=self.kms_key,
         test_client=self.test_client)
     return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
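Similarly, a minimal sketch of the streaming write path (WriteToBigQuery
forwards kms_key to BigQueryWriteFn as shown above; table and key names are
placeholders):

    import apache_beam as beam

    with beam.Pipeline() as p:
      (p
       | beam.Create([{'month': 1}, {'month': 2}])
       | beam.io.WriteToBigQuery(
           'my-project:my_dataset.months',
           schema='month:INTEGER',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           # Experimental: applied when the write has to create the table.
           kms_key=('projects/my-project/locations/global/'
                    'keyRings/my-ring/cryptoKeys/my-key')))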
""" self.table_reference = bigquery_tools.parse_table_reference( @@ -694,6 +706,7 @@ bigquery_v2_messages.TableSchema` write_disposition) self.schema = schema self.batch_size = batch_size + self.kms_key = kms_key self.test_client = test_client @staticmethod @@ -786,6 +799,7 @@ bigquery_v2_messages.TableSchema): schema=self.get_dict_table_schema(self.schema), create_disposition=self.create_disposition, write_disposition=self.write_disposition, + kms_key=self.kms_key, test_client=self.test_client) return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index e011d74..725a684 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -292,6 +292,7 @@ class WriteToBigQuery(unittest.TestCase): schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, + kms_key=None, test_client=client) fn.start_bundle() @@ -299,11 +300,12 @@ class WriteToBigQuery(unittest.TestCase): def test_dofn_client_start_bundle_create_called(self): client = mock.Mock() - client.tables.Get.return_value = None + client.tables.Get.side_effect = HttpError( + response={'status': 404}, content=None, url=None) client.tables.Insert.return_value = bigquery.Table( tableReference=bigquery.TableReference( projectId='project_id', datasetId='dataset_id', tableId='table_id')) - create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER + create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND schema = {'fields': [ {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} @@ -316,6 +318,7 @@ class WriteToBigQuery(unittest.TestCase): schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, + kms_key='kms_key', test_client=client) fn.start_bundle() @@ -342,6 +345,7 @@ class WriteToBigQuery(unittest.TestCase): schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, + kms_key=None, test_client=client) fn.start_bundle() @@ -371,6 +375,7 @@ class WriteToBigQuery(unittest.TestCase): schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, + kms_key=None, test_client=client) fn.start_bundle() @@ -400,6 +405,7 @@ class WriteToBigQuery(unittest.TestCase): schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, + kms_key=None, test_client=client) fn.start_bundle() @@ -433,6 +439,7 @@ class WriteToBigQuery(unittest.TestCase): schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, + kms_key=None, test_client=client) fn.start_bundle() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index a145ed3..93d5299 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -328,11 +328,21 @@ class BigQueryWrapper(object): @retry.with_exponential_backoff( num_retries=MAX_RETRIES, retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def _get_table(self, project_id, dataset_id, table_id): + def get_table(self, project_id, dataset_id, table_id): + """Lookup a table's metadata object. + + Args: + client: bigquery.BigqueryV2 instance + project_id, dataset_id, table_id: table lookup parameters + + Returns: + bigquery.Table instance + Raises: + HttpError if lookup failed. 
+ """ request = bigquery.BigqueryTablesGetRequest( projectId=project_id, datasetId=dataset_id, tableId=table_id) response = self.client.tables.Get(request) - # The response is a bigquery.Table instance. return response def _create_table(self, project_id, dataset_id, table_id, schema): @@ -419,7 +429,7 @@ class BigQueryWrapper(object): num_retries=MAX_RETRIES, retry_filter=retry.retry_on_server_errors_and_timeout_filter) def get_table_location(self, project_id, dataset_id, table_id): - table = self._get_table(project_id, dataset_id, table_id) + table = self.get_table(project_id, dataset_id, table_id) return table.location @retry.with_exponential_backoff( @@ -495,7 +505,7 @@ class BigQueryWrapper(object): found_table = None try: - found_table = self._get_table(project_id, dataset_id, table_id) + found_table = self.get_table(project_id, dataset_id, table_id) except HttpError as exn: if exn.status_code == 404: if create_disposition == BigQueryDisposition.CREATE_NEVER: @@ -696,7 +706,7 @@ class BigQueryReader(dataflow_io.NativeSourceReader): """A reader for a BigQuery source.""" def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True, - flatten_results=True): + flatten_results=True, kms_key=None): self.source = source self.test_bigquery_client = test_bigquery_client if auth.is_running_in_gce: @@ -720,6 +730,7 @@ class BigQueryReader(dataflow_io.NativeSourceReader): self.schema = None self.use_legacy_sql = use_legacy_sql self.flatten_results = flatten_results + self.kms_key = kms_key if self.source.table_reference is not None: # If table schema did not define a project we default to executing diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 1d24d10..a1c6b91 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -603,7 +603,8 @@ class DataflowRunner(PipelineRunner): transform.table_reference.projectId, transform.schema, transform.create_disposition, - transform.write_disposition)) + transform.write_disposition, + kms_key=transform.kms_key)) def apply_GroupByKey(self, transform, pcoll, options): # Infer coder of parent. 
@@ -905,6 +906,9 @@ class DataflowRunner(PipelineRunner):
       else:
         raise ValueError('BigQuery source %r must specify either a table or'
                          ' a query' % transform.source)
+      if transform.source.kms_key is not None:
+        step.add_property(
+            PropertyNames.BIGQUERY_KMS_KEY, transform.source.kms_key)
     elif transform.source.format == 'pubsub':
       if not standard_options.streaming:
         raise ValueError('Cloud Pub/Sub is currently available for use '
@@ -1003,6 +1007,9 @@ class DataflowRunner(PipelineRunner):
     if transform.sink.table_schema is not None:
       step.add_property(
           PropertyNames.BIGQUERY_SCHEMA, transform.sink.schema_as_json())
+    if transform.sink.kms_key is not None:
+      step.add_property(
+          PropertyNames.BIGQUERY_KMS_KEY, transform.sink.kms_key)
     elif transform.sink.format == 'pubsub':
       standard_options = options.view_as(StandardOptions)
       if not standard_options.streaming:
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 17897cc..5670d6f 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -78,13 +78,14 @@ class PropertyNames(object):
   Property strings as they are expected in the CloudWorkflow protos."""
   BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
   BIGQUERY_DATASET = 'dataset'
-  BIGQUERY_QUERY = 'bigquery_query'
-  BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
-  BIGQUERY_FLATTEN_RESULTS = 'bigquery_flatten_results'
   BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format'
-  BIGQUERY_TABLE = 'table'
+  BIGQUERY_FLATTEN_RESULTS = 'bigquery_flatten_results'
+  BIGQUERY_KMS_KEY = 'bigquery_kms_key'
   BIGQUERY_PROJECT = 'project'
+  BIGQUERY_QUERY = 'bigquery_query'
   BIGQUERY_SCHEMA = 'schema'
+  BIGQUERY_TABLE = 'table'
+  BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
   BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
   DISPLAY_DATA = 'display_data'
   ELEMENT = 'element'