This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 48fa73e  [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
     new 1aa3eb5  Merge pull request #12084 from [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
48fa73e is described below

commit 48fa73e51b683148f102187c3a67278118cbb8ac
Author: Alex Amato <ajam...@google.com>
AuthorDate: Wed Jun 24 17:51:30 2020 -0700

    [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
---
 sdks/python/apache_beam/internal/http_client.py    |  5 +-
 sdks/python/apache_beam/io/gcp/bigquery.py         | 22 ++++--
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 29 ++++--
 .../apache_beam/io/gcp/bigquery_io_metadata.py     | 83 ++++++++++++++++++++++
 .../io/gcp/bigquery_io_metadata_test.py            | 75 +++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 26 +++++--
 .../python/apache_beam/io/gcp/gce_metadata_util.py | 48 +++++++++++++
 7 files changed, 272 insertions(+), 16 deletions(-)

diff --git a/sdks/python/apache_beam/internal/http_client.py b/sdks/python/apache_beam/internal/http_client.py
index 794085d..57d2feb 100644
--- a/sdks/python/apache_beam/internal/http_client.py
+++ b/sdks/python/apache_beam/internal/http_client.py
@@ -57,7 +57,7 @@ def proxy_info_from_environment_var(proxy_env_var):
   return httplib2.proxy_info_from_url(proxy_url, method=proxy_protocol)
 
 
-def get_new_http():
+def get_new_http(timeout_secs=DEFAULT_HTTP_TIMEOUT_SECONDS):
   """Creates and returns a new httplib2.Http instance.
 
   Returns:
@@ -69,5 +69,4 @@ def get_new_http():
       proxy_info = proxy_info_from_environment_var(proxy_env_var)
       break
   # Use a non-infinite SSL timeout to avoid hangs during network flakiness.
-  return httplib2.Http(
-      proxy_info=proxy_info, timeout=DEFAULT_HTTP_TIMEOUT_SECONDS)
+  return httplib2.Http(proxy_info=proxy_info, timeout=timeout_secs)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index ccaca4d..c6b6191 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -254,6 +254,7 @@ from apache_beam.io.avroio import _create_avro_source as create_avro_source
 from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.iobase import BoundedSource
 from apache_beam.io.iobase import RangeTracker
@@ -637,6 +638,7 @@ class _CustomBigQuerySource(BoundedSource):
     self.kms_key = kms_key
     self.split_result = None
     self.options = pipeline_options
+    self.bq_io_metadata = None  # Populate in setup, as it may make an RPC
     self.bigquery_job_labels = bigquery_job_labels or {}
     self.use_json_exports = use_json_exports
 
@@ -649,9 +651,13 @@ class _CustomBigQuerySource(BoundedSource):
         'use_legacy_sql': self.use_legacy_sql,
         'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
         'export_file_format': export_format,
+        'launchesBigQueryJobs': DisplayDataItem(
+            True, label="This Dataflow job launches bigquery jobs."),
     }
 
   def estimate_size(self):
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     bq = bigquery_tools.BigQueryWrapper()
     if self.table_reference is not None:
       table_ref = self.table_reference
@@ -676,7 +682,8 @@ class _CustomBigQuerySource(BoundedSource):
           job_id=uuid.uuid4().hex,
           dry_run=True,
          kms_key=self.kms_key,
-          job_labels=self.bigquery_job_labels)
+          job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+              self.bigquery_job_labels))
       size = int(job.statistics.totalBytesProcessed)
       return size
     else:
@@ -747,6 +754,8 @@ class _CustomBigQuerySource(BoundedSource):
 
   @check_accessible(['query'])
   def _execute_query(self, bq):
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     job = bq._start_query_job(
         self._get_project(),
         self.query.get(),
@@ -754,7 +763,8 @@ class _CustomBigQuerySource(BoundedSource):
         self.flatten_results,
         job_id=uuid.uuid4().hex,
         kms_key=self.kms_key,
-        job_labels=self.bigquery_job_labels)
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+            self.bigquery_job_labels))
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref, max_retries=0)
     return bq._get_temp_table(self._get_project())
@@ -766,13 +776,17 @@ class _CustomBigQuerySource(BoundedSource):
       bigquery.TableSchema instance, a list of FileMetadata instances
     """
     job_id = uuid.uuid4().hex
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
+    job_labels = self.bq_io_metadata.add_additional_bq_job_labels(
+        self.bigquery_job_labels)
     if self.use_json_exports:
       job_ref = bq.perform_extract_job([self.gcs_location],
                                        job_id,
                                        self.table_reference,
                                        bigquery_tools.FileFormat.JSON,
                                        project=self._get_project(),
-                                       job_labels=self.bigquery_job_labels,
+                                       job_labels=job_labels,
                                        include_header=False)
     else:
       job_ref = bq.perform_extract_job([self.gcs_location],
@@ -781,7 +795,7 @@ class _CustomBigQuerySource(BoundedSource):
                                        bigquery_tools.FileFormat.AVRO,
                                        project=self._get_project(),
                                        include_header=False,
-                                       job_labels=self.bigquery_job_labels,
+                                       job_labels=job_labels,
                                        use_avro_logical_types=True)
     bq.wait_for_bq_job(job_ref)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index d039598..7543dca 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -42,9 +42,11 @@ import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam.io import filesystems as fs
 from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms import trigger
+from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.window import GlobalWindows
 
 _LOGGER = logging.getLogger(__name__)
@@ -327,10 +329,19 @@ class TriggerCopyJobs(beam.DoFn):
     self.write_disposition = write_disposition
     self.test_client = test_client
     self._observed_tables = set()
+    self.bq_io_metadata = None
+
+  def display_data(self):
+    return {
+        'launchesBigQueryJobs': DisplayDataItem(
+            True, label="This Dataflow job launches bigquery jobs.")
+    }
 
   def start_bundle(self):
     self._observed_tables = set()
     self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
 
   def process(self, element, job_name_prefix=None):
     destination = element[0]
@@ -380,13 +391,16 @@ class TriggerCopyJobs(beam.DoFn):
         wait_for_job = False
         write_disposition = 'WRITE_APPEND'
 
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     job_reference = self.bq_wrapper._insert_copy_job(
         copy_to_reference.projectId,
         copy_job_name,
         copy_from_reference,
         copy_to_reference,
         create_disposition=self.create_disposition,
-        write_disposition=write_disposition)
+        write_disposition=write_disposition,
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
 
     if wait_for_job:
       self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
@@ -416,6 +430,7 @@ class TriggerLoadJobs(beam.DoFn):
     self.temporary_tables = temporary_tables
     self.additional_bq_parameters = additional_bq_parameters or {}
     self.source_format = source_format
+    self.bq_io_metadata = None
     if self.temporary_tables:
       # If we are loading into temporary tables, we rely on the default create
       # and write dispositions, which mean that a new table will be created.
@@ -428,14 +443,17 @@ class TriggerLoadJobs(beam.DoFn):
   def display_data(self):
     result = {
         'create_disposition': str(self.create_disposition),
-        'write_disposition': str(self.write_disposition)
+        'write_disposition': str(self.write_disposition),
     }
     result['schema'] = str(self.schema)
-
+    result['launchesBigQueryJobs'] = DisplayDataItem(
+        True, label="This Dataflow job launches bigquery jobs.")
     return result
 
   def start_bundle(self):
     self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
 
   def process(self, element, load_job_name_prefix, *schema_side_inputs):
     # Each load job is assumed to have files respecting these constraints:
@@ -493,6 +511,8 @@ class TriggerLoadJobs(beam.DoFn):
         table_reference,
         schema,
         additional_parameters)
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     job_reference = self.bq_wrapper.perform_load_job(
         table_reference,
         files,
@@ -501,7 +521,8 @@ class TriggerLoadJobs(beam.DoFn):
         write_disposition=self.write_disposition,
         create_disposition=create_disposition,
         additional_load_parameters=additional_parameters,
-        source_format=self.source_format)
+        source_format=self.source_format,
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
 
     yield (destination, job_reference)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
new file mode 100644
index 0000000..c8c1fc2
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import re
+
+from apache_beam.io.gcp import gce_metadata_util
+
+_VALID_CLOUD_LABEL_PATTERN = re.compile(r'^[a-z0-9\_\-]{1,63}$')
+
+
+def _is_valid_cloud_label_value(label_value):
+  """Returns true if label_value is a valid cloud label string.
+
+  This function can return false in cases where the label value is valid.
+  However, it will not return true in a case where the label value is
+  invalid, because this validator uses a stricter set of allowed characters
+  that does not accept foreign language characters.  Thus, it should not be
+  used as a generic validator for all cloud labels.
+
+  See Also:
+    https://cloud.google.com/compute/docs/labeling-resources
+
+  Args:
+    label_value: The label value to validate.
+
+  Returns:
+    True if the label value is a valid cloud label value.
+  """
+  return _VALID_CLOUD_LABEL_PATTERN.match(label_value)
+
+
+def create_bigquery_io_metadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.fetch_dataflow_job_id()
+  # If a Dataflow job ID is returned by the GCE metadata server, then this
+  # program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    # Only use this label if it has already been validated, as we do not
+    # want a bad label to fail the BQ job.
+    if _is_valid_cloud_label_value(dataflow_job_id):
+      kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO, i.e. to use as BQ job labels.
+
+  Do not construct directly; use the create_bigquery_io_metadata factory,
+  which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = job_labels or {}
+    if self.beam_job_id and 'beam_job_id' not in job_labels:
+      job_labels['beam_job_id'] = self.beam_job_id
+    return job_labels
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py
new file mode 100644
index 0000000..26ee669
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for bigquery_io_metadata."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+from apache_beam.io.gcp import bigquery_io_metadata
+
+
+class BigqueryIoMetadataTest(unittest.TestCase):
+  def test_is_valid_cloud_label_value(self):
+    # A Dataflow job ID.
+    # Lowercase letters, numbers, underscores and hyphens are allowed.
+    test_str = '2020-06-29_15_26_09-12838749047888422749'
+    self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # At least one character.
+    test_str = '0'
+    self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # Up to 63 characters.
+    test_str = '0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij012'
+    self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # Lowercase letters are allowed.
+    test_str = 'abcdefghijklmnopqrstuvwxyz'
+    for test_char in test_str:
+      self.assertTrue(
+          bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+    # Empty strings are not allowed.
+    test_str = ''
+    self.assertFalse(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # 64 or more characters are not allowed.
+    test_str = (
+        '0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij0123')
+    self.assertFalse(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # Uppercase letters are not allowed.
+    test_str = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+    for test_char in test_str:
+      self.assertFalse(
+          bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+    # Special characters besides hyphens and underscores are not allowed.
+    test_str = '!@#$%^&*()+=[{]};:\'\"\\|,<.>?/`~'
+    for test_char in test_str:
+      self.assertFalse(
+          bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 12c5dc0..504f530 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -51,6 +51,7 @@ from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.gcp import bigquery_avro_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options import value_provider
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -697,7 +698,8 @@ class BigQueryWrapper(object):
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
-      source_format=None):
+      source_format=None,
+      job_labels=None):
     """Starts a job to load data into BigQuery.
 
     Returns:
@@ -712,7 +714,8 @@ class BigQueryWrapper(object):
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         additional_load_parameters=additional_load_parameters,
-        source_format=source_format)
+        source_format=source_format,
+        job_labels=job_labels)
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
@@ -865,14 +868,21 @@ class BigQueryWrapper(object):
     return created_table
 
   def run_query(
-      self, project_id, query, use_legacy_sql, flatten_results, dry_run=False):
+      self,
+      project_id,
+      query,
+      use_legacy_sql,
+      flatten_results,
+      dry_run=False,
+      job_labels=None):
     job = self._start_query_job(
         project_id,
         query,
         use_legacy_sql,
         flatten_results,
         job_id=uuid.uuid4().hex,
-        dry_run=dry_run)
+        dry_run=dry_run,
+        job_labels=job_labels)
 
     job_id = job.jobReference.jobId
     location = job.jobReference.location
@@ -1064,6 +1074,8 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
     self.use_legacy_sql = use_legacy_sql
     self.flatten_results = flatten_results
     self.kms_key = kms_key
+    self.bigquery_job_labels = {}
+    self.bq_io_metadata = None
 
     if self.source.table_reference is not None:
       # If table schema did not define a project we default to executing
@@ -1118,10 +1130,14 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
       self.client.clean_up_temporary_dataset(self.executing_project)
 
   def __iter__(self):
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     for rows, schema in self.client.run_query(
         project_id=self.executing_project,
         query=self.query,
         use_legacy_sql=self.use_legacy_sql,
-        flatten_results=self.flatten_results):
+        flatten_results=self.flatten_results,
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+            self.bigquery_job_labels)):
       if self.schema is None:
         self.schema = schema
       for row in rows:
diff --git a/sdks/python/apache_beam/io/gcp/gce_metadata_util.py b/sdks/python/apache_beam/io/gcp/gce_metadata_util.py
new file mode 100644
index 0000000..40f1745
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gce_metadata_util.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import requests
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def _fetch_metadata(key):
+  try:
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key
+    resp = requests.get(uri, headers=headers, timeout=5)  # 5 seconds.
+    if resp.status_code == 200:
+      return resp.text
+  except requests.exceptions.RequestException:
+    # Silently fail; this may mean it's running on a non-Dataflow runner,
+    # in which case it's perfectly normal.
+    pass
+  return ""
+
+
+def _fetch_custom_gce_metadata(customMetadataKey):
+  return _fetch_metadata("instance/attributes/" + customMetadataKey)
+
+
+def fetch_dataflow_job_id():
+  return _fetch_custom_gce_metadata("job_id")
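
For readers following the change, the end-to-end flow is small enough to condense
into one self-contained sketch. This is illustrative only, not the shipped Beam
code: it inlines the new gce_metadata_util and bigquery_io_metadata logic, and the
standalone helper add_additional_bq_job_labels is a hypothetical flattening of the
committed BigQueryIOMetadata.add_additional_bq_job_labels method.

    # Minimal sketch of the label-tagging flow introduced by this commit.
    # Names mirror the committed modules, but this is a condensation for
    # illustration, not the Beam source.
    import re

    import requests

    _VALID_CLOUD_LABEL_PATTERN = re.compile(r'^[a-z0-9_\-]{1,63}$')
    _BASE_METADATA_URL = 'http://metadata/computeMetadata/v1/'


    def fetch_dataflow_job_id():
      """Returns the Dataflow job ID from GCE metadata, or '' elsewhere."""
      try:
        resp = requests.get(
            _BASE_METADATA_URL + 'instance/attributes/job_id',
            headers={'Metadata-Flavor': 'Google'},
            timeout=5)
        if resp.status_code == 200:
          return resp.text
      except requests.exceptions.RequestException:
        pass  # Not on a GCE VM; normal for non-Dataflow runners.
      return ''


    def add_additional_bq_job_labels(job_labels=None):
      """Merges beam_job_id into job_labels without overwriting it."""
      job_labels = dict(job_labels or {})
      job_id = fetch_dataflow_job_id()
      # Only attach the label if it passes the strict validator, so a bad
      # value cannot fail the BQ job.
      if (job_id and _VALID_CLOUD_LABEL_PATTERN.match(job_id) and
          'beam_job_id' not in job_labels):
        job_labels['beam_job_id'] = job_id
      return job_labels


    if __name__ == '__main__':
      # Off Dataflow this prints the user labels unchanged; on a Dataflow VM
      # it also carries beam_job_id.
      print(add_additional_bq_job_labels({'team': 'data-eng'}))

Run outside Dataflow, the metadata request fails fast and user labels pass through
untouched; on a Dataflow VM the beam_job_id label rides along on every load, copy,
query, and extract job, which is what makes BigQuery jobs attributable to the
Dataflow job that launched them.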