ksobrenat32 commented on code in PR #35524:
URL: https://github.com/apache/beam/pull/35524#discussion_r2261005131


##########
infra/keys/secret_manager.py:
##########
@@ -0,0 +1,651 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google_crc32c
+import logging
+import time
+from datetime import datetime, timezone, timedelta
+from google.cloud import secretmanager
+from typing import List, Union
+
+class SecretManagerLoggerAdapter(logging.LoggerAdapter):
+    """Logger adapter that adds a prefix to all log messages."""
+    
+    def process(self, msg, kwargs):
+        return f"[SecretManager] {msg}", kwargs
+
+class SecretManager:
+    """Service to manage GCP API keys rotation."""
+
+    project_id: str # The GCP project ID where secrets are managed
+    rotation_interval: int # The interval (in days) at which to rotate secrets
+    max_versions_to_keep: int # The maximum number of secret versions to keep
+    max_retries: int # The maximum number of retries for API calls
+    client: secretmanager.SecretManagerServiceClient # GCP Secret Manager client
+    logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging messages
+    secrets_ids: List[str] # List of secret IDs managed by this service
+
+    def __init__(self, project_id: str, logger: logging.Logger, 
rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 
3) -> None:
+        self.project_id = project_id
+        self.rotation_interval = rotation_interval
+        self.max_versions_to_keep = max_versions_to_keep
+        self.max_retries = max_retries
+        self.client = secretmanager.SecretManagerServiceClient()
+        self.logger = SecretManagerLoggerAdapter(logger, {})
+        self.logger.info(f"Initialized SecretManager for project 
'{self.project_id}'")
+        self.secrets_ids = self._get_secrets_ids()
+
+    def _get_secrets_ids(self) -> List[str]:
+        """
+        Retrieves the list of secrets from the Secret Manager and populates 
the `secrets_ids` list.
+        This method filters secrets based on a specific label indicating they 
were created by this service.
+
+        Returns:
+            List[str]: A list of secret IDs that were created by this service.
+        """
+        self.logger.debug(f"Retrieving secrets with the label from project 
'{self.project_id}'")
+        secrets_ids = []
+
+        try:
+            for secret in self.client.list_secrets(request={"parent": 
f"projects/{self.project_id}"}):
+                secret_id = secret.name.split("/")[-1]
+                if "created_by" in secret.labels and 
secret.labels["created_by"] == "secretmanager-service":
+                    secrets_ids.append(secret_id)
+        except Exception as e:
+            self.logger.error(f"Error retrieving secrets: {e}")
+
+        self.logger.debug(f"Found {len(secrets_ids)} secrets created by 
secretmanager-service in project '{self.project_id}'")
+        return secrets_ids
+
+    def _secret_exists(self, secret_id: str) -> bool:
+        """
+        Checks if a secret with the given ID exists in the Secret Manager GCP.
+
+        Args:
+            secret_id (str): The ID of the secret to check.
+        Returns:
+            bool: True if the secret exists, False otherwise.
+        """
+        self.logger.debug(f"Checking if secret '{secret_id}' exists")
+        try:
+            name = self.client.secret_path(self.project_id, secret_id)
+            secret = self.client.get_secret(request={"name": name})
+
+            if "created_by" in secret.labels and secret.labels["created_by"] 
== "secretmanager-service":
+                self.logger.debug(f"Secret '{secret_id}' exists and is managed 
by secretmanager-service")
+                return True
+            else:
+                self.logger.debug(f"Secret '{secret_id}' exists but is not 
managed by secretmanager-service")
+                return False
+        except Exception as e:
+            self.logger.debug(f"Secret '{secret_id}' does not exist: {e}")
+            return False
+
+    def create_secret(self, secret_id: str) -> str:
+        """
+        Create a new secret with the given name. A secret is a logical wrapper
+        around a collection of secret versions. Secret versions hold the actual
+        secret material. This method creates a new secret with automatic 
replication
+        and labels for tracking.
+
+        Args:
+            secret_id (str): The ID to assign to the new secret. This ID must 
be unique within the project.
+        Returns:
+            str: The secret path of the newly created secret.
+        """
+        if secret_id in self.secrets_ids:
+            self.logger.debug(f"Secret '{secret_id}' already exists, returning 
existing secret path")
+            name = self.client.secret_path(self.project_id, secret_id)
+            return name
+
+        self.logger.info(f"Creating new secret '{secret_id}' with rotation 
interval of {self.rotation_interval} days")
+        response = self.client.create_secret(
+            request={
+                "parent": f"projects/{self.project_id}",
+                "secret_id": f"{secret_id}",
+                "secret": {
+                    "replication": {
+                        "automatic": {}
+                    },
+                    "labels": {
+                        "created_by": "secretmanager-service",
+                        "created_at": 
datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
+                        "rotation_interval_days": str(self.rotation_interval),
+                        "last_version_created_at": 
datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
+                    }
+                }
+            }
+        )
+
+        # Wait for the secret to be created
+        self.logger.debug(f"Waiting for secret '{secret_id}' to be created")
+        delay = 1
+        for _ in range(self.max_retries):
+            if self._secret_exists(secret_id):
+                self.logger.debug(f"Secret '{secret_id}' is now available")
+                self.secrets_ids.append(secret_id)
+                break
+            self.logger.debug(f"Secret '{secret_id}' not found, retrying in 
{delay} seconds")
+            time.sleep(delay)
+            delay *= 2
+        else:
+            error_msg = f"Failed to create secret '{secret_id}' after 
{self.max_retries} retries."

Review Comment:
   sounds great



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to