pabloem commented on code in PR #35524:
URL: https://github.com/apache/beam/pull/35524#discussion_r2255833484


##########
infra/keys/config.yaml:
##########
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the configuration file for the secrets rotation service.
+# It defines the parameters for the secrets rotation process.
+
+# GENERAL CONFIGURATION
+
+# The project ID where the secrets rotation service will run
+project_id: "testing-me-460223"
+
+# Secret service configuration
+
+# Default secret rotation interval in days
+rotation_interval: 30
+
+# The maximum number of secrets versions to keep in the secret manager
+max_versions_to_keep: 5
+
+# LOGGING
+
+# The secret rotation logs will be stored in a GCP bucket.
+bucket_name: "testing-me-460223-tfstate"

Review Comment:
   TODO: Change this before merge



##########
infra/keys/secret_manager.py:
##########
@@ -0,0 +1,651 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google_crc32c
+import logging
+import time
+from datetime import datetime, timezone, timedelta
+from google.cloud import secretmanager
+from typing import List, Union
+
+class SecretManagerLoggerAdapter(logging.LoggerAdapter):
+    """Logger adapter that adds a prefix to all log messages."""
+    
+    def process(self, msg, kwargs):
+        return f"[SecretManager] {msg}", kwargs
+
+class SecretManager:
+    """Service to manage GCP API keys rotation."""
+
+    project_id: str # The GCP project ID where secrets are managed
+    rotation_interval: int # The interval (in days) at which to rotate secrets
+    max_versions_to_keep: int # The maximum number of secret versions to keep
+    max_retries: int # The maximum number of retries for API calls
+    client: secretmanager.SecretManagerServiceClient # GCP Secret Manager 
client
+    logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging 
messages
+    secrets_ids: List[str] # List of secret IDs managed by this service
+
+    def __init__(self, project_id: str, logger: logging.Logger, 
rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 
3) -> None:
+        self.project_id = project_id
+        self.rotation_interval = rotation_interval
+        self.max_versions_to_keep = max_versions_to_keep
+        self.max_retries = max_retries
+        self.client = secretmanager.SecretManagerServiceClient()
+        self.logger = SecretManagerLoggerAdapter(logger, {})
+        self.logger.info(f"Initialized SecretManager for project 
'{self.project_id}'")
+        self.secrets_ids = self._get_secrets_ids()
+
+    def _get_secrets_ids(self) -> List[str]:
+        """
+        Retrieves the list of secrets from the Secret Manager and populates 
the `secrets_ids` list.
+        This method filters secrets based on a specific label indicating they 
were created by this service.
+
+        Returns:
+            List[str]: A list of secret IDs that were created by this service.
+        """
+        self.logger.debug(f"Retrieving secrets with the label from project 
'{self.project_id}'")
+        secrets_ids = []
+
+        try:
+            for secret in self.client.list_secrets(request={"parent": 
f"projects/{self.project_id}"}):
+                secret_id = secret.name.split("/")[-1]
+                if "created_by" in secret.labels and 
secret.labels["created_by"] == "secretmanager-service":

Review Comment:
   Let's make "secretmanager-service" a constant - and let's change it to 
"beam-infra-secret-manager" (because "secretmanager-service" could be 
misunderstood as the GCP Secret Manager itself)



##########
infra/keys/secret_manager.py:
##########
@@ -0,0 +1,651 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google_crc32c
+import logging
+import time
+from datetime import datetime, timezone, timedelta
+from google.cloud import secretmanager
+from typing import List, Union
+
+class SecretManagerLoggerAdapter(logging.LoggerAdapter):
+    """Logger adapter that adds a prefix to all log messages."""
+    
+    def process(self, msg, kwargs):
+        return f"[SecretManager] {msg}", kwargs
+
+class SecretManager:
+    """Service to manage GCP API keys rotation."""
+
+    project_id: str # The GCP project ID where secrets are managed
+    rotation_interval: int # The interval (in days) at which to rotate secrets
+    max_versions_to_keep: int # The maximum number of secret versions to keep
+    max_retries: int # The maximum number of retries for API calls
+    client: secretmanager.SecretManagerServiceClient # GCP Secret Manager 
client
+    logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging 
messages
+    secrets_ids: List[str] # List of secret IDs managed by this service
+
+    def __init__(self, project_id: str, logger: logging.Logger, 
rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 
3) -> None:
+        self.project_id = project_id
+        self.rotation_interval = rotation_interval
+        self.max_versions_to_keep = max_versions_to_keep
+        self.max_retries = max_retries
+        self.client = secretmanager.SecretManagerServiceClient()
+        self.logger = SecretManagerLoggerAdapter(logger, {})
+        self.logger.info(f"Initialized SecretManager for project 
'{self.project_id}'")
+        self.secrets_ids = self._get_secrets_ids()
+
+    def _get_secrets_ids(self) -> List[str]:
+        """
+        Retrieves the list of secrets from the Secret Manager and populates 
the `secrets_ids` list.
+        This method filters secrets based on a specific label indicating they 
were created by this service.
+
+        Returns:
+            List[str]: A list of secret IDs that were created by this service.
+        """
+        self.logger.debug(f"Retrieving secrets with the label from project 
'{self.project_id}'")
+        secrets_ids = []
+
+        try:
+            for secret in self.client.list_secrets(request={"parent": 
f"projects/{self.project_id}"}):
+                secret_id = secret.name.split("/")[-1]
+                if "created_by" in secret.labels and 
secret.labels["created_by"] == "secretmanager-service":
+                    secrets_ids.append(secret_id)
+        except Exception as e:
+            self.logger.error(f"Error retrieving secrets: {e}")
+
+        self.logger.debug(f"Found {len(secrets_ids)} secrets created by 
secretmanager-service in project '{self.project_id}'")
+        return secrets_ids
+
+    def _secret_exists(self, secret_id: str) -> bool:
+        """
+        Checks if a secret with the given ID exists in the Secret Manager GCP.
+
+        Args:
+            secret_id (str): The ID of the secret to check.
+        Returns:
+            bool: True if the secret exists, False otherwise.
+        """
+        self.logger.debug(f"Checking if secret '{secret_id}' exists")
+        try:
+            name = self.client.secret_path(self.project_id, secret_id)
+            secret = self.client.get_secret(request={"name": name})
+
+            if "created_by" in secret.labels and secret.labels["created_by"] 
== "secretmanager-service":
+                self.logger.debug(f"Secret '{secret_id}' exists and is managed 
by secretmanager-service")
+                return True
+            else:
+                self.logger.debug(f"Secret '{secret_id}' exists but is not 
managed by secretmanager-service")
+                return False
+        except Exception as e:
+            self.logger.debug(f"Secret '{secret_id}' does not exist: {e}")
+            return False
+
+    def create_secret(self, secret_id: str) -> str:
+        """
+        Create a new secret with the given name. A secret is a logical wrapper
+        around a collection of secret versions. Secret versions hold the actual
+        secret material. This method creates a new secret with automatic 
replication
+        and labels for tracking.
+
+        Args:
+            secret_id (str): The ID to assign to the new secret. This ID must 
be unique within the project.
+        Returns:
+            str: The secret path of the newly created secret.
+        """
+        if secret_id in self.secrets_ids:
+            self.logger.debug(f"Secret '{secret_id}' already exists, returning 
existing secret path")
+            name = self.client.secret_path(self.project_id, secret_id)
+            return name
+
+        self.logger.info(f"Creating new secret '{secret_id}' with rotation 
interval of {self.rotation_interval} days")
+        response = self.client.create_secret(
+            request={
+                "parent": f"projects/{self.project_id}",
+                "secret_id": f"{secret_id}",
+                "secret": {
+                    "replication": {
+                        "automatic": {}
+                    },
+                    "labels": {
+                        "created_by": "secretmanager-service",
+                        "created_at": 
datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
+                        "rotation_interval_days": str(self.rotation_interval),
+                        "last_version_created_at": 
datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
+                    }
+                }
+            }
+        )
+
+        # Wait for the secret to be created
+        self.logger.debug(f"Waiting for secret '{secret_id}' to be created")
+        delay = 1
+        for _ in range(self.max_retries):
+            if self._secret_exists(secret_id):
+                self.logger.debug(f"Secret '{secret_id}' is now available")
+                self.secrets_ids.append(secret_id)
+                break
+            self.logger.debug(f"Secret '{secret_id}' not found, retrying in 
{delay} seconds")
+            time.sleep(delay)
+            delay *= 2
+        else:
+            error_msg = f"Failed to create secret '{secret_id}' after 
{self.max_retries} retries."

Review Comment:
   ```suggestion
               error_msg = f"Could not verify secret '{secret_id}' was created 
after {self.max_retries} retries."
   ```
   
   since we are not retrying creation, but rather the existence check



##########
infra/keys/service_account.py:
##########
@@ -0,0 +1,485 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import json
+import time
+from typing import List,Optional
+from google.cloud import iam_admin_v1
+from google.cloud.iam_admin_v1 import types
+from google.oauth2 import service_account
+from google.auth.transport.requests import Request
+from google.api_core import exceptions
+
+class ServiceAccountManagerLoggerAdapter(logging.LoggerAdapter):
+    """Logger adapter that adds a prefix to all log messages."""
+    
+    def process(self, msg, kwargs):
+        return f"[ServiceAccountManager] {msg}", kwargs
+
+class ServiceAccountManager:
+    def __init__(self, project_id: str, logger: logging.Logger, max_retries: 
int = 3) -> None:
+        self.project_id = project_id
+        self.client = iam_admin_v1.IAMClient()
+        self.logger = ServiceAccountManagerLoggerAdapter(logger, {})
+        self.max_retries = max_retries
+        self.logger.info(f"Initialized ServiceAccountManager for project: 
{self.project_id}")
+
+    def _normalize_account_email(self, account_id: str) -> str:
+        """
+        Normalizes the account identifier to a full email format.
+        
+        Args:
+            account_id (str): The unique identifier or email of the service 
account.
+            
+        Returns:
+            str: The full service account email address.
+        """
+        # Handle both account ID and full email formats
+        if "@" in account_id and 
account_id.endswith(".iam.gserviceaccount.com"):
+            # account_id is already a full email
+            return account_id
+        else:
+            # account_id is just the account name
+            return f"{account_id}@{self.project_id}.iam.gserviceaccount.com"
+
+    def _get_service_accounts(self) -> List[iam_admin_v1.ServiceAccount]:
+        """
+        Retrieves all service accounts in the specified project.
+
+        Returns:
+            List[iam_admin_v1.ServiceAccount]: A list of service account 
objects.
+        """
+        request = types.ListServiceAccountsRequest()
+        request.name = f"projects/{self.project_id}"
+
+        accounts = self.client.list_service_accounts(request=request)
+        self.logger.debug(f"Listed service accounts: {[account.email for 
account in accounts.accounts]}")
+        return list(accounts.accounts)
+    
+    def _service_account_exists(self, account_id: str) -> bool:
+        """
+        Checks if a service account with the given account_id exists in the 
project.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account.
+
+        Returns:
+            bool: True if the service account exists, False otherwise.
+        """
+        try:
+            self.get_service_account(account_id)
+            return True
+        except exceptions.NotFound:
+            return False
+        
+    def _service_account_is_enabled(self, account_id: str) -> bool:
+        """
+        Checks if a service account is enabled.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account.
+
+        Returns:
+            bool: True if the service account is enabled, False otherwise.
+        """
+        try:
+            service_account = self.get_service_account(account_id)
+            return not service_account.disabled
+        except exceptions.NotFound:
+            self.logger.error(f"Service account {account_id} not found")
+            return False
+
+    def create_service_account(self, account_id: str, display_name: 
Optional[str] = None) -> types.ServiceAccount:
+        """
+        Creates a service account in the specified project.
+        If the service account already exists, returns the existing account 
(idempotent operation).
+
+        Args:
+            account_id (str): The unique identifier for the service account.
+            display_name (Optional[str]): A human-readable name for the 
service account.
+        Returns:
+            types.ServiceAccount: The created or existing service account 
object.
+        """
+        request = types.CreateServiceAccountRequest()
+        request.account_id = account_id
+        request.name = f"projects/{self.project_id}"
+
+        service_account = types.ServiceAccount()
+        service_account.display_name = display_name or account_id
+        request.service_account = service_account
+
+        try:
+            account = self.client.create_service_account(request=request)
+
+            # Wait for the service account to be created
+            delay = 1
+            for _ in range(self.max_retries):
+                if self._service_account_exists(account_id):
+                    break
+                time.sleep(delay)
+                delay *= 2
+            else:
+                self.logger.error(f"Service account {account_id} creation 
timed out after {self.max_retries} retries.")
+                raise exceptions.DeadlineExceeded(f"Service account 
{account_id} creation timed out.")
+
+            self.logger.info(f"Created service account: {account.email}")
+            return account
+        except exceptions.Conflict:
+            existing_account = self.get_service_account(account_id)
+            self.logger.info(f"Service account already exists: 
{existing_account.email}")
+            return existing_account
+    
+    def get_service_account(self, account_id: str) -> types.ServiceAccount:
+        """
+        Retrieves a service account by its unique identifier or email.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account.
+        
+        Returns:
+            types.ServiceAccount: The service account object.
+        """
+        service_account_email = self._normalize_account_email(account_id)
+
+        request = types.GetServiceAccountRequest()
+        request.name = 
f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
+
+        try:
+            service_account = self.client.get_service_account(request=request)
+            self.logger.info(f"Retrieved service account: 
{service_account.email}")
+            return service_account
+        except exceptions.NotFound:
+            self.logger.error(f"Service account {account_id} not found")
+            raise
+    
+    def enable_service_account(self, account_id: str) -> None:
+        """
+        Enables a service account in the specified project.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account to enable.
+        """
+        service_account_email = self._normalize_account_email(account_id)
+        request = types.EnableServiceAccountRequest()
+        request.name = 
f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
+
+        self.client.enable_service_account(request=request)
+
+        # Wait for the service account to be enabled
+        delay = 1
+        for _ in range(self.max_retries):
+            if self._service_account_is_enabled(account_id):
+                break
+            time.sleep(delay)
+            delay *= 2
+        else:
+            self.logger.error(f"Service account {account_id} enabling timed 
out after {self.max_retries} retries.")
+            raise exceptions.DeadlineExceeded(f"Service account {account_id} 
enabling timed out.")
+        
+        self.logger.info(f"Enabled service account: {account_id}")
+
+    def disable_service_account(self, account_id: str) -> None:
+        """
+        Disables a service account in the specified project.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account to disable.
+        """
+        service_account_email = self._normalize_account_email(account_id)
+        request = types.DisableServiceAccountRequest()
+        request.name = 
f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
+
+        self.client.disable_service_account(request=request)
+
+        # Wait for the service account to be disabled
+        delay = 1
+        for _ in range(self.max_retries):
+            if not self._service_account_is_enabled(account_id):
+                break
+            time.sleep(delay)
+            delay *= 2
+        else:
+            self.logger.error(f"Service account {account_id} disabling timed 
out after {self.max_retries} retries.")
+            raise exceptions.DeadlineExceeded(f"Service account {account_id} 
disabling timed out.")
+        
+        self.logger.info(f"Disabled service account: {account_id}")
+
+    def delete_service_account(self, account_id: str) -> None:
+        """
+        Deletes a service account in the specified project.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account to delete.
+        """
+        service_account_email = self._normalize_account_email(account_id)
+        request = types.DeleteServiceAccountRequest()
+        request.name = 
f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
+
+        self.client.delete_service_account(request=request)
+
+        # Wait for the service account to be deleted
+        delay = 1
+        for _ in range(self.max_retries):
+            if not self._service_account_exists(account_id):
+                break
+            time.sleep(delay)
+            delay *= 2
+        else:
+            self.logger.error(f"Service account {account_id} deletion timed 
out after {self.max_retries} retries.")
+            raise exceptions.DeadlineExceeded(f"Service account {account_id} 
deletion timed out.")
+
+        self.logger.info(f"Deleted service account: {account_id}")
+
+    def _get_service_account_keys(self, account_id: str) -> 
List[iam_admin_v1.ServiceAccountKey]:
+        """
+        Retrieves all keys for the specified service account.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account.
+        
+        Returns:
+            List[iam_admin_v1.ServiceAccountKey]: A list of service account 
key objects.
+        """
+        service_account_email = self._normalize_account_email(account_id)
+        request = types.ListServiceAccountKeysRequest()
+        request.name = 
f"projects/{self.project_id}/serviceAccounts/{service_account_email}"
+
+        response = self.client.list_service_account_keys(request=request)
+        self.logger.debug(f"Listed keys for service account: {account_id}")
+        return list(response.keys)
+
+    def _service_account_key_exists(self, account_id: str, key_id: str) -> 
bool:
+        """
+        Checks if a service account key exists for the specified service 
account.
+
+        Args:
+            account_id (str): The unique identifier or email of the service 
account.
+            key_id (str): The ID of the service account key to check.
+        
+        Returns:
+            bool: True if the key exists, False otherwise.
+        """
+        keys = self._get_service_account_keys(account_id)
+        return any(key.name.endswith(key_id) for key in keys)

Review Comment:
   why checking `endswith` instead of full equality?



##########
infra/keys/gcp_logger.py:
##########
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import io
+import logging
+from google.cloud import storage
+from typing import List
+from datetime import datetime
+
+class GCSLogHandler(logging.Handler):

Review Comment:
   this is useful - just make sure that the logs are also printed to stderr or 
stdout - because the script will mainly run in GHA



##########
infra/keys/config.yaml:
##########
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the configuration file for the secrets rotation service.
+# It defines the parameters for the secrets rotation process.
+
+# GENERAL CONFIGURATION
+
+# The project ID where the secrets rotation service will run
+project_id: "testing-me-460223"
+
+# Secret service configuration
+
+# Default secret rotation interval in days
+rotation_interval: 30
+
+# The maximum number of secrets versions to keep in the secret manager
+max_versions_to_keep: 5

Review Comment:
   let's make this 2, since we only care about the current and the previous 
key. All older keys should not work at all ever.



##########
infra/keys/secret_manager.py:
##########
@@ -0,0 +1,651 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google_crc32c
+import logging
+import time
+from datetime import datetime, timezone, timedelta
+from google.cloud import secretmanager
+from typing import List, Union
+
+class SecretManagerLoggerAdapter(logging.LoggerAdapter):
+    """Logger adapter that adds a prefix to all log messages."""
+    
+    def process(self, msg, kwargs):
+        return f"[SecretManager] {msg}", kwargs
+
+class SecretManager:
+    """Service to manage GCP API keys rotation."""
+
+    project_id: str # The GCP project ID where secrets are managed
+    rotation_interval: int # The interval (in days) at which to rotate secrets
+    max_versions_to_keep: int # The maximum number of secret versions to keep
+    max_retries: int # The maximum number of retries for API calls
+    client: secretmanager.SecretManagerServiceClient # GCP Secret Manager 
client
+    logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging 
messages
+    secrets_ids: List[str] # List of secret IDs managed by this service
+
+    def __init__(self, project_id: str, logger: logging.Logger, 
rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 
3) -> None:
+        self.project_id = project_id
+        self.rotation_interval = rotation_interval
+        self.max_versions_to_keep = max_versions_to_keep
+        self.max_retries = max_retries
+        self.client = secretmanager.SecretManagerServiceClient()
+        self.logger = SecretManagerLoggerAdapter(logger, {})
+        self.logger.info(f"Initialized SecretManager for project 
'{self.project_id}'")
+        self.secrets_ids = self._get_secrets_ids()
+
+    def _get_secrets_ids(self) -> List[str]:
+        """
+        Retrieves the list of secrets from the Secret Manager and populates 
the `secrets_ids` list.
+        This method filters secrets based on a specific label indicating they 
were created by this service.
+
+        Returns:
+            List[str]: A list of secret IDs that were created by this service.
+        """
+        self.logger.debug(f"Retrieving secrets with the label from project 
'{self.project_id}'")
+        secrets_ids = []
+
+        try:
+            for secret in self.client.list_secrets(request={"parent": 
f"projects/{self.project_id}"}):
+                secret_id = secret.name.split("/")[-1]
+                if "created_by" in secret.labels and 
secret.labels["created_by"] == "secretmanager-service":
+                    secrets_ids.append(secret_id)
+        except Exception as e:
+            self.logger.error(f"Error retrieving secrets: {e}")
+
+        self.logger.debug(f"Found {len(secrets_ids)} secrets created by 
secretmanager-service in project '{self.project_id}'")
+        return secrets_ids
+
+    def _secret_exists(self, secret_id: str) -> bool:
+        """
+        Checks if a secret with the given ID exists in the Secret Manager GCP.
+
+        Args:
+            secret_id (str): The ID of the secret to check.
+        Returns:
+            bool: True if the secret exists, False otherwise.
+        """
+        self.logger.debug(f"Checking if secret '{secret_id}' exists")
+        try:
+            name = self.client.secret_path(self.project_id, secret_id)
+            secret = self.client.get_secret(request={"name": name})
+
+            if "created_by" in secret.labels and secret.labels["created_by"] 
== "secretmanager-service":
+                self.logger.debug(f"Secret '{secret_id}' exists and is managed 
by secretmanager-service")
+                return True
+            else:
+                self.logger.debug(f"Secret '{secret_id}' exists but is not 
managed by secretmanager-service")
+                return False
+        except Exception as e:
+            self.logger.debug(f"Secret '{secret_id}' does not exist: {e}")
+            return False
+
+    def create_secret(self, secret_id: str) -> str:
+        """
+        Create a new secret with the given name. A secret is a logical wrapper
+        around a collection of secret versions. Secret versions hold the actual
+        secret material. This method creates a new secret with automatic 
replication
+        and labels for tracking.
+
+        Args:
+            secret_id (str): The ID to assign to the new secret. This ID must 
be unique within the project.
+        Returns:
+            str: The secret path of the newly created secret.
+        """
+        if secret_id in self.secrets_ids:
+            self.logger.debug(f"Secret '{secret_id}' already exists, returning 
existing secret path")
+            name = self.client.secret_path(self.project_id, secret_id)
+            return name
+
+        self.logger.info(f"Creating new secret '{secret_id}' with rotation 
interval of {self.rotation_interval} days")
+        response = self.client.create_secret(
+            request={
+                "parent": f"projects/{self.project_id}",
+                "secret_id": f"{secret_id}",
+                "secret": {
+                    "replication": {
+                        "automatic": {}
+                    },
+                    "labels": {
+                        "created_by": "secretmanager-service",
+                        "created_at": 
datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
+                        "rotation_interval_days": str(self.rotation_interval),

Review Comment:
   what happens after `rotation_interval_days` with a secret that has not been 
rotated? Nothing? Does GCP SM delete it or something like that?



##########
infra/keys/README.md:
##########
@@ -0,0 +1,93 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Service Account Management
+
+This module is used to manage Google Cloud service accounts, including 
creating, retrieving, enabling, and deleting service accounts and their keys. 
It uses the Google Cloud IAM API to perform these operations.
+
+## Features
+
+- Create service accounts and service account keys.
+- Store service account keys securely in Google Secret Manager.
+- Rotate service account keys regularly.
+- Enable and disable service accounts.
+
+## Usage
+
+We use the main.py script to manage service account keys. The script can be 
run with different commands to create, rotate, or retrieve service account keys.
+
+### Prerequisites
+
+- Google Cloud SDK installed and configured.
+- Appropriate permissions to manage service accounts and secrets in your 
Google Cloud project.
+- Required Python packages installed (see requirements.txt).
+
+### Configuration
+
+#### config.yaml
+
+This file contains configuration settings for the service account management, 
including project ID, key rotation settings, and logging configuration.
+
+#### keys.yaml
+
+All the service accounts are managed through a configuration file in YAML 
format, `keys.yaml`. This file contains the necessary information about each 
service account, including its ID, display name, and authorized users.
+
+```yaml
+service_accounts:
+  - account_id: my-service-account
+    display_name: My Service Account
+    authorized_users:
+      - email: us...@example.com
+      - email: us...@example.com
+```
+
+Where:
+
+- `account_id`: The unique identifier for the service account.

Review Comment:
   Add something like 
   > The full name of the service account will be 
`$account...@googleserviceaccounts.com` 
   
   or whatever is the right domain name.



##########
infra/keys/main.py:
##########
@@ -0,0 +1,476 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import traceback
+import yaml
+import logging
+import argparse
+import sys
+from typing import List, TypedDict
+# Importing custom modules
+from gcp_logger import GCSLogHandler, GCPLogger
+from secret_manager import SecretManager
+from service_account import ServiceAccountManager
+
+
+# --- Configuration ---
+CONFIG_FILE = 'config.yaml'
+KEYS_FILE = 'keys.yaml'
+
+class ConfigDict(TypedDict):
+    project_id: str
+    rotation_interval: int
+    max_versions_to_keep: int
+    bucket_name: str
+    log_file_prefix: str
+    logging_level: str
+
+class AuthorizedUser(TypedDict):
+    email: str
+
+class ServiceAccount(TypedDict):
+    account_id: str
+    display_name: str
+    authorized_users: List[AuthorizedUser]
+
+class ServiceAccountsConfig(TypedDict):
+    service_accounts: List[ServiceAccount]
+
+def load_config() -> ConfigDict:
+    """Loads the configuration from the YAML file."""
+    with open(CONFIG_FILE, 'r') as f:
+        config = yaml.safe_load(f)
+
+    if not config:
+        raise ValueError("Configuration file is empty or invalid.")
+    
+    required_keys = ['project_id', 'rotation_interval', 
'max_versions_to_keep', 'bucket_name', 'log_file_prefix']
+    for key in required_keys:

Review Comment:
   Nit: This could be done a little 'fancier' with sets. E.g.:
   
   ```
   required_keys = set([....])
   missing_keys = required_keys - config.keys()
   if missing_keys:
       raise ValueError(f"Configuration is missing the following keys: 
{missing_keys}")
   ```
   
   This way you tell the user every key that's missing right away instead of 
failing once for every new key. This change is optional though. It's only a 
smol enhancement.



##########
infra/keys/README.md:
##########
@@ -0,0 +1,93 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Service Account Management
+
+This module is used to manage Google Cloud service accounts, including 
creating, retrieving, enabling, and deleting service accounts and their keys. 
It uses the Google Cloud IAM API to perform these operations.
+
+## Features
+
+- Create service accounts and service account keys.
+- Store service account keys securely in Google Secret Manager.
+- Rotate service account keys regularly.
+- Enable and disable service accounts.
+
+## Usage
+
+We use the main.py script to manage service account keys. The script can be 
run with different commands to create, rotate, or retrieve service account keys.
+
+### Prerequisites
+
+- Google Cloud SDK installed and configured.
+- Appropriate permissions to manage service accounts and secrets in your 
Google Cloud project.
+- Required Python packages installed (see requirements.txt).
+
+### Configuration
+
+#### config.yaml
+
+This file contains configuration settings for the service account management, 
including project ID, key rotation settings, and logging configuration.
+
+#### keys.yaml
+
+All the service accounts are managed through a configuration file in YAML 
format, `keys.yaml`. This file contains the necessary information about each 
service account, including its ID, display name, and authorized users.
+
+```yaml
+service_accounts:
+  - account_id: my-service-account
+    display_name: My Service Account
+    authorized_users:
+      - email: us...@example.com
+      - email: us...@example.com
+```
+
+Where:
+
+- `account_id`: The unique identifier for the service account.
+- `display_name`: A human-readable name for the service account.
+- `authorized_users`: A list of users who will be granted access to the 
service account.
+
+The accounts defined in this file will be created if they do not already exist 
when running the script.

Review Comment:
   What happens if they already do exist? Will the whole thing fail? Or will 
things work 'as intended'? Depending on the answer, please update the doc line 
to explain that.



##########
infra/keys/secret_manager.py:
##########
@@ -0,0 +1,651 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google_crc32c
+import logging
+import time
+from datetime import datetime, timezone, timedelta
+from google.cloud import secretmanager
+from typing import List, Union
+
+class SecretManagerLoggerAdapter(logging.LoggerAdapter):
+    """Logger adapter that adds a prefix to all log messages."""
+    
+    def process(self, msg, kwargs):
+        return f"[SecretManager] {msg}", kwargs
+
+class SecretManager:
+    """Service to manage GCP API keys rotation."""
+
+    project_id: str # The GCP project ID where secrets are managed
+    rotation_interval: int # The interval (in days) at which to rotate secrets
+    max_versions_to_keep: int # The maximum number of secret versions to keep
+    max_retries: int # The maximum number of retries for API calls
+    client: secretmanager.SecretManagerServiceClient # GCP Secret Manager 
client
+    logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging 
messages
+    secrets_ids: List[str] # List of secret IDs managed by this service

Review Comment:
   "secret_ids" ; ) inglish grammar



##########
infra/keys/main.py:
##########
@@ -0,0 +1,476 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import traceback
+import yaml
+import logging
+import argparse
+import sys
+from typing import List, TypedDict
+# Importing custom modules
+from gcp_logger import GCSLogHandler, GCPLogger
+from secret_manager import SecretManager
+from service_account import ServiceAccountManager
+
+
+# --- Configuration ---
+CONFIG_FILE = 'config.yaml'
+KEYS_FILE = 'keys.yaml'
+
+class ConfigDict(TypedDict):
+    project_id: str
+    rotation_interval: int
+    max_versions_to_keep: int
+    bucket_name: str
+    log_file_prefix: str
+    logging_level: str
+
+class AuthorizedUser(TypedDict):
+    email: str
+
+class ServiceAccount(TypedDict):
+    account_id: str
+    display_name: str
+    authorized_users: List[AuthorizedUser]
+
+class ServiceAccountsConfig(TypedDict):
+    service_accounts: List[ServiceAccount]
+
+def load_config() -> ConfigDict:
+    """Loads the configuration from the YAML file."""
+    with open(CONFIG_FILE, 'r') as f:
+        config = yaml.safe_load(f)
+
+    if not config:
+        raise ValueError("Configuration file is empty or invalid.")
+    
+    required_keys = ['project_id', 'rotation_interval', 
'max_versions_to_keep', 'bucket_name', 'log_file_prefix']
+    for key in required_keys:
+        if key not in config:
+            raise ValueError(f"Missing required configuration key: {key}")
+    
+    if not isinstance(config['rotation_interval'], int) or 
config['rotation_interval'] <= 0:
+        raise ValueError("Configuration 'rotation_interval' must be a positive 
integer.")
+    if not isinstance(config['max_versions_to_keep'], int) or 
config['max_versions_to_keep'] <= 0:
+        raise ValueError("Configuration 'max_versions_to_keep' must be a 
positive integer.")
+    if not isinstance(config['bucket_name'], str) or not 
config['bucket_name'].strip():
+        raise ValueError("Configuration 'bucket_name' must be a non-empty 
string.")
+    if not isinstance(config['log_file_prefix'], str) or not 
config['log_file_prefix'].strip():
+        raise ValueError("Configuration 'log_file_prefix' must be a non-empty 
string.")
+    if 'logging_level' in config:
+        if not isinstance(config['logging_level'], str) or 
config['logging_level'].strip() not in logging._nameToLevel:
+            raise ValueError("Configuration 'logging_level' must be one of: " 
+ ", ".join(logging._nameToLevel.keys()))
+    else:
+        config['logging_level'] = 'INFO'
+
+    return config
+
+def load_service_accounts_config() -> ServiceAccountsConfig:
+    """Loads the service accounts configuration from the YAML file."""
+    with open(KEYS_FILE, 'r') as f:
+        service_accounts_config = yaml.safe_load(f)
+
+    if not service_accounts_config or 'service_accounts' not in 
service_accounts_config:
+        raise ValueError("Service accounts configuration file is empty or 
invalid.")
+    
+    if not isinstance(service_accounts_config['service_accounts'], list):
+        raise ValueError("Service accounts configuration must be a list of 
service accounts.")
+    
+    for account in service_accounts_config['service_accounts']:
+        if 'account_id' not in account or 'display_name' not in account:
+            raise ValueError("Each service account must have 'account_id' and 
'display_name'.")
+        if 'authorized_users' not in account or not 
isinstance(account['authorized_users'], list):
+            raise ValueError("Each service account must have a list of 
'authorized_users'.")
+        
+    return service_accounts_config
+
+def parse_arguments():
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description="KeyService - GCP Service Account Key Management",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  python main.py                        # Initialize service accounts and setup
+  python main.py --cron                 # Run key rotation for accounts that 
need it
+  python main.py --get-key my-sa        # Get the latest key for service 
account 'my-sa'
+  python main.py --generate-key my-sa   # Manually rotate key for service 
account 'my-sa'
+        """
+    )
+    
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument(
+        '--cron',
+        action='store_true',
+        help='Run the cron job to rotate keys that require rotation'
+    )
+    group.add_argument(
+        '--get-key',
+        metavar='ACCOUNT_ID',
+        type=str,
+        help='Get the latest key for the specified service account ID'
+    )
+    group.add_argument(
+        '--generate-key',
+        metavar='ACCOUNT_ID',
+        type=str,
+        help='Manually rotate (generate new) key for the specified service 
account ID'
+    )
+    
+    return parser.parse_args()
+
+class KeyService:
+    """Service to manage GCP API keys rotation."""
+
+    # Configuration
+    project_id: str
+    service_accounts: List[ServiceAccount]
+
+    # Clients
+    secret_manager_client: SecretManager
+    service_account_manager: ServiceAccountManager
+    logger: logging.Logger
+
+    def __init__(self, config: ConfigDict, service_accounts_config: 
ServiceAccountsConfig) -> None:
+        """
+        Initializes the KeyService with the provided configuration.
+
+        Args:
+            config (ConfigDict): Configuration dictionary containing:
+                - project_id: GCP project ID
+                - rotation_interval: Interval in days for secret rotation 
+                - max_versions_to_keep: Maximum number of secret versions to 
keep
+                - bucket_name: GCS bucket name for logging
+                - log_file_prefix: Prefix for log file names
+                - logging_level: Logging level (e.g., 'INFO', 'DEBUG')
+        Raises:
+            ValueError: If any required configuration parameter is missing.
+        """
+
+        self.project_id = config['project_id']
+        rotation_interval = config['rotation_interval']
+        max_versions_to_keep = config['max_versions_to_keep']
+        bucket_name = config['bucket_name']
+        log_file_prefix = config['log_file_prefix']
+        logging_level = config['logging_level']
+
+        self.service_accounts = service_accounts_config['service_accounts']
+
+        self.logger = GCPLogger("KeyService", logging_level, bucket_name, 
log_file_prefix, self.project_id)
+        self.secret_manager_client = SecretManager(self.project_id, 
self.logger, rotation_interval, max_versions_to_keep)
+        self.service_account_manager = ServiceAccountManager(self.project_id, 
self.logger)
+
+        self._start_all_service_accounts()
+        self.logger.info(f"Initialized KeyService for project: 
{self.project_id}")
+
+    def __del__(self) -> None:
+        """Manually flush all logs to Google Cloud Storage."""
+        try:
+            for handler in self.logger.handlers:
+                if isinstance(handler, GCSLogHandler):
+                    handler.flush_to_gcs()
+        except Exception:
+            pass
+
+    def cleanup(self) -> None:
+        """Explicit cleanup method to flush logs and close resources."""
+        try:
+            self.logger.info("KeyService cleanup: Flushing logs to Google 
Cloud Storage")
+            for handler in self.logger.handlers:
+                if isinstance(handler, GCSLogHandler):
+                    handler.flush_to_gcs()
+        except Exception as e:
+            self.logger.error(f"Error during cleanup: {e}")
+
+    def _start_all_service_accounts(self) -> None:
+        """
+        Reads the service accounts configuration and checks for service 
accounts.
+
+        1. If a service account does not exist, it will be created.
+        2. If a secret does not exist, it will be created.
+        3. If a service account does not have a key, it will be created.
+        """
+
+        self.logger.debug("Creating service accounts if they do not exist")
+        for account in self.service_accounts:
+            account_id = account['account_id']
+            authorized_users = [user['email'] for user in 
account.get('authorized_users', [])]
+
+            try:
+                # Check if the service account already exists
+                if not 
self.service_account_manager._service_account_exists(account_id):
+                    self.logger.debug(f"Service account {account_id} does not 
exist, creating it")
+                    display_name = account['display_name']
+                    self.logger.info(f"Creating service account: {account_id}")
+                    
self.service_account_manager.create_service_account(account_id, display_name)
+
+                # Check if the secret for the service account exists
+                secret_name = f"{account_id}-key"
+                if not self.secret_manager_client._secret_exists(secret_name):
+                    self.logger.debug(f"Secret {secret_name} does not exist, 
creating it")
+                    self.logger.info(f"Creating secret for service account: 
{account_id}")
+                    self.secret_manager_client.create_secret(secret_name)
+
+                # Check if the secret has the correct access policy
+                if 
self.secret_manager_client.is_different_user_access(secret_name, 
authorized_users):
+                    self.logger.info(f"Updating access policy for secret 
{secret_name}")
+                    
self.secret_manager_client.update_secret_access(secret_name, authorized_users)
+
+                # Start the service account key creation process, ensuring at 
least one key exists
+                self._start_service_account_key(account_id)
+
+                # Check if the service account key exists
+            except Exception as e:
+                self.logger.error(f"Error creating service account or secret 
for {account_id}: {e}")
+
+    def _start_service_account_key(self, account_id: str) -> None:
+        """
+        Creates a service account key for a given service account during 
initialization.
+        This method ensures that each service account has at least one key 
available.
+        During initialization, we always create a key if none exists in Secret 
Manager.
+        
+        Args:
+            account_id (str): The ID of the service account to create a key 
for.
+        """
+        self.logger.debug(f"Starting service account key for {account_id}")
+        try:
+            if not 
self.service_account_manager._service_account_exists(account_id):
+                self.logger.error(f"Service account {account_id} does not 
exist, cannot create key")
+                return
+            
+            # Check if a key exists in Secret Manager
+            secret_name = f"{account_id}-key"
+            try:
+                existing_secret = 
self.secret_manager_client.get_secret_version(secret_name, "latest")
+                if existing_secret:
+                    self.logger.debug(f"Service account {account_id} already 
has a key in Secret Manager, skipping key creation.")
+                    return
+            except Exception:
+                self.logger.debug(f"No existing key found in Secret Manager 
for {account_id}")
+            
+            # If no key exists in Secret Manager, create a new key
+            self.logger.debug(f"Creating service account key for {account_id}")
+            self.create_and_save_service_account_key(account_id)
+            self.logger.debug(f"Service account key for {account_id} created 
and saved successfully.")
+
+        except Exception as e:
+            self.logger.error(f"Error creating service account key for 
{account_id}: {e}")
+
+    def _cron_job(self) -> None:
+        """
+        Cron job to rotate service account keys and secrets.
+        This method should be called periodically based on the rotation 
interval.
+        It will rotate keys that have not been rotated in the last 
rotation_interval days.
+        """
+
+        self.logger.info("Starting cron job for service account key rotation")
+        for account in self.service_accounts:
+            account_id = account['account_id']
+            try:
+                secret_name = f"{account_id}-key"
+                # Check if the service account is due for key rotation
+                if 
self.secret_manager_client._is_key_rotation_due(secret_name):
+                    self.logger.info(f"Rotating service account key for 
{account_id}")
+                    self.rotate_service_account_key(account_id)
+                else:
+                    self.logger.debug(f"Service account key for {account_id} 
is not due for rotation")

Review Comment:
   An idea to implement the 'grace period' in this code - currently it looks 
like this:
   
   ```
   if _is_key_rotation_due(secret_name):
     self.rotate_service_account_key(account_id)
   ```
   
   And `rotate_service_account_key` creates a new key + deletes the old.
   
   Instead, how about:
   
   ```
   if _is_key_rotation_due(secret_name):
     # This function only creates a new key - it doesn't delete or disable the 
old one
     self.renew_service_account_key(account_id)
   
   # Later, we check for deletion of old ones
   if _old_key_past_grace_period(secret_name):
     self.delete_old_sa_key(account_id)
   ```
   
   Where:
   
   ```
   def _old_key_past_grace_period(account_id):
     return get_account(account_id).latest_creation_time - time.now() > 
grace_period:
   ```
   
   lmk if that makes sense



##########
infra/keys/README.md:
##########
@@ -0,0 +1,93 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Service Account Management
+
+This module is used to manage Google Cloud service accounts, including 
creating, retrieving, enabling, and deleting service accounts and their keys. 
It uses the Google Cloud IAM API to perform these operations.
+
+## Features
+
+- Create service accounts and service account keys.
+- Store service account keys securely in Google Secret Manager.
+- Rotate service account keys regularly.
+- Enable and disable service accounts.
+
+## Usage
+
+We use the main.py script to manage service account keys. The script can be 
run with different commands to create, rotate, or retrieve service account keys.
+
+### Prerequisites
+
+- Google Cloud SDK installed and configured.
+- Appropriate permissions to manage service accounts and secrets in your 
Google Cloud project.
+- Required Python packages installed (see requirements.txt).
+
+### Configuration
+
+#### config.yaml
+
+This file contains configuration settings for the service account management, 
including project ID, key rotation settings, and logging configuration.
+
+#### keys.yaml
+
+All the service accounts are managed through a configuration file in YAML 
format, `keys.yaml`. This file contains the necessary information about each 
service account, including its ID, display name, and authorized users.
+
+```yaml
+service_accounts:
+  - account_id: my-service-account
+    display_name: My Service Account
+    authorized_users:
+      - email: us...@example.com
+      - email: us...@example.com
+```
+
+Where:
+
+- `account_id`: The unique identifier for the service account.
+- `display_name`: A human-readable name for the service account.
+- `authorized_users`: A list of users who will be granted access to the 
service account.

Review Comment:
   ```suggestion
   - `authorized_users`: A list of users who will be granted access to the 
service account's keys in secret manager. This group of users will essentially 
be authorized to impersonate the service account in question.
   ```



##########
infra/keys/secret_manager.py:
##########
@@ -0,0 +1,651 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google_crc32c
+import logging
+import time
+from datetime import datetime, timezone, timedelta
+from google.cloud import secretmanager
+from typing import List, Union
+
+class SecretManagerLoggerAdapter(logging.LoggerAdapter):
+    """Logger adapter that adds a prefix to all log messages."""
+    
+    def process(self, msg, kwargs):
+        return f"[SecretManager] {msg}", kwargs
+
+class SecretManager:
+    """Service to manage GCP API keys rotation."""
+
+    project_id: str # The GCP project ID where secrets are managed
+    rotation_interval: int # The interval (in days) at which to rotate secrets
+    max_versions_to_keep: int # The maximum number of secret versions to keep
+    max_retries: int # The maximum number of retries for API calls
+    client: secretmanager.SecretManagerServiceClient # GCP Secret Manager 
client
+    logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging 
messages
+    secrets_ids: List[str] # List of secret IDs managed by this service
+
+    def __init__(self, project_id: str, logger: logging.Logger, 
rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 
3) -> None:
+        self.project_id = project_id
+        self.rotation_interval = rotation_interval
+        self.max_versions_to_keep = max_versions_to_keep
+        self.max_retries = max_retries
+        self.client = secretmanager.SecretManagerServiceClient()
+        self.logger = SecretManagerLoggerAdapter(logger, {})
+        self.logger.info(f"Initialized SecretManager for project 
'{self.project_id}'")
+        self.secrets_ids = self._get_secrets_ids()
+
+    def _get_secrets_ids(self) -> List[str]:
+        """
+        Retrieves the list of secrets from the Secret Manager and populates 
the `secrets_ids` list.
+        This method filters secrets based on a specific label indicating they 
were created by this service.
+
+        Returns:
+            List[str]: A list of secret IDs that were created by this service.
+        """
+        self.logger.debug(f"Retrieving secrets with the label from project 
'{self.project_id}'")
+        secrets_ids = []
+
+        try:
+            for secret in self.client.list_secrets(request={"parent": 
f"projects/{self.project_id}"}):
+                secret_id = secret.name.split("/")[-1]
+                if "created_by" in secret.labels and 
secret.labels["created_by"] == "secretmanager-service":
+                    secrets_ids.append(secret_id)
+        except Exception as e:
+            self.logger.error(f"Error retrieving secrets: {e}")
+
+        self.logger.debug(f"Found {len(secrets_ids)} secrets created by 
secretmanager-service in project '{self.project_id}'")
+        return secrets_ids
+
+    def _secret_exists(self, secret_id: str) -> bool:
+        """
+        Checks if a secret with the given ID exists in the Secret Manager GCP.
+
+        Args:
+            secret_id (str): The ID of the secret to check.
+        Returns:
+            bool: True if the secret exists, False otherwise.
+        """
+        self.logger.debug(f"Checking if secret '{secret_id}' exists")
+        try:
+            name = self.client.secret_path(self.project_id, secret_id)
+            secret = self.client.get_secret(request={"name": name})
+
+            if "created_by" in secret.labels and secret.labels["created_by"] 
== "secretmanager-service":
+                self.logger.debug(f"Secret '{secret_id}' exists and is managed 
by secretmanager-service")
+                return True
+            else:
+                self.logger.debug(f"Secret '{secret_id}' exists but is not 
managed by secretmanager-service")
+                return False
+        except Exception as e:
+            self.logger.debug(f"Secret '{secret_id}' does not exist: {e}")
+            return False
+
+    def create_secret(self, secret_id: str) -> str:
+        """
+        Create a new secret with the given name. A secret is a logical wrapper
+        around a collection of secret versions. Secret versions hold the actual
+        secret material. This method creates a new secret with automatic 
replication
+        and labels for tracking.
+
+        Args:
+            secret_id (str): The ID to assign to the new secret. This ID must 
be unique within the project.
+        Returns:
+            str: The secret path of the newly created secret.
+        """
+        if secret_id in self.secrets_ids:
+            self.logger.debug(f"Secret '{secret_id}' already exists, returning 
existing secret path")
+            name = self.client.secret_path(self.project_id, secret_id)
+            return name
+
+        self.logger.info(f"Creating new secret '{secret_id}' with rotation 
interval of {self.rotation_interval} days")
+        response = self.client.create_secret(
+            request={
+                "parent": f"projects/{self.project_id}",
+                "secret_id": f"{secret_id}",
+                "secret": {
+                    "replication": {
+                        "automatic": {}
+                    },
+                    "labels": {
+                        "created_by": "secretmanager-service",
+                        "created_at": 
datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
+                        "rotation_interval_days": str(self.rotation_interval),
+                        "last_version_created_at": 
datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
+                    }
+                }
+            }
+        )
+
+        # Wait for the secret to be created
+        self.logger.debug(f"Waiting for secret '{secret_id}' to be created")
+        delay = 1
+        for _ in range(self.max_retries):
+            if self._secret_exists(secret_id):
+                self.logger.debug(f"Secret '{secret_id}' is now available")
+                self.secrets_ids.append(secret_id)
+                break
+            self.logger.debug(f"Secret '{secret_id}' not found, retrying in 
{delay} seconds")
+            time.sleep(delay)
+            delay *= 2
+        else:
+            error_msg = f"Failed to create secret '{secret_id}' after 
{self.max_retries} retries."
+            self.logger.error(error_msg)
+            raise RuntimeError(error_msg)
+
+        self.logger.info(f"Successfully created secret '{secret_id}'")
+        return response.name
+
+    def get_secret(self, secret_id: str) -> secretmanager.Secret:
+        """
+        Retrieves the specified secret by its ID.
+
+        Args:
+            secret_id (str): The ID of the secret to retrieve.
+        Returns:
+            secretmanager.Secret: The requested secret.
+        """
+        self.logger.info(f"Retrieving secret '{secret_id}'")
+
+        if secret_id not in self.secrets_ids:
+            self.logger.debug(f"Secret '{secret_id}' not in cached secrets, 
checking existence")
+            if not self._secret_exists(secret_id):
+                self.logger.error(f"Secret '{secret_id}' does not exist")
+                raise ValueError(f"Secret {secret_id} does not exist. Please 
create it first.")
+            else:
+                self.logger.debug(f"Secret '{secret_id}' exists but is not in 
cached secrets, updating cache")
+                self.secrets_ids.append(secret_id)
+
+        name = self.client.secret_path(self.project_id, secret_id)
+        return self.client.get_secret(request={"name": name})
+
+    def delete_secret(self, secret_id: str) -> None:
+        """
+        Deletes the specified secret and all its versions.
+
+        Args:
+            secret_id (str): The ID of the secret to delete.
+        """
+        if not self._secret_exists(secret_id):
+            self.logger.debug(f"Secret '{secret_id}' does not exist, nothing 
to delete")
+            return
+
+        self.logger.info(f"Deleting secret '{secret_id}' and all its versions")
+        name = self.client.secret_path(self.project_id, secret_id)
+        self.client.delete_secret(request={"name": name})
+        
+        # Wait for the secret to be deleted
+        self.logger.debug(f"Waiting for secret '{secret_id}' to be deleted")
+        delay = 1 
+        for _ in range(self.max_retries):
+            if not self._secret_exists(secret_id):
+                self.logger.debug(f"Secret '{secret_id}' is now deleted")
+                if secret_id in self.secrets_ids:
+                    self.logger.debug(f"Removing '{secret_id}' from cached 
secrets")
+                    self.secrets_ids.remove(secret_id)
+                break
+            self.logger.debug(f"Secret '{secret_id}' still exists, retrying in 
{delay} seconds")
+            time.sleep(delay)
+            delay *= 2
+        else:
+            error_msg = f"Failed to delete secret '{secret_id}' after 
{self.max_retries} retries."
+            self.logger.error(error_msg)
+            raise RuntimeError(error_msg)
+
+        self.logger.info(f"Successfully deleted secret '{secret_id}'")
+
+    def is_different_user_access(self, secret_id: str, allowed_users: 
List[str]) -> bool:
+        """
+        Checks if the current access policy of a secret allows only the 
specified users to read it.
+        This is used to determine if an update is needed.
+
+        Args:
+            secret_id (str): The ID of the secret to check access for.
+            allowed_users (List[str]): A list of user emails to check against 
the current access policy.
+        Returns:
+            bool: True if the current access policy is different from the 
specified users, False otherwise.
+        """
+        self.logger.debug(f"Checking if access for secret '{secret_id}' 
differs from allowed users: {allowed_users}")
+
+        if not self._secret_exists(secret_id):
+            self.logger.debug(f"Secret '{secret_id}' does not exist, cannot 
check access")
+            return True
+        
+        accessor_role = "roles/secretmanager.secretAccessor"
+        resource_name = self.client.secret_path(self.project_id, secret_id)
+        
+        try:
+            policy = self.client.get_iam_policy(request={"resource": 
resource_name})
+        except Exception as e:
+            self.logger.error(f"Failed to get IAM policy for secret 
'{secret_id}': {e}")
+            return True
+
+        current_members = set()
+        for binding in policy.bindings:
+            if binding.role == accessor_role:
+                current_members.update(binding.members)
+
+        allowed_members = {f"user:{user_email}" for user_email in 
allowed_users}
+        
+        is_different = current_members != allowed_members
+        self.logger.debug(f"Current members: {current_members}")
+        self.logger.debug(f"Allowed members: {allowed_members}")
+        self.logger.debug(f"Access for secret '{secret_id}' differs: 
{is_different}")
+        return is_different
+
+    def update_secret_access(self, secret_id: str, allowed_users: List[str]) 
-> None:
+        """
+        Updates the access policy of a secret to allow only the specified 
users to read it.
+        Any existing users will be removed and replaced with the new list.
+
+        Args:
+            secret_id (str): The ID of the secret to update access for.
+            allowed_users (List[str]): A list of user emails to grant read 
access to.
+        """
+        self.logger.debug(f"Updating access for secret '{secret_id}' to allow 
users: {allowed_users}")
+
+        if not self._secret_exists(secret_id):
+            error_msg = f"Secret {secret_id} does not exist. Please create it 
first."
+            self.logger.error(error_msg)
+            raise ValueError(error_msg)
+        
+        accessor_role = "roles/secretmanager.secretAccessor"
+        resource_name = self.client.secret_path(self.project_id, secret_id)
+        policy = self.client.get_iam_policy(request={"resource": 
resource_name})
+
+        members = [f"user:{user_email}" for user_email in allowed_users]
+
+        binding_found = False
+        for binding in policy.bindings:
+            if binding.role == accessor_role:
+                binding.members[:] = members
+                self.logger.debug(f"Replaced members for role 
'{accessor_role}' in secret '{secret_id}' with: {allowed_users}")
+                binding_found = True
+                break
+        
+        if not binding_found:
+            policy.bindings.add(
+                role=accessor_role,
+                members=members
+            )
+            self.logger.debug(f"Created new binding for role '{accessor_role}' 
in secret '{secret_id}'")
+
+        self.client.set_iam_policy(
+            request={
+                "resource": resource_name,
+                "policy": policy
+            }
+        )
+
+        self.logger.info(f"Successfully updated access for secret 
'{secret_id}' to allow users: {allowed_users}")
+
+    def _get_secret_versions(self, secret_id: str) -> 
List[secretmanager.SecretVersion]:
+        """
+       Retrieves all versions of a secret.
+
+        Args:
+            secret_id (str): The ID of the secret to list versions for.
+        Returns:
+            List[secretmanager.SecretVersion]: A list of secret versions.
+        """
+        self.logger.debug(f"Retrieving versions for secret '{secret_id}'")
+
+        if not self._secret_exists(secret_id):
+            self.logger.debug(f"Secret '{secret_id}' does not exist, cannot 
retrieve versions")
+            return []
+
+        parent = self.client.secret_path(self.project_id, secret_id)
+        versions = list(self.client.list_secret_versions(request={"parent": 
parent}))
+        self.logger.debug(f"Found {len(versions)} versions for secret 
'{secret_id}'")
+        return versions
+
+    def _get_enabled_secret_versions(self, secret_id: str) -> 
List[secretmanager.SecretVersion]:
+        """
+        Retrieves all enabled versions of a secret.
+
+        Args:
+            secret_id (str): The ID of the secret to list enabled versions for.
+        Returns:
+            List[secretmanager.SecretVersion]: A list of enabled secret 
versions.
+        """
+        self.logger.debug(f"Retrieving enabled versions for secret 
'{secret_id}'")
+        versions = self._get_secret_versions(secret_id)
+        enabled_versions = [version for version in versions if version.state 
== secretmanager.SecretVersion.State.ENABLED]
+        self.logger.debug(f"Found {len(enabled_versions)} enabled versions for 
secret '{secret_id}'")
+        return enabled_versions
+
+    def _secret_version_exists(self, secret_id: str, version_id: str) -> bool:
+        """
+        Checks if a specific version of a secret exists.
+
+        Args:
+            secret_id (str): The ID of the secret to check.
+            version_id (str): The ID of the version to check.
+        Returns:
+            bool: True if the version exists, False otherwise.
+        """
+        self.logger.debug(f"Checking if version '{version_id}' exists for 
secret '{secret_id}'")
+        if not self._secret_exists(secret_id):
+            self.logger.debug(f"Secret '{secret_id}' does not exist, version 
cannot exist")
+            return False
+        
+        versions = self._get_secret_versions(secret_id)
+        exists = any(version.name.split("/")[-1] == version_id for version in 
versions)
+        self.logger.debug(f"Version '{version_id}' exists: {exists}")
+        return exists
+    
+    def _secret_version_is_enabled(self, secret_id: str, version_id: str) -> 
bool:
+        """
+        Checks if a specific version of a secret is enabled.
+
+        Args:
+            secret_id (str): The ID of the secret to check.
+            version_id (str): The ID of the version to check.
+        Returns:
+            bool: True if the version is enabled, False otherwise.
+        """
+        self.logger.debug(f"Checking if version '{version_id}' of secret 
'{secret_id}' is enabled")
+        if not self._secret_exists(secret_id):
+            self.logger.debug(f"Secret '{secret_id}' does not exist, version 
cannot be enabled")
+            return False
+        
+        versions = self._get_secret_versions(secret_id)
+        for version in versions:
+            if version.name.split("/")[-1] == version_id:
+                is_enabled = version.state == 
secretmanager.SecretVersion.State.ENABLED
+                self.logger.debug(f"Version '{version_id}' is enabled: 
{is_enabled}")
+                return is_enabled
+        self.logger.debug(f"Version '{version_id}' does not exist for secret 
'{secret_id}'")
+        return False
+    
+    def _purge_old_secret_versions(self, secret_id: str) -> None:
+        """
+        Purges old secret versions that are not enabled and exceed the maximum 
allowed versions.
+
+        Args:
+            secret_id (str): The ID of the secret to purge old versions from.
+        """
+        self.logger.debug(f"Purging old versions for secret '{secret_id}'")
+        versions = self._get_secret_versions(secret_id)
+        enabled_versions = [v for v in versions if v.state == 
secretmanager.SecretVersion.State.ENABLED]
+
+        if len(enabled_versions) > self.max_versions_to_keep:
+            versions_to_purge = enabled_versions[:len(enabled_versions) - 
self.max_versions_to_keep]
+            self.logger.debug(f"Found {len(versions_to_purge)} versions to 
purge for secret '{secret_id}'")
+            for version in versions_to_purge:
+                version_id = version.name.split("/")[-1]
+                self.logger.debug(f"Disabling version '{version_id}' of secret 
'{secret_id}'")
+                self.disable_secret_version(secret_id, version_id=version_id)
+        else:
+            self.logger.debug(f"No versions to purge for secret '{secret_id}', 
current count: {len(enabled_versions)}")
+
+    
+    def _get_latest_secret_version_id(self, secret_id: str) -> str:
+        """
+        Retrieves the latest enabled version of a secret.
+
+        Args:
+            secret_id (str): The ID of the secret to retrieve the latest 
version for.
+        Returns:
+            str: The name of the latest secret version.
+        """
+        self.logger.debug(f"Retrieving latest enabled version of secret 
'{secret_id}'")
+
+        for version in self._get_secret_versions(secret_id):
+            if version.state == secretmanager.SecretVersion.State.ENABLED:
+                version_id = version.name.split("/")[-1]
+                self.logger.debug(f"Found latest enabled version 
'{version_id}' for secret '{secret_id}'")
+                return version_id
+        error_msg = f"No enabled versions found for secret {secret_id}."
+        self.logger.error(error_msg)
+        raise ValueError(error_msg)
+    
+    def _get_oldest_secret_version_id(self, secret_id: str) -> str:
+        """
+        Retrieves the oldest version of a secret.
+
+        Args:
+            secret_id (str): The ID of the secret to retrieve the oldest 
version for.
+        Returns:
+            str: The name of the oldest secret version.
+        """
+
+        self.logger.debug(f"Retrieving oldest version of secret '{secret_id}'")
+        for version in reversed(self._get_secret_versions(secret_id)):
+            if version.state == secretmanager.SecretVersion.State.ENABLED:
+                version_id = version.name.split("/")[-1]
+                self.logger.debug(f"Found oldest enabled version 
'{version_id}' for secret '{secret_id}'")
+                return version_id
+        error_msg = f"No enabled versions found for secret {secret_id}."
+        self.logger.error(error_msg)
+        raise ValueError(error_msg)
+
+    def _is_key_rotation_due(self, secret_id: str) -> bool:
+        """
+        Checks if the key rotation is due based on the last version created 
timestamp.
+
+        Args:
+            secret_id (str): The ID of the secret to check.
+        Returns:
+            bool: True if the key rotation is due, False otherwise.
+        """
+        self.logger.debug(f"Checking if key rotation is due for secret 
'{secret_id}'")
+        secret = self.get_secret(secret_id)
+        last_version_created_at = secret.labels.get("last_version_created_at")
+        
+        if not last_version_created_at:
+            self.logger.debug(f"No last version created timestamp found for 
secret '{secret_id}'")
+            return False

Review Comment:
   probably this should be `True`? If we don't know when the last version was 
created, probably we should force a rotation. Thoughts?



##########
infra/keys/config.yaml:
##########
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the configuration file for the secrets rotation service.
+# It defines the parameters for the secrets rotation process.
+
+# GENERAL CONFIGURATION
+
+# The project ID where the secrets rotation service will run
+project_id: "testing-me-460223"

Review Comment:
   TODO: change this before merging



##########
infra/keys/README.md:
##########
@@ -0,0 +1,93 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Service Account Management
+
+This module is used to manage Google Cloud service accounts, including 
creating, retrieving, enabling, and deleting service accounts and their keys. 
It uses the Google Cloud IAM API to perform these operations.
+
+## Features
+
+- Create service accounts and service account keys.
+- Store service account keys securely in Google Secret Manager.
+- Rotate service account keys regularly.
+- Enable and disable service accounts.
+
+## Usage
+
+We use the main.py script to manage service account keys. The script can be 
run with different commands to create, rotate, or retrieve service account keys.
+
+### Prerequisites
+
+- Google Cloud SDK installed and configured.
+- Appropriate permissions to manage service accounts and secrets in your 
Google Cloud project.
+- Required Python packages installed (see requirements.txt).
+
+### Configuration
+
+#### config.yaml
+
+This file contains configuration settings for the service account management, 
including project ID, key rotation settings, and logging configuration.
+
+#### keys.yaml
+
+All the service accounts are managed through a configuration file in YAML 
format, `keys.yaml`. This file contains the necessary information about each 
service account, including its ID, display name, and authorized users.
+
+```yaml
+service_accounts:
+  - account_id: my-service-account
+    display_name: My Service Account
+    authorized_users:
+      - email: us...@example.com
+      - email: us...@example.com
+```
+
+Where:
+
+- `account_id`: The unique identifier for the service account.
+- `display_name`: A human-readable name for the service account.
+- `authorized_users`: A list of users who will be granted access to the 
service account.
+
+The accounts defined in this file will be created if they do not already exist 
when running the script.
+
+### Rotation

Review Comment:
   It's good that you are explaining how the script works - however most users 
should not need the command - and instead should only edit `keys.yaml` - and a 
Github Action can provision their changes (right?)
   
   So move all of this documentation to an "Advanced usage" section, and make 
sure to mention something like "a regular user will interact with this script 
by editing keys.yaml only. This section is for project administrators only."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to