This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 999f7deb94e [Python] Add job name to GCS custom audit info (#31300)
999f7deb94e is described below

commit 999f7deb94e9875be85679ac0d17b331d9e36fd0
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 16 14:04:13 2024 -0400

    [Python] Add job name to GCS custom audit info (#31300)
    
    * Add job name to GCS custom audit info in Python
    
    * Fix the way assertion is called in test_headers.
    
    * Fix test failure and add function description to create_storage_client.
    
    * Fix class reference issue in docstring.
    
    * Change the arguments of create_storage_client and fix lints
    
    * Minor fix based on code review.
---
 sdks/python/apache_beam/io/gcp/gcsio.py            | 48 ++++++++++++++++------
 sdks/python/apache_beam/io/gcp/gcsio_test.py       | 47 +++++++++++++++++++++
 .../runners/dataflow/internal/apiclient.py         | 20 ++-------
 .../apache_beam/runners/interactive/utils.py       | 18 +-------
 .../runners/portability/sdk_container_builder.py   | 14 ++-----
 5 files changed, 91 insertions(+), 56 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index f0bbf458cf8..73ff697127d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -47,7 +47,7 @@ from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
 from apache_beam.utils.annotations import deprecated
 
-__all__ = ['GcsIO']
+__all__ = ['GcsIO', 'create_storage_client']
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -99,6 +99,40 @@ def get_or_create_default_gcs_bucket(options):
         bucket_name, project, location=region)
 
 
+def create_storage_client(pipeline_options, use_credentials=True):
+  """Create a GCS client for Beam via GCS Client Library.
+
+  Args:
+    pipeline_options(apache_beam.options.pipeline_options.PipelineOptions):
+      the options of the pipeline.
+    use_credentials(bool): whether to create an authenticated client based
+      on pipeline options or an anonymous client.
+
+  Returns:
+    A google.cloud.storage.client.Client instance.
+  """
+  if use_credentials:
+    credentials = auth.get_service_credentials(pipeline_options)
+  else:
+    credentials = None
+
+  if credentials:
+    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
+    from google.api_core import client_info
+    beam_client_info = client_info.ClientInfo(
+        user_agent="apache-beam/%s (GPN:Beam)" % beam_version.__version__)
+    return storage.Client(
+        credentials=credentials.get_google_auth_credentials(),
+        project=google_cloud_options.project,
+        client_info=beam_client_info,
+        extra_headers={
+            "x-goog-custom-audit-job": google_cloud_options.job_name
+            if google_cloud_options.job_name else "UNKNOWN"
+        })
+  else:
+    return storage.Client.create_anonymous_client()
+
+
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
   def __init__(self, storage_client=None, pipeline_options=None):
@@ -108,17 +142,7 @@ class GcsIO(object):
         pipeline_options = PipelineOptions()
       elif isinstance(pipeline_options, dict):
         pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
-      credentials = auth.get_service_credentials(pipeline_options)
-      if credentials:
-        storage_client = storage.Client(
-            credentials=credentials.get_google_auth_credentials(),
-            project=pipeline_options.view_as(GoogleCloudOptions).project,
-            extra_headers={
-                "User-Agent": "apache-beam/%s (GPN:Beam)" %
-                beam_version.__version__
-            })
-      else:
-        storage_client = storage.Client.create_anonymous_client()
+      storage_client = create_storage_client(pipeline_options)
     self.client = storage_client
     self._rewrite_cb = None
     self.bucket_to_project_number = {}
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index c9a7fb72f77..3572f2ffc02 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -25,6 +25,8 @@ from datetime import datetime
 
 import mock
 
+from apache_beam import version as beam_version
+
 # pylint: disable=wrong-import-order, wrong-import-position
 
 try:
@@ -224,6 +226,23 @@ class SampleOptions(object):
     self.dataflow_kms_key = kms_key
 
 
+_DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
+
+
+def _make_credentials(project=None, universe_domain=_DEFAULT_UNIVERSE_DOMAIN):
+  import google.auth.credentials
+
+  if project is not None:
+    return mock.Mock(
+        spec=google.auth.credentials.Credentials,
+        project_id=project,
+        universe_domain=universe_domain,
+    )
+
+  return mock.Mock(
+      spec=google.auth.credentials.Credentials, universe_domain=universe_domain)
+
+
 @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
 class TestGCSIO(unittest.TestCase):
   def _insert_random_file(
@@ -520,6 +539,34 @@ class TestGCSIO(unittest.TestCase):
     blob.delete()
     self.assertFalse(blob_name in bucket.blobs)
 
+  @mock.patch('google.cloud._http.JSONConnection._do_request')
+  @mock.patch('apache_beam.internal.gcp.auth.get_service_credentials')
+  def test_headers(self, mock_get_service_credentials, mock_do_request):
+    from apache_beam.internal.gcp.auth import _ApitoolsCredentialsAdapter
+    mock_get_service_credentials.return_value = _ApitoolsCredentialsAdapter(
+        _make_credentials("test-project"))
+
+    gcs = gcsio.GcsIO(pipeline_options={"job_name": "test-job-name"})
+    # no HTTP request when initializing GcsIO
+    mock_do_request.assert_not_called()
+
+    import requests
+    response = requests.Response()
+    response.status_code = 200
+    mock_do_request.return_value = response
+
+    # The function of get_bucket() is supposed to send only one HTTP request
+    gcs.get_bucket("test-bucket")
+    mock_do_request.assert_called_once()
+    call_args = mock_do_request.call_args[0]
+
+    # Headers are specified as the third argument of
+    # google.cloud._http.JSONConnection._do_request
+    actual_headers = call_args[2]
+    beam_user_agent = "apache-beam/%s (GPN:Beam)" % beam_version.__version__
+    self.assertIn(beam_user_agent, actual_headers['User-Agent'])
+    self.assertEqual(actual_headers['x-goog-custom-audit-job'], 'test-job-name')
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 2ef50b3c2ed..1e68676c457 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -55,6 +55,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+from apache_beam.io.gcp.gcsio import create_storage_client
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import StandardOptions
@@ -492,14 +493,10 @@ class DataflowApplicationClient(object):
         root_staging_location or self.google_cloud_options.staging_location)
     self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
 
-    from google.cloud import storage
-
     if self.google_cloud_options.no_auth:
       credentials = None
-      storage_credentials = None
     else:
       credentials = get_service_credentials(options)
-      storage_credentials = credentials.get_google_auth_credentials()
 
     http_client = get_new_http()
     self._client = dataflow.DataflowV1b3(
@@ -508,19 +505,8 @@ class DataflowApplicationClient(object):
         get_credentials=(not self.google_cloud_options.no_auth),
         http=http_client,
         response_encoding=get_response_encoding())
-    if storage_credentials:
-      # Here we explicitly set the project to the value specified in pipeline
-      # options, so the new storage client will be consistent with the previous
-      # client in terms of which GCP project to use.
-      self._storage_client = storage.Client(
-          credentials=storage_credentials,
-          project=self.google_cloud_options.project,
-          extra_headers={
-              "User-Agent": "apache-beam/%s (GPN:Beam)" %
-              beam_version.__version__
-          })
-    else:
-      self._storage_client = storage.Client.create_anonymous_client()
+    self._storage_client = create_storage_client(
+        options, not self.google_cloud_options.no_auth)
     self._sdk_image_overrides = self._get_sdk_image_overrides(options)
 
   def _get_sdk_image_overrides(self, pipeline_options):
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 524e532c2e3..3c7cb91d919 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -30,10 +30,8 @@ from typing import Tuple
 import pandas as pd
 
 import apache_beam as beam
-from apache_beam import version as beam_version
 from apache_beam.dataframe.convert import to_pcollection
 from apache_beam.dataframe.frame_base import DeferredBase
-from apache_beam.internal.gcp import auth
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import Pipeline
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -452,20 +450,8 @@ def assert_bucket_exists(bucket_name):
   try:
     from google.cloud.exceptions import ClientError
     from google.cloud.exceptions import NotFound
-    from google.cloud import storage
-    credentials = auth.get_service_credentials(PipelineOptions())
-    if credentials:
-      # We set project to None, so it will not try to use project id from
-      # the environment (ADC).
-      storage_client = storage.Client(
-          credentials=credentials.get_google_auth_credentials(),
-          project=None,
-          extra_headers={
-              "User-Agent": "apache-beam/%s (GPN:Beam)" %
-              beam_version.__version__
-          })
-    else:
-      storage_client = storage.Client.create_anonymous_client()
+    from apache_beam.io.gcp.gcsio import create_storage_client
+    storage_client = create_storage_client(PipelineOptions())
     storage_client.get_bucket(bucket_name)
   except ClientError as e:
     if isinstance(e, NotFound):
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
index d5d1bca981d..489973304f5 100644
--- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -209,17 +209,9 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
       credentials = None
     else:
       credentials = get_service_credentials(options)
-    from google.cloud import storage
-    if credentials:
-      self._storage_client = storage.Client(
-          credentials=credentials.get_google_auth_credentials(),
-          project=self._google_cloud_options.project,
-          extra_headers={
-              "User-Agent": "apache-beam/%s (GPN:Beam)" %
-              beam_version.__version__
-          })
-    else:
-      self._storage_client = storage.Client.create_anonymous_client()
+    from apache_beam.io.gcp.gcsio import create_storage_client
+    self._storage_client = create_storage_client(
+        options, not self._google_cloud_options.no_auth)
     self._cloudbuild_client = cloudbuild.CloudbuildV1(
         credentials=credentials,
         get_credentials=(not self._google_cloud_options.no_auth),

Reply via email to