This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 999f7deb94e [Python] Add job name to GCS custom audit info (#31300)
999f7deb94e is described below
commit 999f7deb94e9875be85679ac0d17b331d9e36fd0
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 16 14:04:13 2024 -0400
[Python] Add job name to GCS custom audit info (#31300)
* Add job name to GCS custom audit info in Python
* Fix the way assertion is called in test_headers.
* Fix test failure and add function description to create_storage_client.
* Fix class reference issue in docstring.
* Change the arguments of create_storage_client and fix lints
* Minor fix based on code review.
---
sdks/python/apache_beam/io/gcp/gcsio.py | 48 ++++++++++++++++------
sdks/python/apache_beam/io/gcp/gcsio_test.py | 47 +++++++++++++++++++++
.../runners/dataflow/internal/apiclient.py | 20 ++-------
.../apache_beam/runners/interactive/utils.py | 18 +-------
.../runners/portability/sdk_container_builder.py | 14 ++-----
5 files changed, 91 insertions(+), 56 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index f0bbf458cf8..73ff697127d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -47,7 +47,7 @@ from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
-__all__ = ['GcsIO']
+__all__ = ['GcsIO', 'create_storage_client']
_LOGGER = logging.getLogger(__name__)
@@ -99,6 +99,40 @@ def get_or_create_default_gcs_bucket(options):
bucket_name, project, location=region)
+def create_storage_client(pipeline_options, use_credentials=True):
+ """Create a GCS client for Beam via GCS Client Library.
+
+ Args:
+ pipeline_options(apache_beam.options.pipeline_options.PipelineOptions):
+ the options of the pipeline.
+ use_credentials(bool): whether to create an authenticated client based
+ on pipeline options or an anonymous client.
+
+ Returns:
+ A google.cloud.storage.client.Client instance.
+ """
+ if use_credentials:
+ credentials = auth.get_service_credentials(pipeline_options)
+ else:
+ credentials = None
+
+ if credentials:
+ google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
+ from google.api_core import client_info
+ beam_client_info = client_info.ClientInfo(
+ user_agent="apache-beam/%s (GPN:Beam)" % beam_version.__version__)
+ return storage.Client(
+ credentials=credentials.get_google_auth_credentials(),
+ project=google_cloud_options.project,
+ client_info=beam_client_info,
+ extra_headers={
+ "x-goog-custom-audit-job": google_cloud_options.job_name
+ if google_cloud_options.job_name else "UNKNOWN"
+ })
+ else:
+ return storage.Client.create_anonymous_client()
+
+
class GcsIO(object):
"""Google Cloud Storage I/O client."""
def __init__(self, storage_client=None, pipeline_options=None):
@@ -108,17 +142,7 @@ class GcsIO(object):
pipeline_options = PipelineOptions()
elif isinstance(pipeline_options, dict):
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
- credentials = auth.get_service_credentials(pipeline_options)
- if credentials:
- storage_client = storage.Client(
- credentials=credentials.get_google_auth_credentials(),
- project=pipeline_options.view_as(GoogleCloudOptions).project,
- extra_headers={
- "User-Agent": "apache-beam/%s (GPN:Beam)" %
- beam_version.__version__
- })
- else:
- storage_client = storage.Client.create_anonymous_client()
+ storage_client = create_storage_client(pipeline_options)
self.client = storage_client
self._rewrite_cb = None
self.bucket_to_project_number = {}
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index c9a7fb72f77..3572f2ffc02 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -25,6 +25,8 @@ from datetime import datetime
import mock
+from apache_beam import version as beam_version
+
# pylint: disable=wrong-import-order, wrong-import-position
try:
@@ -224,6 +226,23 @@ class SampleOptions(object):
self.dataflow_kms_key = kms_key
+_DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
+
+
+def _make_credentials(project=None, universe_domain=_DEFAULT_UNIVERSE_DOMAIN):
+ import google.auth.credentials
+
+ if project is not None:
+ return mock.Mock(
+ spec=google.auth.credentials.Credentials,
+ project_id=project,
+ universe_domain=universe_domain,
+ )
+
+ return mock.Mock(
+ spec=google.auth.credentials.Credentials, universe_domain=universe_domain)
+
+
@unittest.skipIf(NotFound is None, 'GCP dependencies are not installed')
class TestGCSIO(unittest.TestCase):
def _insert_random_file(
@@ -520,6 +539,34 @@ class TestGCSIO(unittest.TestCase):
blob.delete()
self.assertFalse(blob_name in bucket.blobs)
+ @mock.patch('google.cloud._http.JSONConnection._do_request')
+ @mock.patch('apache_beam.internal.gcp.auth.get_service_credentials')
+ def test_headers(self, mock_get_service_credentials, mock_do_request):
+ from apache_beam.internal.gcp.auth import _ApitoolsCredentialsAdapter
+ mock_get_service_credentials.return_value = _ApitoolsCredentialsAdapter(
+ _make_credentials("test-project"))
+
+ gcs = gcsio.GcsIO(pipeline_options={"job_name": "test-job-name"})
+ # no HTTP request when initializing GcsIO
+ mock_do_request.assert_not_called()
+
+ import requests
+ response = requests.Response()
+ response.status_code = 200
+ mock_do_request.return_value = response
+
+ # The function of get_bucket() is supposed to send only one HTTP request
+ gcs.get_bucket("test-bucket")
+ mock_do_request.assert_called_once()
+ call_args = mock_do_request.call_args[0]
+
+ # Headers are specified as the third argument of
+ # google.cloud._http.JSONConnection._do_request
+ actual_headers = call_args[2]
+ beam_user_agent = "apache-beam/%s (GPN:Beam)" % beam_version.__version__
+ self.assertIn(beam_user_agent, actual_headers['User-Agent'])
+ self.assertEqual(actual_headers['x-goog-custom-audit-job'], 'test-job-name')
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 2ef50b3c2ed..1e68676c457 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -55,6 +55,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.internal.http_client import get_new_http
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+from apache_beam.io.gcp.gcsio import create_storage_client
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
@@ -492,14 +493,10 @@ class DataflowApplicationClient(object):
root_staging_location or self.google_cloud_options.staging_location)
self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
- from google.cloud import storage
-
if self.google_cloud_options.no_auth:
credentials = None
- storage_credentials = None
else:
credentials = get_service_credentials(options)
- storage_credentials = credentials.get_google_auth_credentials()
http_client = get_new_http()
self._client = dataflow.DataflowV1b3(
@@ -508,19 +505,8 @@ class DataflowApplicationClient(object):
get_credentials=(not self.google_cloud_options.no_auth),
http=http_client,
response_encoding=get_response_encoding())
- if storage_credentials:
- # Here we explicitly set the project to the value specified in pipeline
- # options, so the new storage client will be consistent with the previous
- # client in terms of which GCP project to use.
- self._storage_client = storage.Client(
- credentials=storage_credentials,
- project=self.google_cloud_options.project,
- extra_headers={
- "User-Agent": "apache-beam/%s (GPN:Beam)" %
- beam_version.__version__
- })
- else:
- self._storage_client = storage.Client.create_anonymous_client()
+ self._storage_client = create_storage_client(
+ options, not self.google_cloud_options.no_auth)
self._sdk_image_overrides = self._get_sdk_image_overrides(options)
def _get_sdk_image_overrides(self, pipeline_options):
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 524e532c2e3..3c7cb91d919 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -30,10 +30,8 @@ from typing import Tuple
import pandas as pd
import apache_beam as beam
-from apache_beam import version as beam_version
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.frame_base import DeferredBase
-from apache_beam.internal.gcp import auth
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
from apache_beam.portability.api import beam_runner_api_pb2
@@ -452,20 +450,8 @@ def assert_bucket_exists(bucket_name):
try:
from google.cloud.exceptions import ClientError
from google.cloud.exceptions import NotFound
- from google.cloud import storage
- credentials = auth.get_service_credentials(PipelineOptions())
- if credentials:
- # We set project to None, so it will not try to use project id from
- # the environment (ADC).
- storage_client = storage.Client(
- credentials=credentials.get_google_auth_credentials(),
- project=None,
- extra_headers={
- "User-Agent": "apache-beam/%s (GPN:Beam)" %
- beam_version.__version__
- })
- else:
- storage_client = storage.Client.create_anonymous_client()
+ from apache_beam.io.gcp.gcsio import create_storage_client
+ storage_client = create_storage_client(PipelineOptions())
storage_client.get_bucket(bucket_name)
except ClientError as e:
if isinstance(e, NotFound):
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
index d5d1bca981d..489973304f5 100644
--- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -209,17 +209,9 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
credentials = None
else:
credentials = get_service_credentials(options)
- from google.cloud import storage
- if credentials:
- self._storage_client = storage.Client(
- credentials=credentials.get_google_auth_credentials(),
- project=self._google_cloud_options.project,
- extra_headers={
- "User-Agent": "apache-beam/%s (GPN:Beam)" %
- beam_version.__version__
- })
- else:
- self._storage_client = storage.Client.create_anonymous_client()
+ from apache_beam.io.gcp.gcsio import create_storage_client
+ self._storage_client = create_storage_client(
+ options, not self._google_cloud_options.no_auth)
self._cloudbuild_client = cloudbuild.CloudbuildV1(
credentials=credentials,
get_credentials=(not self._google_cloud_options.no_auth),