This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8374fbe106f Stale pubsub subscriptions cleaner (#35553)
8374fbe106f is described below

commit 8374fbe106ff64543185e69c0a23d061926c4a8f
Author: Enrique Calderon <[email protected]>
AuthorDate: Mon Jul 14 16:58:20 2025 -0600

    Stale pubsub subscriptions cleaner (#35553)
    
    * Implement the PubSub subscription stale cleaner (dry run only).
    This cleaner deletes stale PubSub subscriptions that no longer have an
    associated topic.
    
    * Add tests for the PubSub subscription deletion class
---
 .test-infra/tools/stale_cleaner.py      | 82 ++++++++++++++++++++++++++++++-
 .test-infra/tools/test_stale_cleaner.py | 85 ++++++++++++++++++++++++++++++++-
 2 files changed, 164 insertions(+), 3 deletions(-)

diff --git a/.test-infra/tools/stale_cleaner.py b/.test-infra/tools/stale_cleaner.py
index a554473c7c8..2c702cfa0fd 100644
--- a/.test-infra/tools/stale_cleaner.py
+++ b/.test-infra/tools/stale_cleaner.py
@@ -25,6 +25,7 @@ from google.cloud import pubsub_v1, storage
 
 # Resource types
 PUBSUB_TOPIC_RESOURCE = "pubsub_topic"
+PUBSUB_SUBSCRIPTION_RESOURCE = "pubsub_subscription"  # resource type PubSubSubscriptionCleaner passes to StaleCleaner
 
 # Storage constants
 STORAGE_PREFIX = "stale_cleaner/"
@@ -34,6 +35,7 @@ PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is
 
 # Time constants (in seconds)
 DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400  # 1 day
+DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD = 86400  # 1 day; default time_threshold for PubSubSubscriptionCleaner
 DEFAULT_TIME_THRESHOLD = 3600  # 1 hour
 
 # Default values for testing
@@ -285,6 +287,12 @@ class StaleCleaner:
 
 # PubSub topic cleaner
 class PubSubTopicCleaner(StaleCleaner):
+    """
+    This cleaner will delete PubSub topics that are stale based on the time 
threshold.
+    It uses the PubSub API to list and delete topics.
+    It also applies prefix filtering to only delete topics that match the 
specified prefixes.
+    """
+
     def __init__(self, project_id: str, bucket_name: str,
                     prefixes: list = None, time_threshold: int = 
DEFAULT_PUBSUB_TOPIC_THRESHOLD,
                     clock: Clock = None) -> None:
@@ -304,7 +312,50 @@ class PubSubTopicCleaner(StaleCleaner):
         print(f"{self.clock()} - Deleting PubSub topic {resource_name}")
         self.client.delete_topic(request={"topic": resource_name})
 
-if __name__ == "__main__":
+# PubSub Subscription cleaner
+class PubSubSubscriptionCleaner(StaleCleaner):
+    """
+    This cleaner will delete PubSub subscriptions that are stale based on the time threshold.
+    It uses the PubSub API to list and delete subscriptions.
+    It also applies prefix filtering to only consider subscriptions that match the specified prefixes.
+    Only orphaned subscriptions are reported as active resources: subscriptions that were
+    detached from their topic, or whose topic was deleted (PubSub then reports the topic
+    as "_deleted-topic_"). Subscriptions with a live topic are not reported, so this
+    cleaner never tracks them.
+    """
+
+    # Topic name PubSub reports on a subscription whose topic has been deleted.
+    DELETED_TOPIC = "_deleted-topic_"
+
+    def __init__(self, project_id: str, bucket_name: str,
+                    prefixes: list = None, time_threshold: int = DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD,
+                    clock: Clock = None) -> None:
+        super().__init__(project_id, PUBSUB_SUBSCRIPTION_RESOURCE, bucket_name, prefixes, time_threshold, clock)
+        self.client = None  # Will be initialized in each method that needs it
+
+    def _is_orphaned(self, subscription) -> bool:
+        """Return True if the subscription no longer has a usable topic."""
+        return bool(subscription.detached) or subscription.topic == self.DELETED_TOPIC
+
+    def _active_resources(self) -> dict:
+        """Return orphaned subscriptions (matching the prefixes, if any) keyed by full resource name."""
+        d = {}
+        # Built once; an empty tuple means "no prefix filtering".
+        prefix_paths = tuple(
+            f"{self.project_path}/subscriptions/{prefix}" for prefix in (self.prefixes or [])
+        )
+        self.client = pubsub_v1.SubscriberClient()
+
+        with self.client:
+            for subscription in self.client.list_subscriptions(request={"project": self.project_path}):
+                subscription_name = subscription.name
+                # Apply prefix filtering if prefixes are defined
+                if prefix_paths and not subscription_name.startswith(prefix_paths):
+                    continue
+                if self._is_orphaned(subscription):
+                    d[subscription_name] = GoogleCloudResource(resource_name=subscription_name, clock=self.clock)
+
+        return d
+
+    def _delete_resource(self, resource_name: str) -> None:
+        """Delete a PubSub subscription.
+
+        resource_name is normally the full path produced by _active_resources
+        ("projects/<project>/subscriptions/<name>"); a bare subscription id is
+        also accepted and expanded with subscription_path().
+        """
+        self.client = pubsub_v1.SubscriberClient()
+        print(f"{self.clock()} - Deleting PubSub subscription {resource_name}")
+        with self.client:
+            if resource_name.startswith(PROJECT_PATH_PREFIX):
+                # Already a full path: expanding it again would produce
+                # projects/<p>/subscriptions/projects/<p>/subscriptions/<name>.
+                subscription_path = resource_name
+            else:
+                subscription_path = self.client.subscription_path(self.project_id, resource_name)
+            self.client.delete_subscription(request={"subscription": subscription_path})
+
+def clean_pubsub_topics():
+    """ Clean up stale PubSub topics in the specified GCP project.
+    This function initializes the PubSubTopicCleaner with the default project 
ID and bucket name,
+    and a predefined list of topic prefixes.
+    It then refreshes the resources and deletes any stale topics.
+    """
     project_id = DEFAULT_PROJECT_ID
     bucket_name = DEFAULT_BUCKET_NAME
 
@@ -355,3 +406,32 @@ if __name__ == "__main__":
 
     # Delete stale resources
     cleaner.delete_stale(dry_run=False)
+
+def clean_pubsub_subscriptions():
+    """Run the stale PubSub subscription cleaner over the default project and bucket.
+
+    No subscription prefixes are configured, so no subscription is excluded by
+    prefix. The cleaner's state is refreshed first, then delete_stale is called
+    with dry_run=True to avoid accidental deletions during testing.
+    """
+    cleaner = PubSubSubscriptionCleaner(
+        project_id=DEFAULT_PROJECT_ID,
+        bucket_name=DEFAULT_BUCKET_NAME,
+        prefixes=[],  # empty list => no prefix filtering
+        time_threshold=DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD,
+    )
+
+    cleaner.refresh()
+
+    # Deliberately a dry run while this cleaner is still being validated.
+    cleaner.delete_stale(dry_run=True)
+
+if __name__ == "__main__":
+    # Stale PubSub topics are cleaned first, then stale PubSub subscriptions.
+    for cleanup in (clean_pubsub_topics, clean_pubsub_subscriptions):
+        cleanup()
diff --git a/.test-infra/tools/test_stale_cleaner.py b/.test-infra/tools/test_stale_cleaner.py
index b917c1e9b4d..c53fbc1a44d 100644
--- a/.test-infra/tools/test_stale_cleaner.py
+++ b/.test-infra/tools/test_stale_cleaner.py
@@ -25,6 +25,7 @@ from stale_cleaner import (
     GoogleCloudResource,
     StaleCleaner,
     PubSubTopicCleaner,
+    PubSubSubscriptionCleaner,
     FakeClock,
     DEFAULT_TIME_THRESHOLD,
     PUBSUB_TOPIC_RESOURCE,
@@ -390,6 +391,86 @@ class StaleCleanerTest(unittest.TestCase):
         self.assertNotIn("deleted-resource", self.cleaner.deleted_resources)
 
 
+class PubSubSubscriptionCleanerTest(unittest.TestCase):
+    """Tests for the PubSubSubscriptionCleaner class."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        self.project_id = "test-project"
+        self.bucket_name = "test-bucket"
+        self.prefixes = ["test-prefix"]
+        self.time_threshold = 86400  # 1 day
+        self.fake_clock = FakeClock("2025-05-28T10:00:00")
+
+        # Mock PubSub client. addCleanup (rather than tearDown) guarantees the
+        # patch is stopped even if the rest of setUp raises.
+        self.mock_client_patcher = mock.patch('google.cloud.pubsub_v1.SubscriberClient')
+        self.MockSubscriberClientClass = self.mock_client_patcher.start()
+        self.addCleanup(self.mock_client_patcher.stop)
+        self.mock_subscriber_client = self.MockSubscriberClientClass.return_value
+
+        # Create a test cleaner
+        self.cleaner = PubSubSubscriptionCleaner(
+            project_id=self.project_id,
+            bucket_name=self.bucket_name,
+            prefixes=self.prefixes,
+            time_threshold=self.time_threshold,
+            clock=self.fake_clock
+        )
+
+        self.cleaner._write_resources = SilencedMock()
+        self.cleaner._stored_resources = SilencedMock(return_value={})
+
+    def test_init(self):
+        """Test initialization."""
+        self.assertEqual(self.cleaner.project_id, self.project_id)
+        self.assertEqual(self.cleaner.bucket_name, self.bucket_name)
+        self.assertEqual(self.cleaner.prefixes, self.prefixes)
+        self.assertEqual(self.cleaner.time_threshold, self.time_threshold)
+        self.assertIsInstance(self.cleaner.clock, FakeClock)
+
+    def test_active_resources(self):
+        """Test _active_resources only tracks detached subscriptions that match the prefixes."""
+        # `detached` is set explicitly on every mock: an unset attribute on
+        # mock.Mock() is itself a truthy Mock, which would make every
+        # subscription look detached and hide filtering bugs.
+        sub1 = mock.Mock()
+        sub1.name = "projects/test-project/subscriptions/test-prefix-sub1"
+        sub1.topic = "projects/test-project/topics/some-topic"
+        sub1.detached = False
+
+        sub2 = mock.Mock()
+        sub2.name = "projects/test-project/subscriptions/test-prefix-sub2-detached"
+        sub2.topic = "_deleted-topic_"
+        sub2.detached = True
+
+        sub3 = mock.Mock()
+        sub3.name = "projects/test-project/subscriptions/other-prefix-sub3"
+        sub3.topic = "projects/test-project/topics/another-topic"
+        sub3.detached = True
+
+        self.mock_subscriber_client.list_subscriptions.return_value = [sub1, sub2, sub3]
+
+        with SilencePrint():
+            active = self.cleaner._active_resources()
+
+        # sub1 still has a live topic, so it is not a cleanup candidate.
+        self.assertNotIn("projects/test-project/subscriptions/test-prefix-sub1", active)
+        self.assertIn("projects/test-project/subscriptions/test-prefix-sub2-detached", active)
+        # sub3 is detached but does not match any configured prefix.
+        self.assertNotIn("projects/test-project/subscriptions/other-prefix-sub3", active)
+        self.assertEqual(len(active), 1)
+
+    def test_delete_resource(self):
+        """Test _delete_resource method."""
+        sub_name = "test-sub-to-delete"
+        subscription_path = f"projects/{self.project_id}/subscriptions/{sub_name}"
+        self.mock_subscriber_client.subscription_path.return_value = subscription_path
+
+        with SilencePrint():
+            self.cleaner._delete_resource(sub_name)
+
+        self.mock_subscriber_client.subscription_path.assert_called_once_with(self.project_id, sub_name)
+        self.mock_subscriber_client.delete_subscription.assert_called_once_with(
+            request={'subscription': subscription_path}
+        )
+
+
 class PubSubTopicCleanerTest(unittest.TestCase):
     """Tests for the PubSubTopicCleaner class."""
 
@@ -484,10 +565,10 @@ class PubSubTopicCleanerTest(unittest.TestCase):
             self.cleaner._delete_resource(resource_name)
 
             # Check that delete_topic was called
-            
self.cleaner.client.delete_topic.assert_called_once_with(name=resource_name)
+            
self.cleaner.client.delete_topic.assert_called_once_with(request={'topic': 
resource_name})
 
             # Check that correct message was printed
-            mock_print.assert_called_once_with(f"{self.cleaner.clock()} - 
Deleting PubSub topic test-topic")
+            mock_print.assert_called_once_with(f"{self.cleaner.clock()} - 
Deleting PubSub topic {resource_name}")
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to