pabloem commented on code in PR #34141:
URL: https://github.com/apache/beam/pull/34141#discussion_r2027697506
##########
.test-infra/tools/stale_cleaner.py:
##########
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Deletes stale and old resources from the Google Cloud Platform.
+# In order to detect them, save the current resources state and compare it
+# with a previous one. They are stored in a bucket in the Google Cloud Storage.
+#
+import datetime
+import pickle
+from collections import defaultdict
+from google.cloud import pubsub_v1, storage
+
+# Resource types
+PUBSUB_TOPIC_RESOURCE = "pubsub_topic"
+
+# Storage constants
+STORAGE_PREFIX = "stale_cleaner/"
+WORKING_SUFFIX = "-working"
+DELETED_SUFFIX = "-deleted"
+
+# File paths
+WORKING_RESOURCES_TEMP_FILE = "/tmp/working_resources.bin"
+LAST_WORKING_RESOURCES_TEMP_FILE = "/tmp/last_working_resources.bin"
+DELETED_RESOURCES_TEMP_FILE = "/tmp/deleted_resources.bin"
+LAST_DELETED_RESOURCES_TEMP_FILE = "/tmp/last_deleted_resources.bin"
+
+# Project constants
+PROJECT_PATH_PREFIX = "projects/"
+
+# Time constants (in seconds)
+DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400  # 1 day
+DEFAULT_TIME_THRESHOLD = 3600  # 1 hour
+
+# Default values for testing
+DEFAULT_PROJECT_ID = "apache-beam-testing"
+DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem"
+
+class GoogleCloudResource:
+    resource_name = ""
+    resource_type = ""
+    creation_date = datetime.datetime.now()
+    last_update_date = datetime.datetime.now()

Review Comment:
   remove these. we are defining them in the constructor.
   ```suggestion
   ```
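   (For context, here is a minimal sketch — not from this PR — of why class-level defaults like these are a problem: the class body runs exactly once at import time, so every instance shares the same timestamp unless the constructor overwrites it. The `Resource` class below is a hypothetical stand-in for `GoogleCloudResource`.)
   ```
   import datetime
   import time

   class Resource:
       # Evaluated exactly once, when the class body is executed at import
       creation_date = datetime.datetime.now()

   a = Resource()
   time.sleep(2)
   b = Resource()
   # Both instances report the identical import-time value
   assert a.creation_date == b.creation_date
   ```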
##########
.test-infra/tools/stale_cleaner.py:
##########
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Deletes stale and old resources from the Google Cloud Platform.
+# In order to detect them, save the current resources state and compare it
+# with a previous one. They are stored in a bucket in the Google Cloud Storage.
+#
+import datetime
+import pickle
+from collections import defaultdict
+from google.cloud import pubsub_v1, storage
+
+# Resource types
+PUBSUB_TOPIC_RESOURCE = "pubsub_topic"
+
+# Storage constants
+STORAGE_PREFIX = "stale_cleaner/"
+WORKING_SUFFIX = "-working"
+DELETED_SUFFIX = "-deleted"
+
+# File paths
+WORKING_RESOURCES_TEMP_FILE = "/tmp/working_resources.bin"
+LAST_WORKING_RESOURCES_TEMP_FILE = "/tmp/last_working_resources.bin"
+DELETED_RESOURCES_TEMP_FILE = "/tmp/deleted_resources.bin"
+LAST_DELETED_RESOURCES_TEMP_FILE = "/tmp/last_deleted_resources.bin"
+
+# Project constants
+PROJECT_PATH_PREFIX = "projects/"
+
+# Time constants (in seconds)
+DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400  # 1 day
+DEFAULT_TIME_THRESHOLD = 3600  # 1 hour
+
+# Default values for testing
+DEFAULT_PROJECT_ID = "apache-beam-testing"
+DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem"
+
+class GoogleCloudResource:
+    resource_name = ""
+    resource_type = ""
+    creation_date = datetime.datetime.now()
+    last_update_date = datetime.datetime.now()
+
+    def __init__(self, resource_name, resource_type, creation_date=datetime.datetime.now(), last_update_date=datetime.datetime.now()) -> None:
+        self.resource_name = resource_name
+        self.resource_type = resource_type
+        self.creation_date = creation_date
+        self.last_update = last_update_date

Review Comment:
   I have a couple of changes to suggest here:
   ```suggestion
       def __init__(self, resource_name, resource_type, creation_date=None, last_update_date=None, clock=datetime.datetime.now) -> None:
           self.resource_name = resource_name
           self.resource_type = resource_type
           self._clock = clock
           self.creation_date = creation_date or self._clock()
           self.last_update = last_update_date or self._clock()
   ```
   1. Why do we remove the `datetime.now()` call from the default value of the constructor's parameter? Because when we add it as a default value in the `__init__` params, it is **only evaluated once**. This code here:
   ```
   print('start', datetime.datetime.now())

   class MyClass:
       def __init__(self, value=datetime.datetime.now()):
           self.value = value

   print('first', MyClass().value)
   time.sleep(300)  # Sleep 5 minutes
   print('second', MyClass(datetime.datetime.now()).value)
   print('third', MyClass().value)
   ```
   prints the following:
   ```
   > start 2025-04-03 00:00:00
   > first 2025-04-03 00:00:00
   > second 2025-04-03 00:05:00
   > third 2025-04-03 00:00:00
   ```
   because the default parameter is only evaluated once.

   2. Why do we pass a 'clock' function? This will be useful for testing. In 'real' runs, we will never change it. In testing, we will pass it in and control the 'current time' to run tests.

   LMK if that makes sense.
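   As a concrete (hypothetical) illustration of the testing benefit, a test could pin the clock along these lines — this assumes the suggested constructor above, and the fixed timestamp is made up:
   ```
   import datetime

   # Freeze "now" so constructor-assigned dates are deterministic
   fixed_now = datetime.datetime(2025, 4, 3, 0, 0, 0)
   resource = GoogleCloudResource(
       "projects/p/topics/t", "pubsub_topic", clock=lambda: fixed_now)
   assert resource.creation_date == fixed_now
   assert resource.last_update == fixed_now
   ```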
##########
.test-infra/tools/stale_cleaner.py:
##########
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Deletes stale and old resources from the Google Cloud Platform.
+# In order to detect them, save the current resources state and compare it
+# with a previous one. They are stored in a bucket in the Google Cloud Storage.
+#
+import datetime
+import pickle
+from collections import defaultdict
+from google.cloud import pubsub_v1, storage
+
+# Resource types
+PUBSUB_TOPIC_RESOURCE = "pubsub_topic"
+
+# Storage constants
+STORAGE_PREFIX = "stale_cleaner/"
+WORKING_SUFFIX = "-working"
+DELETED_SUFFIX = "-deleted"
+
+# File paths
+WORKING_RESOURCES_TEMP_FILE = "/tmp/working_resources.bin"
+LAST_WORKING_RESOURCES_TEMP_FILE = "/tmp/last_working_resources.bin"
+DELETED_RESOURCES_TEMP_FILE = "/tmp/deleted_resources.bin"
+LAST_DELETED_RESOURCES_TEMP_FILE = "/tmp/last_deleted_resources.bin"
+
+# Project constants
+PROJECT_PATH_PREFIX = "projects/"
+
+# Time constants (in seconds)
+DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400  # 1 day
+DEFAULT_TIME_THRESHOLD = 3600  # 1 hour
+
+# Default values for testing
+DEFAULT_PROJECT_ID = "apache-beam-testing"
+DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem"
+
+class GoogleCloudResource:
+    resource_name = ""
+    resource_type = ""
+    creation_date = datetime.datetime.now()
+    last_update_date = datetime.datetime.now()
+
+    def __init__(self, resource_name, resource_type, creation_date=datetime.datetime.now(), last_update_date=datetime.datetime.now()) -> None:
+        self.resource_name = resource_name
+        self.resource_type = resource_type
+        self.creation_date = creation_date
+        self.last_update = last_update_date
+
+    def __str__(self) -> str:
+        return f"{self.resource_name}"
+
+    def update(self) -> None:
+        self.last_update_date = datetime.datetime.now()
+
+    def time_alive(self) -> datetime.timedelta:
+        return datetime.datetime.now() - self.creation_date
+
+class StaleCleaner:
+    """
+    StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform.
+    It is used to detect resources that are no longer needed and delete them.
+
+    The update process goes through the following steps:
+
+    1. Get the resources that exist right now
+    2. Get the resources that were working the last time this script was run
+    3. Get the resources that were already deleted the last time this script was run
+    4. Go through the working resource list comparing the working list with the new list
+    5. If no longer exists, add it to the deleted dictionary
+    6. If it exists, update the last update date
+    7. If the resource did not existed before, add it to the working dictionary
+    8. Delete the resources that are no longer alive (Not implemented yet)
+    9. Add all the new resources to the working dictionary
+    10. Save the working and deleted resources to the google bucket
+    """
+
+    # Create a new StaleCleaner object
+    def __init__(self, project_id, bucket_name, prefixes=None, time_threshold=DEFAULT_TIME_THRESHOLD) -> None:
+        self.project_id = project_id
+        self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}"
+        self.bucket_name = bucket_name
+        self.prefixes = prefixes or []
+        self.time_threshold = time_threshold
+
+    # Dictionary of resources that exists right now, this needs to be created for each resource type
+    def get_now_resources(self) -> defaultdict:
+        pass
+
+    # Get working dictionary of resources from google bucket
+    def get_working_resources(self) -> defaultdict:
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(self.bucket_name)
+        blobs = bucket.list_blobs(prefix=STORAGE_PREFIX)
+
+        working_filtered_blobs = [blob for blob in blobs if f"{self.resource_type}{WORKING_SUFFIX}" in blob.name]
+        if working_filtered_blobs:
+            last_working_blob = max(working_filtered_blobs, key=lambda b: b.time_created)
+        else :
+            last_working_blob = None
+
+        # Get the resource dictionary from the blob, if it exists, else create an empty dictionary
+        if last_working_blob is not None:
+            # Download the last resources from the blob
+            last_working_blob.download_to_filename(LAST_WORKING_RESOURCES_TEMP_FILE)
+            with open(LAST_WORKING_RESOURCES_TEMP_FILE, "rb") as f:
+                # Load the last resources from the blob
+                working_resource_dict = pickle.load(f)
+        else:
+            working_resource_dict = defaultdict(GoogleCloudResource)
+
+        return working_resource_dict
+
+    # Get deleted resources from google bucket
+    def get_deleted_resources(self) -> defaultdict:
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(self.bucket_name)
+        blobs = bucket.list_blobs(prefix=STORAGE_PREFIX)
+
+        # Get the deleted resources dictionary from the blob, if it exists, else create an empty dictionary
+        deleted_filtered_blobs = [blob for blob in blobs if f"{self.resource_type}{DELETED_SUFFIX}" in blob.name]
+        if deleted_filtered_blobs:
+            last_deleted_blob = max(deleted_filtered_blobs, key=lambda b: b.time_created)
+        else:
+            last_deleted_blob = None
+
+        # Get the resource dictionary from the blob, if it exists, else create an empty dictionary
+        if last_deleted_blob is not None:
+            # Download the last resources from the blob
+            last_deleted_blob.download_to_filename(LAST_DELETED_RESOURCES_TEMP_FILE)
+            with open(LAST_DELETED_RESOURCES_TEMP_FILE, "rb") as f:
+                # Load the last resources from the blob
+                deleted_resource_dict = pickle.load(f)
+        else:
+            deleted_resource_dict = defaultdict(GoogleCloudResource)
+
+        return deleted_resource_dict
+
+    # Get the resources that are older than the threshold (in seconds)
+    def get_old_resources(self) -> defaultdict:
+        # Traverse the working resources and get the ones that are older than the threshold
+        working_resource_dict = self.get_working_resources()
+        old_resource_dict = defaultdict(GoogleCloudResource)
+
+        time_threshold_delta = datetime.timedelta(seconds=self.time_threshold)
+        for resource_name, resource_obj in working_resource_dict.items():
+            if resource_obj.time_alive() > time_threshold_delta:
+                old_resource_dict[resource_name] = resource_obj
+
+        return old_resource_dict
+
+    # Set the working dictionary of resources in the google bucket
+    def set_working_resources(self, working_resource_dict) -> None:
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(self.bucket_name)
+
+        with open(WORKING_RESOURCES_TEMP_FILE, "wb") as f:
+            pickle.dump(working_resource_dict, f)
+        blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}{WORKING_SUFFIX}")
+        blob.upload_from_filename(WORKING_RESOURCES_TEMP_FILE)
+
+    # Set the deleted dictionary of resources in the google bucket
+    def set_deleted_resources(self, deleted_resource_dict) -> None:
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(self.bucket_name)
+
+        with open(DELETED_RESOURCES_TEMP_FILE, "wb") as f:
+            pickle.dump(deleted_resource_dict, f)
+        blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}{DELETED_SUFFIX}")
+        blob.upload_from_filename(DELETED_RESOURCES_TEMP_FILE)
+
+    # Save the new project state
+    def update(self) -> None:
+        # Get working resource list
+        now_resources_dict = self.get_now_resources()
+        working_resource_dict = self.get_working_resources()
+        deleted_resource_dict = self.get_deleted_resources()
+
+        working_resources_to_delete = []
+
+        # Go through the working resource list comparing the working list with the new list
+        for resource_name, resource_obj in working_resource_dict.items():
+            # If no longer exists, add it to the deleted dictionary
+            if resource_name not in now_resources_dict:
+                deleted_resource_dict[resource_name] = resource_obj
+                working_resources_to_delete.append(resource_name)
+            # If it exists, update the last update date
+            else:
+                working_resource_dict[resource_name].update()
+                now_resources_dict.pop(resource_name)
+
+        # Delete the resources that are no longer alive
+        for resource_name in working_resources_to_delete:
+            working_resource_dict.pop(resource_name)
+
+        # Add all the new resources to the working dictionary
+        for resource_name, resource_obj in now_resources_dict.items():
+            working_resource_dict[resource_name] = resource_obj
+
+        # Save the working and deleted resources
+        self.set_working_resources(working_resource_dict)
+        self.set_deleted_resources(deleted_resource_dict)
+
+# Create a new PubSub topic cleaner
+class PubSubTopicCleaner(StaleCleaner):
+    def __init__(self, project_id, bucket_name, prefixes=None, time_threshold=DEFAULT_PUBSUB_TOPIC_THRESHOLD) -> None:
+        super().__init__(project_id, bucket_name, prefixes, time_threshold)
+        self.client = pubsub_v1.PublisherClient()
+        self.resource_type = PUBSUB_TOPIC_RESOURCE
+
+    def get_now_resources(self) -> defaultdict:
+        d = defaultdict(GoogleCloudResource)
+        for topic in self.client.list_topics(request={"project": self.project_path}):
+            # Only include topics that match any of the specified prefixes
+            topic_name = topic.name.split('/')[-1]
+            if not self.prefixes or any(topic_name.startswith(prefix) for prefix in self.prefixes):
+                d[topic.name] = GoogleCloudResource(topic.name, self.resource_type)
+        return d
+
+# Local testing
+if __name__ == "__main__":
+    cleaner = PubSubTopicCleaner(DEFAULT_PROJECT_ID, DEFAULT_BUCKET_NAME)
+    cleaner.update()
+
+    print("Resources that exist right now")
+    d = cleaner.get_now_resources()
+    for k, v in d.items():
+        print(f"{k} -> {v.time_alive()}")
+
+    print("Resources that were working the last time this script was run")
+    d = cleaner.get_working_resources()
+    for k, v in d.items():
+        print(f"{k} -> {v.time_alive()}")
+
+    print("Resources that were already deleted the last time this script was run")
+    d = cleaner.get_deleted_resources()
+    for k, v in d.items():
+        print(f"{k} -> {v.time_alive()}")
+
+    print(f"Resources that are older than {cleaner.time_threshold} seconds")
+    d = cleaner.get_old_resources()
+    for k, v in d.items():
+        print(f"{k} -> {v.time_alive()}")

Review Comment:
   thanks Quique, this is much easier to understand.
   I want to change the API of our cleaner to be more intuitive to its intended usage:
   ```
   class Cleaner:
       def refresh(self):
           # Run this to load all of the data

       def stale_resources(self) -> List[...]:
           # Run this to see the list of _stale_ resources, i.e. resources that would be deleted

       def fresh_resources(self) -> List[...]:
           # Run this to see the list of resources that are NOT stale.

       def delete_stale(self, dry_run=True):
           # Run this after loading the data, to actually delete stale resources.
           # The dry_run flag tells us when to 'preview' deletions and when to
           # actually execute them
   ```
   Could you change the implementation to use these methods?
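   For reference, the intended call pattern would look roughly like this — a sketch against the proposed methods, not existing code; the constructor arguments just mirror the current PubSubTopicCleaner:
   ```
   cleaner = PubSubTopicCleaner(DEFAULT_PROJECT_ID, DEFAULT_BUCKET_NAME)
   cleaner.refresh()                    # load current and stored state
   print(cleaner.stale_resources())     # resources that would be deleted
   print(cleaner.fresh_resources())     # resources that are NOT stale
   cleaner.delete_stale(dry_run=True)   # preview deletions only
   cleaner.delete_stale(dry_run=False)  # actually delete stale resources
   ```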