This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 500ca1778f4 [Python] Add warning to temp_location and staging_location when the bucket has soft delete enabled (#31550)
500ca1778f4 is described below

commit 500ca1778f40fb4398474361405f6f9395d28caa
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jun 10 14:42:30 2024 -0400

    [Python] Add warning to temp_location and staging_location when the bucket has soft delete enabled (#31550)
    
    * Add warning if soft delete is enabled in temp or staging buckets.
---
 sdks/python/apache_beam/io/gcp/gcsio.py            | 13 +++++++++++++
 sdks/python/apache_beam/io/gcp/gcsio_test.py       | 15 +++++++++++++++
 .../python/apache_beam/options/pipeline_options.py | 22 ++++++++++++++++++++++
 3 files changed, 50 insertions(+)

diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 1c05996020a..b2f8bd4a2da 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -529,6 +529,19 @@ class GcsIO(object):
         time.mktime(updated.timetuple()) - time.timezone +
         updated.microsecond / 1000000.0)
 
+  def is_soft_delete_enabled(self, gcs_path):
+    try:
+      bucket_name, _ = parse_gcs_path(gcs_path)
+      bucket = self.get_bucket(bucket_name)
+      if (bucket.soft_delete_policy is not None and
+          bucket.soft_delete_policy.retention_duration_seconds > 0):
+        return True
+    except Exception:
+      _LOGGER.warning(
+          "Unexpected error occurred when checking soft delete policy for %s" %
+          gcs_path)
+    return False
+
 
 class BeamBlobReader(BlobReader):
   def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index b17e0638d6b..c1356b53095 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -600,6 +600,21 @@ class TestGCSIO(unittest.TestCase):
     self.assertEqual(
         request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0)
 
+  @mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket")
+  def test_is_soft_delete_enabled(self, mock_get_bucket):
+    bucket = mock.MagicMock()
+    mock_get_bucket.return_value = bucket
+
+    # soft delete policy enabled
+    bucket.soft_delete_policy.retention_duration_seconds = 1024
+    self.assertTrue(
+        self.gcs.is_soft_delete_enabled("gs://beam_with_soft_delete/tmp"))
+
+    # soft delete policy disabled
+    bucket.soft_delete_policy.retention_duration_seconds = 0
+    self.assertFalse(
+        self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp"))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 0af32837a3f..42aee47a957 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -913,6 +913,24 @@ class GoogleCloudOptions(PipelineOptions):
     else:
       return None
 
+  # Log warning if soft delete policy is enabled in a gcs bucket
+  # that is specified in an argument.
+  def _warn_if_soft_delete_policy_enabled(self, arg_name):
+    gcs_path = getattr(self, arg_name, None)
+    try:
+      from apache_beam.io.gcp import gcsio
+      if gcsio.GcsIO().is_soft_delete_enabled(gcs_path):
+        _LOGGER.warning(
+            "Bucket specified in %s has soft-delete policy enabled."
+            " To avoid being billed for unnecessary storage costs, turn"
+            " off the soft delete feature on buckets that your Dataflow"
+            " jobs use for temporary and staging storage. For more"
+            " information, see"
+            " https://cloud.google.com/storage/docs/use-soft-delete";
+            "#remove-soft-delete-policy." % arg_name)
+    except ImportError:
+      _LOGGER.warning('Unable to check soft delete policy due to import 
error.')
+
   # If either temp or staging location has an issue, we use the valid one for
   # both locations. If both are bad we return an error.
   def _handle_temp_and_staging_locations(self, validator):
@@ -920,9 +938,11 @@ class GoogleCloudOptions(PipelineOptions):
     staging_errors = validator.validate_gcs_path(self, 'staging_location')
     if temp_errors and not staging_errors:
       setattr(self, 'temp_location', getattr(self, 'staging_location'))
+      self._warn_if_soft_delete_policy_enabled('staging_location')
       return []
     elif staging_errors and not temp_errors:
       setattr(self, 'staging_location', getattr(self, 'temp_location'))
+      self._warn_if_soft_delete_policy_enabled('temp_location')
       return []
     elif not staging_errors and not temp_errors:
       return []
@@ -935,6 +955,8 @@ class GoogleCloudOptions(PipelineOptions):
       else:
         setattr(self, 'temp_location', default_bucket)
         setattr(self, 'staging_location', default_bucket)
+        self._warn_if_soft_delete_policy_enabled('temp_location')
+        self._warn_if_soft_delete_policy_enabled('staging_location')
         return []
 
   def validate(self, validator):

Reply via email to