This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 500ca1778f4 [Python] Add warning to temp_location and staging_location
when the bucket has soft delete enabled (#31550)
500ca1778f4 is described below
commit 500ca1778f40fb4398474361405f6f9395d28caa
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jun 10 14:42:30 2024 -0400
[Python] Add warning to temp_location and staging_location when the bucket
has soft delete enabled (#31550)
* Add warning if soft delete is enabled in temp or staging buckets.
---
sdks/python/apache_beam/io/gcp/gcsio.py | 13 +++++++++++++
sdks/python/apache_beam/io/gcp/gcsio_test.py | 15 +++++++++++++++
.../python/apache_beam/options/pipeline_options.py | 22 ++++++++++++++++++++++
3 files changed, 50 insertions(+)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py
b/sdks/python/apache_beam/io/gcp/gcsio.py
index 1c05996020a..b2f8bd4a2da 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -529,6 +529,19 @@ class GcsIO(object):
time.mktime(updated.timetuple()) - time.timezone +
updated.microsecond / 1000000.0)
def is_soft_delete_enabled(self, gcs_path):
  """Checks whether the bucket of ``gcs_path`` has soft delete enabled.

  Soft delete counts as enabled when the bucket's soft delete policy has a
  retention duration greater than zero. A policy that is ``None``, or whose
  retention duration is unset (``None``) or zero, counts as disabled.

  This is a best-effort check intended for warnings only. Any exception
  raised while parsing the path, fetching the bucket or reading its policy is
  logged as a warning, and the method returns ``False``.

  Args:
    gcs_path: GCS path whose bucket should be inspected, e.g.
      ``gs://bucket/tmp``.

  Returns:
    True if soft delete is enabled on the bucket, otherwise False.
  """
  try:
    bucket_name, _ = parse_gcs_path(gcs_path)
    bucket = self.get_bucket(bucket_name)
    policy = bucket.soft_delete_policy
    if policy is None:
      return False
    retention = policy.retention_duration_seconds
    # Guard against an unset (None) retention duration. Comparing None with 0
    # raises TypeError in Python 3, which would otherwise be reported below as
    # an "unexpected error" for a bucket that simply has no retention set.
    return retention is not None and retention > 0
  except Exception as e:  # pylint: disable=broad-except
    _LOGGER.warning(
        "Unexpected error occurred when checking soft delete policy for "
        "%s: %s",
        gcs_path,
        e)
    return False
+
class BeamBlobReader(BlobReader):
def __init__(self, blob, chunk_size=DEFAULT_READ_BUFFER_SIZE):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py
b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index b17e0638d6b..c1356b53095 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -600,6 +600,21 @@ class TestGCSIO(unittest.TestCase):
self.assertEqual(
request_data_json['softDeletePolicy']['retentionDurationSeconds'], 0)
@mock.patch("apache_beam.io.gcp.gcsio.GcsIO.get_bucket")
def test_is_soft_delete_enabled(self, mock_get_bucket):
  """Soft delete is reported only for a positive retention duration."""
  bucket = mock.MagicMock()
  mock_get_bucket.return_value = bucket

  # soft delete policy enabled
  bucket.soft_delete_policy.retention_duration_seconds = 1024
  self.assertTrue(
      self.gcs.is_soft_delete_enabled("gs://beam_with_soft_delete/tmp"))

  # soft delete policy disabled
  bucket.soft_delete_policy.retention_duration_seconds = 0
  self.assertFalse(
      self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp"))

  # retention duration unset
  bucket.soft_delete_policy.retention_duration_seconds = None
  self.assertFalse(
      self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp"))

  # no soft delete policy on the bucket
  bucket.soft_delete_policy = None
  self.assertFalse(
      self.gcs.is_soft_delete_enabled("gs://beam_without_soft_delete/tmp"))

  # a failing bucket lookup is swallowed and reported as "not enabled"
  mock_get_bucket.side_effect = RuntimeError("bucket lookup failed")
  self.assertFalse(
      self.gcs.is_soft_delete_enabled("gs://beam_with_soft_delete/tmp"))
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index 0af32837a3f..42aee47a957 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -913,6 +913,24 @@ class GoogleCloudOptions(PipelineOptions):
else:
return None
+ # Log warning if soft delete policy is enabled in a gcs bucket
+ # that is specified in an argument.
+ def _warn_if_soft_delete_policy_enabled(self, arg_name):
+ gcs_path = getattr(self, arg_name, None)
+ try:
+ from apache_beam.io.gcp import gcsio
+ if gcsio.GcsIO().is_soft_delete_enabled(gcs_path):
+ _LOGGER.warning(
+ "Bucket specified in %s has soft-delete policy enabled."
+ " To avoid being billed for unnecessary storage costs, turn"
+ " off the soft delete feature on buckets that your Dataflow"
+ " jobs use for temporary and staging storage. For more"
+ " information, see"
+ " https://cloud.google.com/storage/docs/use-soft-delete"
+ "#remove-soft-delete-policy." % arg_name)
+ except ImportError:
+ _LOGGER.warning('Unable to check soft delete policy due to import
error.')
+
# If either temp or staging location has an issue, we use the valid one for
# both locations. If both are bad we return an error.
def _handle_temp_and_staging_locations(self, validator):
@@ -920,9 +938,11 @@ class GoogleCloudOptions(PipelineOptions):
staging_errors = validator.validate_gcs_path(self, 'staging_location')
if temp_errors and not staging_errors:
setattr(self, 'temp_location', getattr(self, 'staging_location'))
+ self._warn_if_soft_delete_policy_enabled('staging_location')
return []
elif staging_errors and not temp_errors:
setattr(self, 'staging_location', getattr(self, 'temp_location'))
+ self._warn_if_soft_delete_policy_enabled('temp_location')
return []
elif not staging_errors and not temp_errors:
return []
@@ -935,6 +955,8 @@ class GoogleCloudOptions(PipelineOptions):
else:
setattr(self, 'temp_location', default_bucket)
setattr(self, 'staging_location', default_bucket)
+ self._warn_if_soft_delete_policy_enabled('temp_location')
+ self._warn_if_soft_delete_policy_enabled('staging_location')
return []
def validate(self, validator):