This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 48fa73e  [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
     new 1aa3eb5  Merge pull request #12084 from [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
48fa73e is described below

commit 48fa73e51b683148f102187c3a67278118cbb8ac
Author: Alex Amato <ajam...@google.com>
AuthorDate: Wed Jun 24 17:51:30 2020 -0700

    [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
---
 sdks/python/apache_beam/internal/http_client.py    |  5 +-
 sdks/python/apache_beam/io/gcp/bigquery.py         | 22 ++++--
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 29 ++++++--
 .../apache_beam/io/gcp/bigquery_io_metadata.py     | 83 ++++++++++++++++++++++
 .../io/gcp/bigquery_io_metadata_test.py            | 75 +++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 26 +++++--
 .../python/apache_beam/io/gcp/gce_metadata_util.py | 48 +++++++++++++
 7 files changed, 272 insertions(+), 16 deletions(-)
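
The net effect of the change: BigQuery jobs launched by Beam's Python BigQueryIO
(export, query, load, and copy jobs) are tagged with a "beam_job_id" label when
the pipeline runs on Dataflow, so they can be traced back to the Dataflow job
that created them. A minimal sketch of the label merging, using a hypothetical
job ID (user-supplied labels are kept and never overwritten):

# Standalone sketch of the merge performed by
# BigQueryIOMetadata.add_additional_bq_job_labels (hypothetical values).
user_labels = {'team': 'data-platform'}          # labels the user passed in
beam_job_id = '2020-06-24_17_51_30-1234567890'   # hypothetical Dataflow job ID

job_labels = dict(user_labels)
if beam_job_id and 'beam_job_id' not in job_labels:
  job_labels['beam_job_id'] = beam_job_id

print(job_labels)
# {'team': 'data-platform', 'beam_job_id': '2020-06-24_17_51_30-1234567890'}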

diff --git a/sdks/python/apache_beam/internal/http_client.py b/sdks/python/apache_beam/internal/http_client.py
index 794085d..57d2feb 100644
--- a/sdks/python/apache_beam/internal/http_client.py
+++ b/sdks/python/apache_beam/internal/http_client.py
@@ -57,7 +57,7 @@ def proxy_info_from_environment_var(proxy_env_var):
   return httplib2.proxy_info_from_url(proxy_url, method=proxy_protocol)
 
 
-def get_new_http():
+def get_new_http(timeout_secs=DEFAULT_HTTP_TIMEOUT_SECONDS):
   """Creates and returns a new httplib2.Http instance.
 
   Returns:
@@ -69,5 +69,4 @@ def get_new_http():
       proxy_info = proxy_info_from_environment_var(proxy_env_var)
       break
   # Use a non-infinite SSL timeout to avoid hangs during network flakiness.
-  return httplib2.Http(
-      proxy_info=proxy_info, timeout=DEFAULT_HTTP_TIMEOUT_SECONDS)
+  return httplib2.Http(proxy_info=proxy_info, timeout=timeout_secs)
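
A minimal usage sketch of the new optional parameter, assuming a Beam build
that includes this change (existing callers are unaffected because the default
remains DEFAULT_HTTP_TIMEOUT_SECONDS):

from apache_beam.internal.http_client import get_new_http

# Callers that probe flaky endpoints (e.g. the GCE metadata server) can now
# request a shorter, non-infinite timeout instead of the module default.
http = get_new_http(timeout_secs=10)
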
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index ccaca4d..c6b6191 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -254,6 +254,7 @@ from apache_beam.io.avroio import _create_avro_source as create_avro_source
 from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.io.iobase import BoundedSource
 from apache_beam.io.iobase import RangeTracker
@@ -637,6 +638,7 @@ class _CustomBigQuerySource(BoundedSource):
     self.kms_key = kms_key
     self.split_result = None
     self.options = pipeline_options
+    self.bq_io_metadata = None  # Populate in setup, as it may make an RPC
     self.bigquery_job_labels = bigquery_job_labels or {}
     self.use_json_exports = use_json_exports
 
@@ -649,9 +651,13 @@ class _CustomBigQuerySource(BoundedSource):
         'use_legacy_sql': self.use_legacy_sql,
         'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
         'export_file_format': export_format,
+        'launchesBigQueryJobs': DisplayDataItem(
+            True, label="This Dataflow job launches bigquery jobs."),
     }
 
   def estimate_size(self):
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     bq = bigquery_tools.BigQueryWrapper()
     if self.table_reference is not None:
       table_ref = self.table_reference
@@ -676,7 +682,8 @@ class _CustomBigQuerySource(BoundedSource):
           job_id=uuid.uuid4().hex,
           dry_run=True,
           kms_key=self.kms_key,
-          job_labels=self.bigquery_job_labels)
+          job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+              self.bigquery_job_labels))
       size = int(job.statistics.totalBytesProcessed)
       return size
     else:
@@ -747,6 +754,8 @@ class _CustomBigQuerySource(BoundedSource):
 
   @check_accessible(['query'])
   def _execute_query(self, bq):
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     job = bq._start_query_job(
         self._get_project(),
         self.query.get(),
@@ -754,7 +763,8 @@ class _CustomBigQuerySource(BoundedSource):
         self.flatten_results,
         job_id=uuid.uuid4().hex,
         kms_key=self.kms_key,
-        job_labels=self.bigquery_job_labels)
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+            self.bigquery_job_labels))
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref, max_retries=0)
     return bq._get_temp_table(self._get_project())
@@ -766,13 +776,17 @@ class _CustomBigQuerySource(BoundedSource):
       bigquery.TableSchema instance, a list of FileMetadata instances
     """
     job_id = uuid.uuid4().hex
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
+    job_labels = self.bq_io_metadata.add_additional_bq_job_labels(
+        self.bigquery_job_labels)
     if self.use_json_exports:
       job_ref = bq.perform_extract_job([self.gcs_location],
                                        job_id,
                                        self.table_reference,
                                        bigquery_tools.FileFormat.JSON,
                                        project=self._get_project(),
-                                       job_labels=self.bigquery_job_labels,
+                                       job_labels=job_labels,
                                        include_header=False)
     else:
       job_ref = bq.perform_extract_job([self.gcs_location],
@@ -781,7 +795,7 @@ class _CustomBigQuerySource(BoundedSource):
                                        bigquery_tools.FileFormat.AVRO,
                                        project=self._get_project(),
                                        include_header=False,
-                                       job_labels=self.bigquery_job_labels,
+                                       job_labels=job_labels,
                                        use_avro_logical_types=True)
     bq.wait_for_bq_job(job_ref)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
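
The source defers creating its BigQueryIOMetadata until the first job is
launched, because building it may call the GCE metadata server. A simplified
sketch of that pattern, assuming a Beam build that includes this change (class
and label names here are illustrative):

from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata

class _LabeledSource(object):
  def __init__(self, user_labels=None):
    self.bq_io_metadata = None               # populated lazily, not in __init__
    self.bigquery_job_labels = user_labels or {}

  def _job_labels(self):
    # Create the metadata object on first use; off Dataflow this simply
    # returns the user-supplied labels unchanged.
    if not self.bq_io_metadata:
      self.bq_io_metadata = create_bigquery_io_metadata()
    return self.bq_io_metadata.add_additional_bq_job_labels(
        self.bigquery_job_labels)
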
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index d039598..7543dca 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -42,9 +42,11 @@ import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam.io import filesystems as fs
 from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms import trigger
+from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.window import GlobalWindows
 
 _LOGGER = logging.getLogger(__name__)
@@ -327,10 +329,19 @@ class TriggerCopyJobs(beam.DoFn):
     self.write_disposition = write_disposition
     self.test_client = test_client
     self._observed_tables = set()
+    self.bq_io_metadata = None
+
+  def display_data(self):
+    return {
+        'launchesBigQueryJobs': DisplayDataItem(
+            True, label="This Dataflow job launches bigquery jobs.")
+    }
 
   def start_bundle(self):
     self._observed_tables = set()
     self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
 
   def process(self, element, job_name_prefix=None):
     destination = element[0]
@@ -380,13 +391,16 @@ class TriggerCopyJobs(beam.DoFn):
       wait_for_job = False
       write_disposition = 'WRITE_APPEND'
 
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     job_reference = self.bq_wrapper._insert_copy_job(
         copy_to_reference.projectId,
         copy_job_name,
         copy_from_reference,
         copy_to_reference,
         create_disposition=self.create_disposition,
-        write_disposition=write_disposition)
+        write_disposition=write_disposition,
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
 
     if wait_for_job:
       self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
@@ -416,6 +430,7 @@ class TriggerLoadJobs(beam.DoFn):
     self.temporary_tables = temporary_tables
     self.additional_bq_parameters = additional_bq_parameters or {}
     self.source_format = source_format
+    self.bq_io_metadata = None
     if self.temporary_tables:
       # If we are loading into temporary tables, we rely on the default create
       # and write dispositions, which mean that a new table will be created.
@@ -428,14 +443,17 @@ class TriggerLoadJobs(beam.DoFn):
   def display_data(self):
     result = {
         'create_disposition': str(self.create_disposition),
-        'write_disposition': str(self.write_disposition)
+        'write_disposition': str(self.write_disposition),
     }
     result['schema'] = str(self.schema)
-
+    result['launchesBigQueryJobs'] = DisplayDataItem(
+        True, label="This Dataflow job launches bigquery jobs.")
     return result
 
   def start_bundle(self):
     self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
 
   def process(self, element, load_job_name_prefix, *schema_side_inputs):
     # Each load job is assumed to have files respecting these constraints:
@@ -493,6 +511,8 @@ class TriggerLoadJobs(beam.DoFn):
         table_reference,
         schema,
         additional_parameters)
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     job_reference = self.bq_wrapper.perform_load_job(
         table_reference,
         files,
@@ -501,7 +521,8 @@ class TriggerLoadJobs(beam.DoFn):
         write_disposition=self.write_disposition,
         create_disposition=create_disposition,
         additional_load_parameters=additional_parameters,
-        source_format=self.source_format)
+        source_format=self.source_format,
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
     yield (destination, job_reference)
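
The same idea applies to the load/copy DoFns: metadata is created in
start_bundle (and defensively again in process), and the step advertises via
display data that it launches BigQuery jobs. A simplified DoFn sketch, assuming
a Beam build that includes this change:

import apache_beam as beam
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.transforms.display import DisplayDataItem

class _LaunchesBQJob(beam.DoFn):
  def __init__(self):
    self.bq_io_metadata = None

  def display_data(self):
    # Surfaced in the Dataflow UI so users can see this step launches BQ jobs.
    return {
        'launchesBigQueryJobs': DisplayDataItem(
            True, label="This Dataflow job launches bigquery jobs.")
    }

  def start_bundle(self):
    if not self.bq_io_metadata:
      self.bq_io_metadata = create_bigquery_io_metadata()

  def process(self, element):
    if not self.bq_io_metadata:
      self.bq_io_metadata = create_bigquery_io_metadata()
    labels = self.bq_io_metadata.add_additional_bq_job_labels()
    # A real implementation would pass `labels` to the BigQuery job it starts.
    yield element, labels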
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
new file mode 100644
index 0000000..c8c1fc2
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import re
+
+from apache_beam.io.gcp import gce_metadata_util
+
+_VALID_CLOUD_LABEL_PATTERN = re.compile(r'^[a-z0-9\_\-]{1,63}$')
+
+
+def _is_valid_cloud_label_value(label_value):
+  """Returns true if label_value is a valid cloud label string.
+
+    This function can return false in cases where the label value is valid.
+    However, it will not return true in a case where the label value is
+    invalid. This is because this validator uses a stricter set of allowed
+    characters that does not accept foreign-language characters. Thus, this
+    should not be used as a generic validator for all cloud labels.
+
+    See Also:
+      https://cloud.google.com/compute/docs/labeling-resources
+
+    Args:
+      label_value: The label value to validate.
+
+    Returns:
+      True if the label value is a valid cloud label value.
+  """
+  return _VALID_CLOUD_LABEL_PATTERN.match(label_value)
+
+
+def create_bigquery_io_metadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.fetch_dataflow_job_id()
+  # If a dataflow_job_id is returned from GCE metadata, then this
+  # program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    # Only use this label if it passes validation, as we do not
+    # want a bad label to fail the BQ job.
+    if _is_valid_cloud_label_value(dataflow_job_id):
+      kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO. i.e. to use as BQ job labels.
+
+  Do not construct directly; use the create_bigquery_io_metadata factory,
+  which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = job_labels or {}
+    if self.beam_job_id and 'beam_job_id' not in job_labels:
+      job_labels['beam_job_id'] = self.beam_job_id
+    return job_labels
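
Usage sketch for the new module, assuming a Beam build that includes it: off
Dataflow the metadata lookup returns nothing and user labels pass through
untouched, while on a Dataflow worker VM a 'beam_job_id' label is added as well.

from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata

io_metadata = create_bigquery_io_metadata()
labels = io_metadata.add_additional_bq_job_labels({'team': 'data-platform'})
print(labels)  # locally: {'team': 'data-platform'}; on Dataflow: plus 'beam_job_id'
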
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py
new file mode 100644
index 0000000..26ee669
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for bigquery_io_metadata."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+from apache_beam.io.gcp import bigquery_io_metadata
+
+
+class BigqueryIoMetadataTest(unittest.TestCase):
+  def test_is_valid_cloud_label_value(self):
+    # A dataflow job ID.
+    # Lowercase letters, numbers, underscores and hyphens are allowed.
+    test_str = '2020-06-29_15_26_09-12838749047888422749'
+    self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # At least one character.
+    test_str = '0'
+    self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # Up to 63 characters.
+    test_str = '0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij012'
+    self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # Lowercase letters allowed
+    test_str = 'abcdefghijklmnopqrstuvwxyz'
+    for test_char in test_str:
+      self.assertTrue(
+          bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+    # Empty strings not allowed.
+    test_str = ''
+    self.assertFalse(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # 64 or more characters not allowed.
+    test_str = (
+        '0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij0123')
+    self.assertFalse(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+    # Uppercase letters not allowed
+    test_str = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+    for test_char in test_str:
+      self.assertFalse(
+          bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+    # Special characters besides hyphens are not allowed
+    test_str = '!@#$%^&*()+=[{]};:\'\"\\|,<.>?/`~'
+    for test_char in test_str:
+      self.assertFalse(
+          bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 12c5dc0..504f530 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -51,6 +51,7 @@ from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.gcp import bigquery_avro_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options import value_provider
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -697,7 +698,8 @@ class BigQueryWrapper(object):
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
-      source_format=None):
+      source_format=None,
+      job_labels=None):
     """Starts a job to load data into BigQuery.
 
     Returns:
@@ -712,7 +714,8 @@ class BigQueryWrapper(object):
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         additional_load_parameters=additional_load_parameters,
-        source_format=source_format)
+        source_format=source_format,
+        job_labels=job_labels)
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
@@ -865,14 +868,21 @@ class BigQueryWrapper(object):
         return created_table
 
   def run_query(
-      self, project_id, query, use_legacy_sql, flatten_results, dry_run=False):
+      self,
+      project_id,
+      query,
+      use_legacy_sql,
+      flatten_results,
+      dry_run=False,
+      job_labels=None):
     job = self._start_query_job(
         project_id,
         query,
         use_legacy_sql,
         flatten_results,
         job_id=uuid.uuid4().hex,
-        dry_run=dry_run)
+        dry_run=dry_run,
+        job_labels=job_labels)
     job_id = job.jobReference.jobId
     location = job.jobReference.location
 
@@ -1064,6 +1074,8 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
     self.use_legacy_sql = use_legacy_sql
     self.flatten_results = flatten_results
     self.kms_key = kms_key
+    self.bigquery_job_labels = {}
+    self.bq_io_metadata = None
 
     if self.source.table_reference is not None:
       # If table schema did not define a project we default to executing
@@ -1118,10 +1130,14 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
     self.client.clean_up_temporary_dataset(self.executing_project)
 
   def __iter__(self):
+    if not self.bq_io_metadata:
+      self.bq_io_metadata = create_bigquery_io_metadata()
     for rows, schema in self.client.run_query(
         project_id=self.executing_project, query=self.query,
         use_legacy_sql=self.use_legacy_sql,
-        flatten_results=self.flatten_results):
+        flatten_results=self.flatten_results,
+        job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+            self.bigquery_job_labels)):
       if self.schema is None:
         self.schema = schema
       for row in rows:
diff --git a/sdks/python/apache_beam/io/gcp/gce_metadata_util.py b/sdks/python/apache_beam/io/gcp/gce_metadata_util.py
new file mode 100644
index 0000000..40f1745
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gce_metadata_util.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import requests
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def _fetch_metadata(key):
+  try:
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key
+    resp = requests.get(uri, headers=headers, timeout=5)  # 5 seconds.
+    if resp.status_code == 200:
+      return resp.text
+  except requests.exceptions.RequestException:
+    # Silently fail; this may mean we are not running on a Dataflow
+    # worker VM, in which case it is perfectly normal.
+    pass
+  return ""
+
+
+def _fetch_custom_gce_metadata(customMetadataKey):
+  return _fetch_metadata("instance/attributes/" + customMetadataKey)
+
+
+def fetch_dataflow_job_id():
+  return _fetch_custom_gce_metadata("job_id")
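
Usage sketch for gce_metadata_util, assuming a Beam build that includes this
module: off GCE the request fails silently and an empty string is returned, so
callers can use truthiness to detect a Dataflow worker VM.

from apache_beam.io.gcp import gce_metadata_util

job_id = gce_metadata_util.fetch_dataflow_job_id()
if job_id:
  print('Running on a Dataflow worker, job id: %s' % job_id)
else:
  print('Not running on a Dataflow worker VM.')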
