pabloem commented on code in PR #34141: URL: https://github.com/apache/beam/pull/34141#discussion_r2103493193
########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock Review Comment: self.clock is always assigned, so this should just be: ```suggestion self.last_update_date = self.clock() ``` ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). + """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() Review Comment: ```suggestion return (self.clock() - self.creation_date).total_seconds() ``` ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource Review Comment: Note that our clock is NOT a fixed datetime. It's a FUNCTION that returns a datetime. So you would do something like this: ```suggestion last_update_date: datetime.datetime = None, clock: Callable[[], datetime.datetime] = None) -> None: self.resource_name = resource_name clock = clock or datetime.datetime.now # Function that returns current time or use another one for testing self.creation_date = creation_date or clock() # Date of first appearance of the resource ``` Can you see that `clock` is actually a function? (e.g. `clock = datetime.datetime.now` is different than `clock = datetime.datetime.now()` - the first makes clock a FUNCTION, and the second makes it a value returned by that function) ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. 
+# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). + """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() + +class StaleCleaner: + """ + StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform. + It is used to detect resources that are no longer needed and delete them. + + Methods: + + refresh(): + Load all data with the current datetime + + stale_resources(): + Dict of _stale_ resources that should be deleted + + fresh_resources(): + Dict of resources that are NOT stale + + def delete_stale(dry_run=True): + Delete all stale resources (dry_run by default) + """ + + # Create a new StaleCleaner object + def __init__(self, project_id: str, resource_type: str, bucket_name: str, + prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None: + self.project_id = project_id + self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}" + self.resource_type = resource_type + self.bucket_name = bucket_name + self.prefixes = prefixes or [] + self.time_threshold = time_threshold + + def __delete_resource(self, resource_name: str) -> None: + """ + Different for each resource type. Delete the resource from GCP. + """ + pass + + def __active_resources(self, clock: datetime.datetime = None) -> dict: + """ + Different for each resource type. Get the active resources from GCP as a dictionary. + The dictionary is a dict of GoogleCloudResource objects. + The key is the resource name and the value is the GoogleCloudResource object. + The clock is for testing purposes. It gives the resources a specific creation date. + """ + pass + + def __write_resources(self, resources: dict) -> None: + """ + Write existing resources to the google bucket. 
+ """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + resource_dict = {k: v.to_dict() for k, v in resources.items()} + blob_json = json.dumps(resource_dict, indent=4) + + blob.upload_from_string(blob_json, content_type="application/json") + print(f"Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json") + + def __stored_resources(self) -> dict: Review Comment: ```suggestion def _stored_resources(self) -> dict: ``` single prefixed underscore is enough for these functions ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). 
+ """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() + +class StaleCleaner: + """ + StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform. + It is used to detect resources that are no longer needed and delete them. + + Methods: + + refresh(): + Load all data with the current datetime + + stale_resources(): + Dict of _stale_ resources that should be deleted + + fresh_resources(): + Dict of resources that are NOT stale + + def delete_stale(dry_run=True): + Delete all stale resources (dry_run by default) + """ + + # Create a new StaleCleaner object + def __init__(self, project_id: str, resource_type: str, bucket_name: str, + prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None: + self.project_id = project_id + self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}" + self.resource_type = resource_type + self.bucket_name = bucket_name + self.prefixes = prefixes or [] + self.time_threshold = time_threshold + + def __delete_resource(self, resource_name: str) -> None: + """ + Different for each resource type. Delete the resource from GCP. + """ + pass + + def __active_resources(self, clock: datetime.datetime = None) -> dict: + """ + Different for each resource type. Get the active resources from GCP as a dictionary. + The dictionary is a dict of GoogleCloudResource objects. + The key is the resource name and the value is the GoogleCloudResource object. + The clock is for testing purposes. It gives the resources a specific creation date. + """ + pass + + def __write_resources(self, resources: dict) -> None: Review Comment: ```suggestion def _write_resources(self, resources: dict) -> None: ``` single prefixed underscore is enough for these functions ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. 
+# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). + """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() + +class StaleCleaner: + """ + StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform. + It is used to detect resources that are no longer needed and delete them. + + Methods: + + refresh(): + Load all data with the current datetime + + stale_resources(): + Dict of _stale_ resources that should be deleted + + fresh_resources(): + Dict of resources that are NOT stale + + def delete_stale(dry_run=True): + Delete all stale resources (dry_run by default) + """ + + # Create a new StaleCleaner object + def __init__(self, project_id: str, resource_type: str, bucket_name: str, + prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None: + self.project_id = project_id + self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}" + self.resource_type = resource_type + self.bucket_name = bucket_name + self.prefixes = prefixes or [] + self.time_threshold = time_threshold + + def __delete_resource(self, resource_name: str) -> None: + """ + Different for each resource type. Delete the resource from GCP. + """ + pass + + def __active_resources(self, clock: datetime.datetime = None) -> dict: + """ + Different for each resource type. Get the active resources from GCP as a dictionary. + The dictionary is a dict of GoogleCloudResource objects. + The key is the resource name and the value is the GoogleCloudResource object. + The clock is for testing purposes. It gives the resources a specific creation date. + """ + pass + + def __write_resources(self, resources: dict) -> None: + """ + Write existing resources to the google bucket. 
+ """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + resource_dict = {k: v.to_dict() for k, v in resources.items()} + blob_json = json.dumps(resource_dict, indent=4) + + blob.upload_from_string(blob_json, content_type="application/json") + print(f"Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json") + + def __stored_resources(self) -> dict: + """ + Get the stored resources from the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + if not blob.exists(): + print(f"Blob {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json does not exist.") + return {} + + blob_string = blob.download_as_text() + blob_dict = json.loads(blob_string) + + # Convert the dictionary to a dict of GoogleCloudResource objects + resources = {} + for k, v in blob_dict.items(): + resources[k] = GoogleCloudResource( + resource_name=v["resource_name"], + creation_date=datetime.datetime.fromisoformat(v["creation_date"]), + last_update_date=datetime.datetime.fromisoformat(v["last_update_date"]) + ) + return resources + + def refresh(self, clock: datetime.datetime = None) -> None: + """ + Refresh the resources time and save them to the google bucket. + The process goes through the following steps: + 1. Get the resources that exist in the GCP + 2. Get the resources that were working the last time this script was run + 3. Delete from the stored resources the ones that are no longer alive + 4. Add the new resources to the working dictionary + 5. Save the working resources to the google bucket + """ + clock = clock or datetime.datetime.now() + stored_resources = self.__stored_resources() + active_resources = self.__active_resources(clock=clock) + + for k, v in list(stored_resources.items()): + if k not in active_resources: + print(f"Resource {k} is no longer alive. Deleting it from the stored resources.") + del stored_resources[k] + else: + stored_resources[k].update(clock=clock) + + for k, v in active_resources.items(): + if k not in stored_resources: + stored_resources[k] = v + + self.__write_resources(stored_resources) + + def stale_resources(self, clock: datetime.datetime = None) -> dict: + """ + Get the stale resources that should be deleted. + The process goes through the following steps: + 1. Get the stored resources + 2. Compare the time since the creation date of the resource with the time threshold + 3. If the time since the creation date is greater than the time threshold, add it to the stale resources + """ + clock = clock or datetime.datetime.now() Review Comment: ```suggestion clock = clock or datetime.datetime.now ``` ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). + """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() + +class StaleCleaner: + """ + StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform. + It is used to detect resources that are no longer needed and delete them. 
+ + Methods: + + refresh(): + Load all data with the current datetime + + stale_resources(): + Dict of _stale_ resources that should be deleted + + fresh_resources(): + Dict of resources that are NOT stale + + def delete_stale(dry_run=True): + Delete all stale resources (dry_run by default) + """ + + # Create a new StaleCleaner object + def __init__(self, project_id: str, resource_type: str, bucket_name: str, + prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None: + self.project_id = project_id + self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}" + self.resource_type = resource_type + self.bucket_name = bucket_name + self.prefixes = prefixes or [] + self.time_threshold = time_threshold + + def __delete_resource(self, resource_name: str) -> None: + """ + Different for each resource type. Delete the resource from GCP. + """ + pass + + def __active_resources(self, clock: datetime.datetime = None) -> dict: + """ + Different for each resource type. Get the active resources from GCP as a dictionary. + The dictionary is a dict of GoogleCloudResource objects. + The key is the resource name and the value is the GoogleCloudResource object. + The clock is for testing purposes. It gives the resources a specific creation date. + """ + pass + + def __write_resources(self, resources: dict) -> None: + """ + Write existing resources to the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + resource_dict = {k: v.to_dict() for k, v in resources.items()} + blob_json = json.dumps(resource_dict, indent=4) + + blob.upload_from_string(blob_json, content_type="application/json") + print(f"Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json") + + def __stored_resources(self) -> dict: + """ + Get the stored resources from the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + if not blob.exists(): + print(f"Blob {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json does not exist.") + return {} + + blob_string = blob.download_as_text() + blob_dict = json.loads(blob_string) + + # Convert the dictionary to a dict of GoogleCloudResource objects + resources = {} + for k, v in blob_dict.items(): + resources[k] = GoogleCloudResource( + resource_name=v["resource_name"], + creation_date=datetime.datetime.fromisoformat(v["creation_date"]), + last_update_date=datetime.datetime.fromisoformat(v["last_update_date"]) + ) + return resources + + def refresh(self, clock: datetime.datetime = None) -> None: + """ + Refresh the resources time and save them to the google bucket. + The process goes through the following steps: + 1. Get the resources that exist in the GCP + 2. Get the resources that were working the last time this script was run + 3. Delete from the stored resources the ones that are no longer alive + 4. Add the new resources to the working dictionary + 5. Save the working resources to the google bucket + """ + clock = clock or datetime.datetime.now() + stored_resources = self.__stored_resources() + active_resources = self.__active_resources(clock=clock) + + for k, v in list(stored_resources.items()): + if k not in active_resources: + print(f"Resource {k} is no longer alive. 
Deleting it from the stored resources.") + del stored_resources[k] + else: + stored_resources[k].update(clock=clock) + + for k, v in active_resources.items(): + if k not in stored_resources: + stored_resources[k] = v + + self.__write_resources(stored_resources) + + def stale_resources(self, clock: datetime.datetime = None) -> dict: Review Comment: ```suggestion def stale_resources(self, clock: Callable[[],datetime.datetime] = None) -> dict: ``` ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). 
+ """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() + +class StaleCleaner: + """ + StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform. + It is used to detect resources that are no longer needed and delete them. + + Methods: + + refresh(): + Load all data with the current datetime + + stale_resources(): + Dict of _stale_ resources that should be deleted + + fresh_resources(): + Dict of resources that are NOT stale + + def delete_stale(dry_run=True): + Delete all stale resources (dry_run by default) + """ + + # Create a new StaleCleaner object + def __init__(self, project_id: str, resource_type: str, bucket_name: str, + prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None: + self.project_id = project_id + self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}" + self.resource_type = resource_type + self.bucket_name = bucket_name + self.prefixes = prefixes or [] + self.time_threshold = time_threshold + + def __delete_resource(self, resource_name: str) -> None: + """ + Different for each resource type. Delete the resource from GCP. + """ + pass + + def __active_resources(self, clock: datetime.datetime = None) -> dict: + """ + Different for each resource type. Get the active resources from GCP as a dictionary. + The dictionary is a dict of GoogleCloudResource objects. + The key is the resource name and the value is the GoogleCloudResource object. + The clock is for testing purposes. It gives the resources a specific creation date. + """ + pass + + def __write_resources(self, resources: dict) -> None: + """ + Write existing resources to the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + resource_dict = {k: v.to_dict() for k, v in resources.items()} + blob_json = json.dumps(resource_dict, indent=4) + + blob.upload_from_string(blob_json, content_type="application/json") + print(f"Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json") + + def __stored_resources(self) -> dict: + """ + Get the stored resources from the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + if not blob.exists(): + print(f"Blob {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json does not exist.") + return {} + + blob_string = blob.download_as_text() + blob_dict = json.loads(blob_string) + + # Convert the dictionary to a dict of GoogleCloudResource objects + resources = {} + for k, v in blob_dict.items(): + resources[k] = GoogleCloudResource( + resource_name=v["resource_name"], + creation_date=datetime.datetime.fromisoformat(v["creation_date"]), + last_update_date=datetime.datetime.fromisoformat(v["last_update_date"]) + ) + return resources + + def refresh(self, clock: datetime.datetime = None) -> None: + """ + Refresh the resources time and save them to the google bucket. + The process goes through the following steps: + 1. Get the resources that exist in the GCP + 2. Get the resources that were working the last time this script was run + 3. Delete from the stored resources the ones that are no longer alive + 4. Add the new resources to the working dictionary + 5. 
Save the working resources to the google bucket + """ + clock = clock or datetime.datetime.now() + stored_resources = self.__stored_resources() + active_resources = self.__active_resources(clock=clock) + + for k, v in list(stored_resources.items()): + if k not in active_resources: + print(f"Resource {k} is no longer alive. Deleting it from the stored resources.") + del stored_resources[k] + else: + stored_resources[k].update(clock=clock) + + for k, v in active_resources.items(): + if k not in stored_resources: + stored_resources[k] = v + + self.__write_resources(stored_resources) + + def stale_resources(self, clock: datetime.datetime = None) -> dict: + """ + Get the stale resources that should be deleted. + The process goes through the following steps: + 1. Get the stored resources + 2. Compare the time since the creation date of the resource with the time threshold + 3. If the time since the creation date is greater than the time threshold, add it to the stale resources + """ + clock = clock or datetime.datetime.now() + stored_resources = self.__stored_resources() + stale_resources = {} + + for k, v in stored_resources.items(): + if v.time_alive(clock=clock) > self.time_threshold: + stale_resources[k] = v + + return stale_resources + + def fresh_resources(self, clock: datetime.datetime = None) -> dict: + """ + Get the fresh resources that are not stale. + The process goes through the following steps: + 1. Get the stored resources + 2. Compare the time since the creation date of the resource with the time threshold + 3. If the time since the creation date is less than the time threshold, add it to the fresh resources + """ + clock = clock or datetime.datetime.now() + stored_resources = self.__stored_resources() + fresh_resources = {} + + for k, v in stored_resources.items(): + if v.time_alive(clock=clock) <= self.time_threshold: + fresh_resources[k] = v + + return fresh_resources + + def delete_stale(self, clock: datetime.datetime = None, dry_run: bool = True) -> None: + """ + Delete the stale resources. + The process goes through the following steps: + 1. Get the stale resources + 2. Check if they still exist in GCP + 3. If they exist, delete them + 4. If dry_run is True, do not delete them, just print the names + """ + clock = clock or datetime.datetime.now() + stale_resources = self.stale_resources(clock=clock) Review Comment: ```suggestion stale_resources = self.stale_resources(clock=self.clock) ``` ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. 
They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). + """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() + +class StaleCleaner: + """ + StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform. + It is used to detect resources that are no longer needed and delete them. + + Methods: + + refresh(): + Load all data with the current datetime + + stale_resources(): + Dict of _stale_ resources that should be deleted + + fresh_resources(): + Dict of resources that are NOT stale + + def delete_stale(dry_run=True): + Delete all stale resources (dry_run by default) + """ + + # Create a new StaleCleaner object + def __init__(self, project_id: str, resource_type: str, bucket_name: str, + prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None: + self.project_id = project_id + self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}" + self.resource_type = resource_type + self.bucket_name = bucket_name + self.prefixes = prefixes or [] + self.time_threshold = time_threshold + + def __delete_resource(self, resource_name: str) -> None: + """ + Different for each resource type. Delete the resource from GCP. + """ + pass + + def __active_resources(self, clock: datetime.datetime = None) -> dict: + """ + Different for each resource type. Get the active resources from GCP as a dictionary. + The dictionary is a dict of GoogleCloudResource objects. + The key is the resource name and the value is the GoogleCloudResource object. + The clock is for testing purposes. It gives the resources a specific creation date. 
+ """ + pass + + def __write_resources(self, resources: dict) -> None: + """ + Write existing resources to the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + resource_dict = {k: v.to_dict() for k, v in resources.items()} + blob_json = json.dumps(resource_dict, indent=4) + + blob.upload_from_string(blob_json, content_type="application/json") + print(f"Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json") + + def __stored_resources(self) -> dict: + """ + Get the stored resources from the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + if not blob.exists(): + print(f"Blob {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json does not exist.") + return {} + + blob_string = blob.download_as_text() + blob_dict = json.loads(blob_string) + + # Convert the dictionary to a dict of GoogleCloudResource objects + resources = {} + for k, v in blob_dict.items(): + resources[k] = GoogleCloudResource( + resource_name=v["resource_name"], + creation_date=datetime.datetime.fromisoformat(v["creation_date"]), + last_update_date=datetime.datetime.fromisoformat(v["last_update_date"]) + ) + return resources + + def refresh(self, clock: datetime.datetime = None) -> None: + """ + Refresh the resources time and save them to the google bucket. + The process goes through the following steps: + 1. Get the resources that exist in the GCP + 2. Get the resources that were working the last time this script was run + 3. Delete from the stored resources the ones that are no longer alive + 4. Add the new resources to the working dictionary + 5. Save the working resources to the google bucket + """ + clock = clock or datetime.datetime.now() + stored_resources = self.__stored_resources() + active_resources = self.__active_resources(clock=clock) + + for k, v in list(stored_resources.items()): + if k not in active_resources: + print(f"Resource {k} is no longer alive. Deleting it from the stored resources.") + del stored_resources[k] + else: + stored_resources[k].update(clock=clock) + + for k, v in active_resources.items(): + if k not in stored_resources: + stored_resources[k] = v + + self.__write_resources(stored_resources) + + def stale_resources(self, clock: datetime.datetime = None) -> dict: + """ + Get the stale resources that should be deleted. + The process goes through the following steps: + 1. Get the stored resources + 2. Compare the time since the creation date of the resource with the time threshold + 3. If the time since the creation date is greater than the time threshold, add it to the stale resources + """ + clock = clock or datetime.datetime.now() + stored_resources = self.__stored_resources() + stale_resources = {} + + for k, v in stored_resources.items(): + if v.time_alive(clock=clock) > self.time_threshold: + stale_resources[k] = v + + return stale_resources + + def fresh_resources(self, clock: datetime.datetime = None) -> dict: Review Comment: same change as in stale_resources ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Deletes stale and old resources from the Google Cloud Platform. +# In order to detect them, save the current resources state and compare it +# with a previous one. They are stored in a bucket in the Google Cloud Storage. +# +import datetime +import json +from google.cloud import pubsub_v1, storage + +# Resource types +PUBSUB_TOPIC_RESOURCE = "pubsub_topic" + +# Storage constants +STORAGE_PREFIX = "stale_cleaner/" + +# Project constants +PROJECT_PATH_PREFIX = "projects/" # Prefix for the project path in GCP *This is not the project id* + +# Time constants (in seconds) +DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day +DEFAULT_TIME_THRESHOLD = 3600 # 1 hour + +# Default values for testing +DEFAULT_PROJECT_ID = "apache-beam-testing" +DEFAULT_BUCKET_NAME = "apache-beam-testing-pabloem" + +class GoogleCloudResource: + """ + GoogleCloudResource is a class used to store the GCP resource information of name and type + including the creation date and last check date. + """ + def __init__(self, resource_name: str, creation_date: datetime.datetime = None, + last_update_date: datetime.datetime = None, clock: datetime.datetime = None) -> None: + self.resource_name = resource_name + clock = clock or datetime.datetime.now() # Get current time or use another one for testing + self.creation_date = creation_date or clock # Date of first appearance of the resource + self.last_update_date = last_update_date or clock # Date of last existence check + + def __str__(self) -> str: + return f"{self.resource_name}" + + def to_dict(self) -> dict: + """ + Convert the resource to a dictionary. + """ + return { + "resource_name": self.resource_name, + "creation_date": self.creation_date.isoformat(), + "last_update_date": self.last_update_date.isoformat() + } + + def update(self, clock: datetime.datetime = None) -> None: + self.last_update_date = datetime.datetime.now() if clock is None else clock + + def time_alive(self, clock: datetime.datetime = None) -> int: + """ + Get the time since the resource was created (in seconds). + """ + clock = clock or datetime.datetime.now() + return (clock - self.creation_date).total_seconds() + +class StaleCleaner: + """ + StaleCleaner is a class that is used to detect stale resources in the Google Cloud Platform. + It is used to detect resources that are no longer needed and delete them. 
+ + Methods: + + refresh(): + Load all data with the current datetime + + stale_resources(): + Dict of _stale_ resources that should be deleted + + fresh_resources(): + Dict of resources that are NOT stale + + def delete_stale(dry_run=True): + Delete all stale resources (dry_run by default) + """ + + # Create a new StaleCleaner object + def __init__(self, project_id: str, resource_type: str, bucket_name: str, + prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None: + self.project_id = project_id + self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}" + self.resource_type = resource_type + self.bucket_name = bucket_name + self.prefixes = prefixes or [] + self.time_threshold = time_threshold + + def __delete_resource(self, resource_name: str) -> None: + """ + Different for each resource type. Delete the resource from GCP. + """ + pass + + def __active_resources(self, clock: datetime.datetime = None) -> dict: + """ + Different for each resource type. Get the active resources from GCP as a dictionary. + The dictionary is a dict of GoogleCloudResource objects. + The key is the resource name and the value is the GoogleCloudResource object. + The clock is for testing purposes. It gives the resources a specific creation date. + """ + pass + + def __write_resources(self, resources: dict) -> None: + """ + Write existing resources to the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + resource_dict = {k: v.to_dict() for k, v in resources.items()} + blob_json = json.dumps(resource_dict, indent=4) + + blob.upload_from_string(blob_json, content_type="application/json") + print(f"Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json") + + def __stored_resources(self) -> dict: + """ + Get the stored resources from the google bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(self.bucket_name) + blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json") + + if not blob.exists(): + print(f"Blob {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json does not exist.") + return {} + + blob_string = blob.download_as_text() + blob_dict = json.loads(blob_string) + + # Convert the dictionary to a dict of GoogleCloudResource objects + resources = {} + for k, v in blob_dict.items(): + resources[k] = GoogleCloudResource( + resource_name=v["resource_name"], + creation_date=datetime.datetime.fromisoformat(v["creation_date"]), + last_update_date=datetime.datetime.fromisoformat(v["last_update_date"]) + ) + return resources + + def refresh(self, clock: datetime.datetime = None) -> None: Review Comment: same change as in stale_resources ########## .test-infra/tools/stale_cleaner.py: ########## @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
########## .test-infra/tools/stale_cleaner.py: ##########
@@ -0,0 +1,268 @@
+    # Create a new StaleCleaner object
+    def __init__(self, project_id: str, resource_type: str, bucket_name: str,
+                 prefixes: list = None, time_threshold: int = DEFAULT_TIME_THRESHOLD) -> None:
+        self.project_id = project_id
+        self.project_path = f"{PROJECT_PATH_PREFIX}{project_id}"
+        self.resource_type = resource_type
+        self.bucket_name = bucket_name
+        self.prefixes = prefixes or []
+        self.time_threshold = time_threshold
+
+    def __delete_resource(self, resource_name: str) -> None:

Review Comment:
```suggestion
    def _delete_resource(self, resource_name: str) -> None:
```
single prefixed underscore is enough for these functions
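The rename is more than style: double leading underscores trigger Python's per-class name mangling, so a resource-specific subclass could not cleanly override these hooks. A small self-contained illustration (not code from the PR):

```python
# Name mangling demo: with double underscores the subclass override is ignored.
class Base:
    def __hook(self) -> str:      # compiled as _Base__hook
        return "base"

    def run(self) -> str:
        return self.__hook()      # always resolves to _Base__hook


class Child(Base):
    def __hook(self) -> str:      # compiled as _Child__hook; run() never sees it
        return "child"


print(Child().run())  # prints "base"

# With a single underscore (_hook) the normal attribute lookup applies,
# so Child's override would be used and run() would print "child".
```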
########## .test-infra/tools/stale_cleaner.py: ##########
@@ -0,0 +1,268 @@
+    def __delete_resource(self, resource_name: str) -> None:
+        """
+        Different for each resource type. Delete the resource from GCP.
+        """
+        pass
+
+    def __active_resources(self, clock: datetime.datetime = None) -> dict:

Review Comment:
```suggestion
    def _active_resources(self, clock: datetime.datetime = None) -> dict:
```
single prefixed underscore is enough for these functions
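For context on how these hooks are meant to be used once they take a single underscore, here is a hypothetical resource-specific subclass. It is only a sketch: it assumes the `StaleCleaner` and `GoogleCloudResource` classes from this file are in scope, and it uses the standard `pubsub_v1.PublisherClient` `list_topics`/`delete_topic` calls; the PR's own Pub/Sub cleaner may look different.

```python
# Hypothetical subclass, not taken from the PR: shows _active_resources() and
# _delete_resource() being overridden for one resource type (Pub/Sub topics).
import datetime

from google.cloud import pubsub_v1


class PubSubTopicCleanerSketch(StaleCleaner):
    def _active_resources(self, clock: datetime.datetime = None) -> dict:
        publisher = pubsub_v1.PublisherClient()
        resources = {}
        for topic in publisher.list_topics(project=self.project_path):
            short_name = topic.name.split("/")[-1]
            # Honour the optional name prefixes configured on the cleaner.
            if self.prefixes and not any(short_name.startswith(p) for p in self.prefixes):
                continue
            resources[topic.name] = GoogleCloudResource(resource_name=topic.name, clock=clock)
        return resources

    def _delete_resource(self, resource_name: str) -> None:
        # resource_name is the full topic path, e.g. projects/<project>/topics/<topic>.
        pubsub_v1.PublisherClient().delete_topic(topic=resource_name)
```

Keeping the base class generic and confining the GCP calls to these two overrides is what the single-underscore hook names make straightforward.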
