This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8374fbe106f Stale pubsub subscriptions cleaner (#35553)
8374fbe106f is described below
commit 8374fbe106ff64543185e69c0a23d061926c4a8f
Author: Enrique Calderon <[email protected]>
AuthorDate: Mon Jul 14 16:58:20 2025 -0600
Stale pubsub subscriptions cleaner (#35553)
* Implement PubSub Subscription stale cleaner as dry run
This cleaner will delete the stale PubSub subscriptions that do not have
any associated topics.
* Add tests for the PubSub subscription deletion class
---
.test-infra/tools/stale_cleaner.py | 82 ++++++++++++++++++++++++++++++-
.test-infra/tools/test_stale_cleaner.py | 85 ++++++++++++++++++++++++++++++++-
2 files changed, 164 insertions(+), 3 deletions(-)
diff --git a/.test-infra/tools/stale_cleaner.py
b/.test-infra/tools/stale_cleaner.py
index a554473c7c8..2c702cfa0fd 100644
--- a/.test-infra/tools/stale_cleaner.py
+++ b/.test-infra/tools/stale_cleaner.py
@@ -25,6 +25,7 @@ from google.cloud import pubsub_v1, storage
# Resource types
PUBSUB_TOPIC_RESOURCE = "pubsub_topic"
+PUBSUB_SUBSCRIPTION_RESOURCE = "pubsub_subscription"
# Storage constants
STORAGE_PREFIX = "stale_cleaner/"
@@ -34,6 +35,7 @@ PROJECT_PATH_PREFIX = "projects/" # Prefix for the project
path in GCP *This is
# Time constants (in seconds)
DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day
+DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD = 86400 # 1 day
DEFAULT_TIME_THRESHOLD = 3600 # 1 hour
# Default values for testing
@@ -285,6 +287,12 @@ class StaleCleaner:
# PubSub topic cleaner
class PubSubTopicCleaner(StaleCleaner):
+ """
+ This cleaner will delete PubSub topics that are stale based on the time
threshold.
+ It uses the PubSub API to list and delete topics.
+ It also applies prefix filtering to only delete topics that match the
specified prefixes.
+ """
+
def __init__(self, project_id: str, bucket_name: str,
prefixes: list = None, time_threshold: int =
DEFAULT_PUBSUB_TOPIC_THRESHOLD,
clock: Clock = None) -> None:
@@ -304,7 +312,50 @@ class PubSubTopicCleaner(StaleCleaner):
print(f"{self.clock()} - Deleting PubSub topic {resource_name}")
self.client.delete_topic(request={"topic": resource_name})
-if __name__ == "__main__":
+# PubSub Subscription cleaner
+class PubSubSubscriptionCleaner(StaleCleaner):
+    """
+    This cleaner tracks and deletes PubSub subscriptions that are detached.
+    It uses the PubSub API to list and delete subscriptions.
+    It also applies prefix filtering so that only subscriptions whose ID starts
+    with one of the specified prefixes are considered; with no prefixes every
+    subscription in the project is considered.
+    Only subscriptions whose `detached` flag is set are reported by
+    `_active_resources`; attached subscriptions are ignored.
+    """
+
+    def __init__(self, project_id: str, bucket_name: str,
+                 prefixes: list = None, time_threshold: int = DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD,
+                 clock: Clock = None) -> None:
+        super().__init__(project_id, PUBSUB_SUBSCRIPTION_RESOURCE, bucket_name, prefixes, time_threshold, clock)
+        self.client = None  # Will be initialized in each method that needs it
+
+    def _active_resources(self) -> dict:
+        """
+        List the detached subscriptions of the project that match the prefixes.
+
+        Returns:
+            A dict mapping the fully qualified subscription name
+            (``projects/<project>/subscriptions/<id>``) to a GoogleCloudResource.
+        """
+        d = {}
+        self.client = pubsub_v1.SubscriberClient()
+
+        # Build the fully qualified name prefixes once instead of on every iteration.
+        # An empty tuple means that no prefix filtering is applied.
+        name_prefixes = tuple(
+            f"{self.project_path}/subscriptions/{prefix}" for prefix in (self.prefixes or ()))
+
+        with self.client:
+            for subscription in self.client.list_subscriptions(request={"project": self.project_path}):
+                subscription_name = subscription.name
+                # Apply prefix filtering if prefixes are defined
+                if name_prefixes and not subscription_name.startswith(name_prefixes):
+                    continue
+                # Only detached subscriptions are candidates for deletion
+                if subscription.detached:
+                    d[subscription_name] = GoogleCloudResource(resource_name=subscription_name, clock=self.clock)
+
+        return d
+
+    def _delete_resource(self, resource_name: str) -> None:
+        """
+        Delete a single subscription.
+
+        Args:
+            resource_name: Either a fully qualified subscription name
+                (``projects/<project>/subscriptions/<id>``), as produced by
+                `_active_resources`, or a bare subscription ID.
+        """
+        self.client = pubsub_v1.SubscriberClient()
+        print(f"{self.clock()} - Deleting PubSub subscription {resource_name}")
+        with self.client:
+            if resource_name.startswith(PROJECT_PATH_PREFIX):
+                # Already fully qualified: wrapping it again would yield an invalid
+                # "projects/<p>/subscriptions/projects/<p>/subscriptions/<id>" path.
+                subscription_path = resource_name
+            else:
+                subscription_path = self.client.subscription_path(self.project_id, resource_name)
+            self.client.delete_subscription(request={"subscription": subscription_path})
+
+def clean_pubsub_topics():
+ """ Clean up stale PubSub topics in the specified GCP project.
+ This function initializes the PubSubTopicCleaner with the default project
ID and bucket name,
+ and a predefined list of topic prefixes.
+ It then refreshes the resources and deletes any stale topics.
+ """
project_id = DEFAULT_PROJECT_ID
bucket_name = DEFAULT_BUCKET_NAME
@@ -355,3 +406,32 @@ if __name__ == "__main__":
# Delete stale resources
cleaner.delete_stale(dry_run=False)
+
+def clean_pubsub_subscriptions(dry_run: bool = True):
+    """ Clean up stale PubSub subscriptions in the default GCP project.
+    This function initializes the PubSubSubscriptionCleaner with the default project ID
+    and bucket name and an empty list of subscription prefixes.
+    It then refreshes the resources and runs the stale-resource deletion step.
+
+    Args:
+        dry_run: Forwarded to `delete_stale`. Defaults to True to avoid accidental
+            deletions during testing; pass False to perform a real run.
+    """
+    # No prefixes are defined for subscriptions, so no prefix filtering is applied
+    cleaner = PubSubSubscriptionCleaner(
+        project_id=DEFAULT_PROJECT_ID,
+        bucket_name=DEFAULT_BUCKET_NAME,
+        prefixes=[],
+        time_threshold=DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD)
+
+    # Refresh resources
+    cleaner.refresh()
+
+    # Delete stale resources (dry run unless the caller explicitly opts out)
+    cleaner.delete_stale(dry_run=dry_run)
+
+if __name__ == "__main__":
+    # Run each cleanup job in order: PubSub topics first, then PubSub subscriptions.
+    for cleanup_job in (clean_pubsub_topics, clean_pubsub_subscriptions):
+        cleanup_job()
diff --git a/.test-infra/tools/test_stale_cleaner.py
b/.test-infra/tools/test_stale_cleaner.py
index b917c1e9b4d..c53fbc1a44d 100644
--- a/.test-infra/tools/test_stale_cleaner.py
+++ b/.test-infra/tools/test_stale_cleaner.py
@@ -25,6 +25,7 @@ from stale_cleaner import (
GoogleCloudResource,
StaleCleaner,
PubSubTopicCleaner,
+ PubSubSubscriptionCleaner,
FakeClock,
DEFAULT_TIME_THRESHOLD,
PUBSUB_TOPIC_RESOURCE,
@@ -390,6 +391,86 @@ class StaleCleanerTest(unittest.TestCase):
self.assertNotIn("deleted-resource", self.cleaner.deleted_resources)
+class PubSubSubscriptionCleanerTest(unittest.TestCase):
+    """Tests for the PubSubSubscriptionCleaner class."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        self.project_id = "test-project"
+        self.bucket_name = "test-bucket"
+        self.prefixes = ["test-prefix"]
+        self.time_threshold = 86400  # 1 day
+        self.fake_clock = FakeClock("2025-05-28T10:00:00")
+
+        # Mock PubSub client
+        self.mock_client_patcher = mock.patch('google.cloud.pubsub_v1.SubscriberClient')
+        self.MockSubscriberClientClass = self.mock_client_patcher.start()
+        self.mock_subscriber_client = self.MockSubscriberClientClass.return_value
+
+        # Create a test cleaner
+        self.cleaner = PubSubSubscriptionCleaner(
+            project_id=self.project_id,
+            bucket_name=self.bucket_name,
+            prefixes=self.prefixes,
+            time_threshold=self.time_threshold,
+            clock=self.fake_clock
+        )
+
+        # Replace the storage accessors so that the tests never touch a real bucket
+        self.cleaner._write_resources = SilencedMock()
+        self.cleaner._stored_resources = SilencedMock(return_value={})
+
+    def tearDown(self):
+        """Tear down test fixtures."""
+        self.mock_client_patcher.stop()
+
+    def test_init(self):
+        """Test initialization."""
+        self.assertEqual(self.cleaner.project_id, self.project_id)
+        self.assertEqual(self.cleaner.bucket_name, self.bucket_name)
+        self.assertEqual(self.cleaner.prefixes, self.prefixes)
+        self.assertEqual(self.cleaner.time_threshold, self.time_threshold)
+        self.assertIsInstance(self.cleaner.clock, FakeClock)
+
+    def test_active_resources(self):
+        """Test that _active_resources keeps only detached, prefix-matching subscriptions."""
+        # `detached` is set explicitly on every mock: an attribute that is merely
+        # auto-created by mock.Mock() is truthy and would bypass the detached filter.
+
+        # Matching prefix but still attached to a topic -> must be skipped
+        sub1 = mock.Mock()
+        sub1.name = "projects/test-project/subscriptions/test-prefix-sub1"
+        sub1.topic = "projects/test-project/topics/some-topic"
+        sub1.detached = False
+
+        # Matching prefix and detached -> must be reported
+        sub2 = mock.Mock()
+        sub2.name = "projects/test-project/subscriptions/test-prefix-sub2-detached"
+        sub2.topic = "_deleted-topic_"
+        sub2.detached = True
+
+        # Detached but with a non-matching prefix -> must be skipped by the prefix filter
+        sub3 = mock.Mock()
+        sub3.name = "projects/test-project/subscriptions/other-prefix-sub3"
+        sub3.topic = "projects/test-project/topics/another-topic"
+        sub3.detached = True
+
+        self.mock_subscriber_client.list_subscriptions.return_value = [sub1, sub2, sub3]
+
+        with SilencePrint():
+            active = self.cleaner._active_resources()
+
+        self.assertNotIn("projects/test-project/subscriptions/test-prefix-sub1", active)
+        self.assertIn("projects/test-project/subscriptions/test-prefix-sub2-detached", active)
+        self.assertNotIn("projects/test-project/subscriptions/other-prefix-sub3", active)
+        self.assertEqual(len(active), 1)
+
+    def test_delete_resource(self):
+        """Test _delete_resource method with a bare subscription ID."""
+        sub_name = "test-sub-to-delete"
+        subscription_path = f"projects/{self.project_id}/subscriptions/{sub_name}"
+        self.mock_subscriber_client.subscription_path.return_value = subscription_path
+
+        with SilencePrint():
+            self.cleaner._delete_resource(sub_name)
+
+        self.mock_subscriber_client.subscription_path.assert_called_once_with(self.project_id, sub_name)
+        self.mock_subscriber_client.delete_subscription.assert_called_once_with(
+            request={'subscription': subscription_path}
+        )
+
+
class PubSubTopicCleanerTest(unittest.TestCase):
"""Tests for the PubSubTopicCleaner class."""
@@ -484,10 +565,10 @@ class PubSubTopicCleanerTest(unittest.TestCase):
self.cleaner._delete_resource(resource_name)
# Check that delete_topic was called
-
self.cleaner.client.delete_topic.assert_called_once_with(name=resource_name)
+
self.cleaner.client.delete_topic.assert_called_once_with(request={'topic':
resource_name})
# Check that correct message was printed
- mock_print.assert_called_once_with(f"{self.cleaner.clock()} -
Deleting PubSub topic test-topic")
+ mock_print.assert_called_once_with(f"{self.cleaner.clock()} -
Deleting PubSub topic {resource_name}")
if __name__ == '__main__':
unittest.main()