pabloem commented on code in PR #35524: URL: https://github.com/apache/beam/pull/35524#discussion_r2255833484
########## infra/keys/config.yaml: ########## @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is the configuration file for the secrets rotation service. +# It defines the parameters for the secrets rotation process. + +# GENERAL CONFIGURATION + +# The project ID where the secrets rotation service will run +project_id: "testing-me-460223" + +# Secret service configuration + +# Default secret rotation interval in days +rotation_interval: 30 + +# The maximum number of secrets versions to keep in the secret manager +max_versions_to_keep: 5 + +# LOGGING + +# The secret rotation logs will be stored in a GCP bucket. +bucket_name: "testing-me-460223-tfstate" Review Comment: TODO: Change this before merge ########## infra/keys/secret_manager.py: ########## @@ -0,0 +1,651 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import google_crc32c +import logging +import time +from datetime import datetime, timezone, timedelta +from google.cloud import secretmanager +from typing import List, Union + +class SecretManagerLoggerAdapter(logging.LoggerAdapter): + """Logger adapter that adds a prefix to all log messages.""" + + def process(self, msg, kwargs): + return f"[SecretManager] {msg}", kwargs + +class SecretManager: + """Service to manage GCP API keys rotation.""" + + project_id: str # The GCP project ID where secrets are managed + rotation_interval: int # The interval (in days) at which to rotate secrets + max_versions_to_keep: int # The maximum number of secret versions to keep + max_retries: int # The maximum number of retries for API calls + client: secretmanager.SecretManagerServiceClient # GCP Secret Manager client + logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging messages + secrets_ids: List[str] # List of secret IDs managed by this service + + def __init__(self, project_id: str, logger: logging.Logger, rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 3) -> None: + self.project_id = project_id + self.rotation_interval = rotation_interval + self.max_versions_to_keep = max_versions_to_keep + self.max_retries = max_retries + self.client = secretmanager.SecretManagerServiceClient() + self.logger = SecretManagerLoggerAdapter(logger, {}) + self.logger.info(f"Initialized SecretManager for project '{self.project_id}'") + self.secrets_ids = self._get_secrets_ids() + + def _get_secrets_ids(self) -> List[str]: + """ + 
Retrieves the list of secrets from the Secret Manager and populates the `secrets_ids` list. + This method filters secrets based on a specific label indicating they were created by this service. + + Returns: + List[str]: A list of secret IDs that were created by this service. + """ + self.logger.debug(f"Retrieving secrets with the label from project '{self.project_id}'") + secrets_ids = [] + + try: + for secret in self.client.list_secrets(request={"parent": f"projects/{self.project_id}"}): + secret_id = secret.name.split("/")[-1] + if "created_by" in secret.labels and secret.labels["created_by"] == "secretmanager-service": Review Comment: Let's make "secretmanager-service" a constant - and let's change it to "beam-infra-secret-manager" (because "secretmanager-service" could be misunderstood as the GCP Secret Manager itself) ########## infra/keys/secret_manager.py: ########## @@ -0,0 +1,651 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import google_crc32c +import logging +import time +from datetime import datetime, timezone, timedelta +from google.cloud import secretmanager +from typing import List, Union + +class SecretManagerLoggerAdapter(logging.LoggerAdapter): + """Logger adapter that adds a prefix to all log messages.""" + + def process(self, msg, kwargs): + return f"[SecretManager] {msg}", kwargs + +class SecretManager: + """Service to manage GCP API keys rotation.""" + + project_id: str # The GCP project ID where secrets are managed + rotation_interval: int # The interval (in days) at which to rotate secrets + max_versions_to_keep: int # The maximum number of secret versions to keep + max_retries: int # The maximum number of retries for API calls + client: secretmanager.SecretManagerServiceClient # GCP Secret Manager client + logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging messages + secrets_ids: List[str] # List of secret IDs managed by this service + + def __init__(self, project_id: str, logger: logging.Logger, rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 3) -> None: + self.project_id = project_id + self.rotation_interval = rotation_interval + self.max_versions_to_keep = max_versions_to_keep + self.max_retries = max_retries + self.client = secretmanager.SecretManagerServiceClient() + self.logger = SecretManagerLoggerAdapter(logger, {}) + self.logger.info(f"Initialized SecretManager for project '{self.project_id}'") + self.secrets_ids = self._get_secrets_ids() + + def _get_secrets_ids(self) -> List[str]: + """ + Retrieves the list of secrets from the Secret Manager and populates the `secrets_ids` list. + This method filters secrets based on a specific label indicating they were created by this service. + + Returns: + List[str]: A list of secret IDs that were created by this service. 
+ """ + self.logger.debug(f"Retrieving secrets with the label from project '{self.project_id}'") + secrets_ids = [] + + try: + for secret in self.client.list_secrets(request={"parent": f"projects/{self.project_id}"}): + secret_id = secret.name.split("/")[-1] + if "created_by" in secret.labels and secret.labels["created_by"] == "secretmanager-service": + secrets_ids.append(secret_id) + except Exception as e: + self.logger.error(f"Error retrieving secrets: {e}") + + self.logger.debug(f"Found {len(secrets_ids)} secrets created by secretmanager-service in project '{self.project_id}'") + return secrets_ids + + def _secret_exists(self, secret_id: str) -> bool: + """ + Checks if a secret with the given ID exists in the Secret Manager GCP. + + Args: + secret_id (str): The ID of the secret to check. + Returns: + bool: True if the secret exists, False otherwise. + """ + self.logger.debug(f"Checking if secret '{secret_id}' exists") + try: + name = self.client.secret_path(self.project_id, secret_id) + secret = self.client.get_secret(request={"name": name}) + + if "created_by" in secret.labels and secret.labels["created_by"] == "secretmanager-service": + self.logger.debug(f"Secret '{secret_id}' exists and is managed by secretmanager-service") + return True + else: + self.logger.debug(f"Secret '{secret_id}' exists but is not managed by secretmanager-service") + return False + except Exception as e: + self.logger.debug(f"Secret '{secret_id}' does not exist: {e}") + return False + + def create_secret(self, secret_id: str) -> str: + """ + Create a new secret with the given name. A secret is a logical wrapper + around a collection of secret versions. Secret versions hold the actual + secret material. This method creates a new secret with automatic replication + and labels for tracking. + + Args: + secret_id (str): The ID to assign to the new secret. This ID must be unique within the project. + Returns: + str: The secret path of the newly created secret. 
+ """ + if secret_id in self.secrets_ids: + self.logger.debug(f"Secret '{secret_id}' already exists, returning existing secret path") + name = self.client.secret_path(self.project_id, secret_id) + return name + + self.logger.info(f"Creating new secret '{secret_id}' with rotation interval of {self.rotation_interval} days") + response = self.client.create_secret( + request={ + "parent": f"projects/{self.project_id}", + "secret_id": f"{secret_id}", + "secret": { + "replication": { + "automatic": {} + }, + "labels": { + "created_by": "secretmanager-service", + "created_at": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"), + "rotation_interval_days": str(self.rotation_interval), + "last_version_created_at": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"), + } + } + } + ) + + # Wait for the secret to be created + self.logger.debug(f"Waiting for secret '{secret_id}' to be created") + delay = 1 + for _ in range(self.max_retries): + if self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' is now available") + self.secrets_ids.append(secret_id) + break + self.logger.debug(f"Secret '{secret_id}' not found, retrying in {delay} seconds") + time.sleep(delay) + delay *= 2 + else: + error_msg = f"Failed to create secret '{secret_id}' after {self.max_retries} retries." Review Comment: ```suggestion error_msg = f"Could not verify secret '{secret_id}' was created after {self.max_retries} retries." ``` since we are not retrying creation, but rather the existence check ########## infra/keys/service_account.py: ########## @@ -0,0 +1,485 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import json +import time +from typing import List,Optional +from google.cloud import iam_admin_v1 +from google.cloud.iam_admin_v1 import types +from google.oauth2 import service_account +from google.auth.transport.requests import Request +from google.api_core import exceptions + +class ServiceAccountManagerLoggerAdapter(logging.LoggerAdapter): + """Logger adapter that adds a prefix to all log messages.""" + + def process(self, msg, kwargs): + return f"[ServiceAccountManager] {msg}", kwargs + +class ServiceAccountManager: + def __init__(self, project_id: str, logger: logging.Logger, max_retries: int = 3) -> None: + self.project_id = project_id + self.client = iam_admin_v1.IAMClient() + self.logger = ServiceAccountManagerLoggerAdapter(logger, {}) + self.max_retries = max_retries + self.logger.info(f"Initialized ServiceAccountManager for project: {self.project_id}") + + def _normalize_account_email(self, account_id: str) -> str: + """ + Normalizes the account identifier to a full email format. + + Args: + account_id (str): The unique identifier or email of the service account. + + Returns: + str: The full service account email address. 
+ """ + # Handle both account ID and full email formats + if "@" in account_id and account_id.endswith(".iam.gserviceaccount.com"): + # account_id is already a full email + return account_id + else: + # account_id is just the account name + return f"{account_id}@{self.project_id}.iam.gserviceaccount.com" + + def _get_service_accounts(self) -> List[iam_admin_v1.ServiceAccount]: + """ + Retrieves all service accounts in the specified project. + + Returns: + List[iam_admin_v1.ServiceAccount]: A list of service account objects. + """ + request = types.ListServiceAccountsRequest() + request.name = f"projects/{self.project_id}" + + accounts = self.client.list_service_accounts(request=request) + self.logger.debug(f"Listed service accounts: {[account.email for account in accounts.accounts]}") + return list(accounts.accounts) + + def _service_account_exists(self, account_id: str) -> bool: + """ + Checks if a service account with the given account_id exists in the project. + + Args: + account_id (str): The unique identifier or email of the service account. + + Returns: + bool: True if the service account exists, False otherwise. + """ + try: + self.get_service_account(account_id) + return True + except exceptions.NotFound: + return False + + def _service_account_is_enabled(self, account_id: str) -> bool: + """ + Checks if a service account is enabled. + + Args: + account_id (str): The unique identifier or email of the service account. + + Returns: + bool: True if the service account is enabled, False otherwise. + """ + try: + service_account = self.get_service_account(account_id) + return not service_account.disabled + except exceptions.NotFound: + self.logger.error(f"Service account {account_id} not found") + return False + + def create_service_account(self, account_id: str, display_name: Optional[str] = None) -> types.ServiceAccount: + """ + Creates a service account in the specified project. 
+ If the service account already exists, returns the existing account (idempotent operation). + + Args: + account_id (str): The unique identifier for the service account. + display_name (Optional[str]): A human-readable name for the service account. + Returns: + types.ServiceAccount: The created or existing service account object. + """ + request = types.CreateServiceAccountRequest() + request.account_id = account_id + request.name = f"projects/{self.project_id}" + + service_account = types.ServiceAccount() + service_account.display_name = display_name or account_id + request.service_account = service_account + + try: + account = self.client.create_service_account(request=request) + + # Wait for the service account to be created + delay = 1 + for _ in range(self.max_retries): + if self._service_account_exists(account_id): + break + time.sleep(delay) + delay *= 2 + else: + self.logger.error(f"Service account {account_id} creation timed out after {self.max_retries} retries.") + raise exceptions.DeadlineExceeded(f"Service account {account_id} creation timed out.") + + self.logger.info(f"Created service account: {account.email}") + return account + except exceptions.Conflict: + existing_account = self.get_service_account(account_id) + self.logger.info(f"Service account already exists: {existing_account.email}") + return existing_account + + def get_service_account(self, account_id: str) -> types.ServiceAccount: + """ + Retrieves a service account by its unique identifier or email. + + Args: + account_id (str): The unique identifier or email of the service account. + + Returns: + types.ServiceAccount: The service account object. 
+ """ + service_account_email = self._normalize_account_email(account_id) + + request = types.GetServiceAccountRequest() + request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}" + + try: + service_account = self.client.get_service_account(request=request) + self.logger.info(f"Retrieved service account: {service_account.email}") + return service_account + except exceptions.NotFound: + self.logger.error(f"Service account {account_id} not found") + raise + + def enable_service_account(self, account_id: str) -> None: + """ + Enables a service account in the specified project. + + Args: + account_id (str): The unique identifier or email of the service account to enable. + """ + service_account_email = self._normalize_account_email(account_id) + request = types.EnableServiceAccountRequest() + request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}" + + self.client.enable_service_account(request=request) + + # Wait for the service account to be enabled + delay = 1 + for _ in range(self.max_retries): + if self._service_account_is_enabled(account_id): + break + time.sleep(delay) + delay *= 2 + else: + self.logger.error(f"Service account {account_id} enabling timed out after {self.max_retries} retries.") + raise exceptions.DeadlineExceeded(f"Service account {account_id} enabling timed out.") + + self.logger.info(f"Enabled service account: {account_id}") + + def disable_service_account(self, account_id: str) -> None: + """ + Disables a service account in the specified project. + + Args: + account_id (str): The unique identifier or email of the service account to disable. 
+ """ + service_account_email = self._normalize_account_email(account_id) + request = types.DisableServiceAccountRequest() + request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}" + + self.client.disable_service_account(request=request) + + # Wait for the service account to be disabled + delay = 1 + for _ in range(self.max_retries): + if not self._service_account_is_enabled(account_id): + break + time.sleep(delay) + delay *= 2 + else: + self.logger.error(f"Service account {account_id} disabling timed out after {self.max_retries} retries.") + raise exceptions.DeadlineExceeded(f"Service account {account_id} disabling timed out.") + + self.logger.info(f"Disabled service account: {account_id}") + + def delete_service_account(self, account_id: str) -> None: + """ + Deletes a service account in the specified project. + + Args: + account_id (str): The unique identifier or email of the service account to delete. + """ + service_account_email = self._normalize_account_email(account_id) + request = types.DeleteServiceAccountRequest() + request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}" + + self.client.delete_service_account(request=request) + + # Wait for the service account to be deleted + delay = 1 + for _ in range(self.max_retries): + if not self._service_account_exists(account_id): + break + time.sleep(delay) + delay *= 2 + else: + self.logger.error(f"Service account {account_id} deletion timed out after {self.max_retries} retries.") + raise exceptions.DeadlineExceeded(f"Service account {account_id} deletion timed out.") + + self.logger.info(f"Deleted service account: {account_id}") + + def _get_service_account_keys(self, account_id: str) -> List[iam_admin_v1.ServiceAccountKey]: + """ + Retrieves all keys for the specified service account. + + Args: + account_id (str): The unique identifier or email of the service account. 
+ + Returns: + List[iam_admin_v1.ServiceAccountKey]: A list of service account key objects. + """ + service_account_email = self._normalize_account_email(account_id) + request = types.ListServiceAccountKeysRequest() + request.name = f"projects/{self.project_id}/serviceAccounts/{service_account_email}" + + response = self.client.list_service_account_keys(request=request) + self.logger.debug(f"Listed keys for service account: {account_id}") + return list(response.keys) + + def _service_account_key_exists(self, account_id: str, key_id: str) -> bool: + """ + Checks if a service account key exists for the specified service account. + + Args: + account_id (str): The unique identifier or email of the service account. + key_id (str): The ID of the service account key to check. + + Returns: + bool: True if the key exists, False otherwise. + """ + keys = self._get_service_account_keys(account_id) + return any(key.name.endswith(key_id) for key in keys) Review Comment: Why check with `endswith` instead of full equality? A suffix match would also return True for a different key whose ID merely ends with `key_id`; comparing the last path segment of `key.name` against `key_id` (or the full resource name against the expected one) would be exact. ########## infra/keys/gcp_logger.py: ########## @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import io +import logging +from google.cloud import storage +from typing import List +from datetime import datetime + +class GCSLogHandler(logging.Handler): Review Comment: This is useful — just make sure the logs are also printed to stdout or stderr, because the script will mainly run in GitHub Actions (GHA), where console output is what shows up in the job log. ########## infra/keys/config.yaml: ########## @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is the configuration file for the secrets rotation service. +# It defines the parameters for the secrets rotation process. + +# GENERAL CONFIGURATION + +# The project ID where the secrets rotation service will run +project_id: "testing-me-460223" + +# Secret service configuration + +# Default secret rotation interval in days +rotation_interval: 30 + +# The maximum number of secrets versions to keep in the secret manager +max_versions_to_keep: 5 Review Comment: Let's make this 2, since we only care about the current key and the previous one; any older key should never be usable. ########## infra/keys/secret_manager.py: ########## @@ -0,0 +1,651 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements.
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import google_crc32c +import logging +import time +from datetime import datetime, timezone, timedelta +from google.cloud import secretmanager +from typing import List, Union + +class SecretManagerLoggerAdapter(logging.LoggerAdapter): + """Logger adapter that adds a prefix to all log messages.""" + + def process(self, msg, kwargs): + return f"[SecretManager] {msg}", kwargs + +class SecretManager: + """Service to manage GCP API keys rotation.""" + + project_id: str # The GCP project ID where secrets are managed + rotation_interval: int # The interval (in days) at which to rotate secrets + max_versions_to_keep: int # The maximum number of secret versions to keep + max_retries: int # The maximum number of retries for API calls + client: secretmanager.SecretManagerServiceClient # GCP Secret Manager client + logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging messages + secrets_ids: List[str] # List of secret IDs managed by this service + + def __init__(self, project_id: str, logger: logging.Logger, rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 3) -> None: + self.project_id = project_id + self.rotation_interval = rotation_interval + self.max_versions_to_keep = max_versions_to_keep + self.max_retries = max_retries + self.client = 
secretmanager.SecretManagerServiceClient() + self.logger = SecretManagerLoggerAdapter(logger, {}) + self.logger.info(f"Initialized SecretManager for project '{self.project_id}'") + self.secrets_ids = self._get_secrets_ids() + + def _get_secrets_ids(self) -> List[str]: + """ + Retrieves the list of secrets from the Secret Manager and populates the `secrets_ids` list. + This method filters secrets based on a specific label indicating they were created by this service. + + Returns: + List[str]: A list of secret IDs that were created by this service. + """ + self.logger.debug(f"Retrieving secrets with the label from project '{self.project_id}'") + secrets_ids = [] + + try: + for secret in self.client.list_secrets(request={"parent": f"projects/{self.project_id}"}): + secret_id = secret.name.split("/")[-1] + if "created_by" in secret.labels and secret.labels["created_by"] == "secretmanager-service": + secrets_ids.append(secret_id) + except Exception as e: + self.logger.error(f"Error retrieving secrets: {e}") + + self.logger.debug(f"Found {len(secrets_ids)} secrets created by secretmanager-service in project '{self.project_id}'") + return secrets_ids + + def _secret_exists(self, secret_id: str) -> bool: + """ + Checks if a secret with the given ID exists in the Secret Manager GCP. + + Args: + secret_id (str): The ID of the secret to check. + Returns: + bool: True if the secret exists, False otherwise. 
+ """ + self.logger.debug(f"Checking if secret '{secret_id}' exists") + try: + name = self.client.secret_path(self.project_id, secret_id) + secret = self.client.get_secret(request={"name": name}) + + if "created_by" in secret.labels and secret.labels["created_by"] == "secretmanager-service": + self.logger.debug(f"Secret '{secret_id}' exists and is managed by secretmanager-service") + return True + else: + self.logger.debug(f"Secret '{secret_id}' exists but is not managed by secretmanager-service") + return False + except Exception as e: + self.logger.debug(f"Secret '{secret_id}' does not exist: {e}") + return False + + def create_secret(self, secret_id: str) -> str: + """ + Create a new secret with the given name. A secret is a logical wrapper + around a collection of secret versions. Secret versions hold the actual + secret material. This method creates a new secret with automatic replication + and labels for tracking. + + Args: + secret_id (str): The ID to assign to the new secret. This ID must be unique within the project. + Returns: + str: The secret path of the newly created secret. + """ + if secret_id in self.secrets_ids: + self.logger.debug(f"Secret '{secret_id}' already exists, returning existing secret path") + name = self.client.secret_path(self.project_id, secret_id) + return name + + self.logger.info(f"Creating new secret '{secret_id}' with rotation interval of {self.rotation_interval} days") + response = self.client.create_secret( + request={ + "parent": f"projects/{self.project_id}", + "secret_id": f"{secret_id}", + "secret": { + "replication": { + "automatic": {} + }, + "labels": { + "created_by": "secretmanager-service", + "created_at": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"), + "rotation_interval_days": str(self.rotation_interval), Review Comment: what happens after `rotation_interval_days` with a secret that has not been rotated? Nothing? Does GCP SM delete it or something like that? 
########## infra/keys/README.md: ########## @@ -0,0 +1,93 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Service Account Management + +This module is used to manage Google Cloud service accounts, including creating, retrieving, enabling, and deleting service accounts and their keys. It uses the Google Cloud IAM API to perform these operations. + +## Features + +- Create service accounts and service account keys. +- Store service account keys securely in Google Secret Manager. +- Rotate service account keys regularly. +- Enable and disable service accounts. + +## Usage + +We use the main.py script to manage service account keys. The script can be run with different commands to create, rotate, or retrieve service account keys. + +### Prerequisites + +- Google Cloud SDK installed and configured. +- Appropriate permissions to manage service accounts and secrets in your Google Cloud project. +- Required Python packages installed (see requirements.txt). + +### Configuration + +#### config.yaml + +This file contains configuration settings for the service account management, including project ID, key rotation settings, and logging configuration. 
+ +#### keys.yaml + +All the service accounts are managed through a configuration file in YAML format, `keys.yaml`. This file contains the necessary information about each service account, including its ID, display name, and authorized users. + +```yaml +service_accounts: + - account_id: my-service-account + display_name: My Service Account + authorized_users: + - email: us...@example.com + - email: us...@example.com +``` + +Where: + +- `account_id`: The unique identifier for the service account. Review Comment: Add something like: > The full email of the service account will be `<account_id>@<project_id>.iam.gserviceaccount.com` (this is the format `_normalize_account_email` builds in `service_account.py`). ########## infra/keys/main.py: ########## @@ -0,0 +1,476 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import traceback +import yaml +import logging +import argparse +import sys +from typing import List, TypedDict +# Importing custom modules +from gcp_logger import GCSLogHandler, GCPLogger +from secret_manager import SecretManager +from service_account import ServiceAccountManager + + +# --- Configuration --- +CONFIG_FILE = 'config.yaml' +KEYS_FILE = 'keys.yaml' + +class ConfigDict(TypedDict): + project_id: str + rotation_interval: int + max_versions_to_keep: int + bucket_name: str + log_file_prefix: str + logging_level: str + +class AuthorizedUser(TypedDict): + email: str + +class ServiceAccount(TypedDict): + account_id: str + display_name: str + authorized_users: List[AuthorizedUser] + +class ServiceAccountsConfig(TypedDict): + service_accounts: List[ServiceAccount] + +def load_config() -> ConfigDict: + """Loads the configuration from the YAML file.""" + with open(CONFIG_FILE, 'r') as f: + config = yaml.safe_load(f) + + if not config: + raise ValueError("Configuration file is empty or invalid.") + + required_keys = ['project_id', 'rotation_interval', 'max_versions_to_keep', 'bucket_name', 'log_file_prefix'] + for key in required_keys: Review Comment: Nit: This could be done a little 'fancier' with sets. E.g.: ``` required_keys = set([....]) missing_keys = required_keys - config.keys() if missing_keys: raise ValueError(f"Configuration is missing the following keys: {missing_keys}") ``` This way you tell the user every key that's missing right away instead of failing once for every new key. This change is optional though. It's only a smol enhancement. ########## infra/keys/README.md: ########## @@ -0,0 +1,93 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Service Account Management + +This module is used to manage Google Cloud service accounts, including creating, retrieving, enabling, and deleting service accounts and their keys. It uses the Google Cloud IAM API to perform these operations. + +## Features + +- Create service accounts and service account keys. +- Store service account keys securely in Google Secret Manager. +- Rotate service account keys regularly. +- Enable and disable service accounts. + +## Usage + +We use the main.py script to manage service account keys. The script can be run with different commands to create, rotate, or retrieve service account keys. + +### Prerequisites + +- Google Cloud SDK installed and configured. +- Appropriate permissions to manage service accounts and secrets in your Google Cloud project. +- Required Python packages installed (see requirements.txt). + +### Configuration + +#### config.yaml + +This file contains configuration settings for the service account management, including project ID, key rotation settings, and logging configuration. + +#### keys.yaml + +All the service accounts are managed through a configuration file in YAML format, `keys.yaml`. This file contains the necessary information about each service account, including its ID, display name, and authorized users. 
+ +```yaml +service_accounts: + - account_id: my-service-account + display_name: My Service Account + authorized_users: + - email: us...@example.com + - email: us...@example.com +``` + +Where: + +- `account_id`: The unique identifier for the service account. +- `display_name`: A human-readable name for the service account. +- `authorized_users`: A list of users who will be granted access to the service account. + +The accounts defined in this file will be created if they do not already exist when running the script. Review Comment: What happens if they already do exist? Will the whole thing fail? Or will things work 'as intended'? Depending on the answer, please update the doc line to explain that. ########## infra/keys/secret_manager.py: ########## @@ -0,0 +1,651 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import google_crc32c +import logging +import time +from datetime import datetime, timezone, timedelta +from google.cloud import secretmanager +from typing import List, Union + +class SecretManagerLoggerAdapter(logging.LoggerAdapter): + """Logger adapter that adds a prefix to all log messages.""" + + def process(self, msg, kwargs): + return f"[SecretManager] {msg}", kwargs + +class SecretManager: + """Service to manage GCP API keys rotation.""" + + project_id: str # The GCP project ID where secrets are managed + rotation_interval: int # The interval (in days) at which to rotate secrets + max_versions_to_keep: int # The maximum number of secret versions to keep + max_retries: int # The maximum number of retries for API calls + client: secretmanager.SecretManagerServiceClient # GCP Secret Manager client + logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging messages + secrets_ids: List[str] # List of secret IDs managed by this service Review Comment: Nit: please rename this to "secret_ids" ;) In English a noun used as a modifier stays singular ("secret IDs", not "secrets IDs"). ########## infra/keys/main.py: ########## @@ -0,0 +1,476 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import traceback +import yaml +import logging +import argparse +import sys +from typing import List, TypedDict +# Importing custom modules +from gcp_logger import GCSLogHandler, GCPLogger +from secret_manager import SecretManager +from service_account import ServiceAccountManager + + +# --- Configuration --- +CONFIG_FILE = 'config.yaml' +KEYS_FILE = 'keys.yaml' + +class ConfigDict(TypedDict): + project_id: str + rotation_interval: int + max_versions_to_keep: int + bucket_name: str + log_file_prefix: str + logging_level: str + +class AuthorizedUser(TypedDict): + email: str + +class ServiceAccount(TypedDict): + account_id: str + display_name: str + authorized_users: List[AuthorizedUser] + +class ServiceAccountsConfig(TypedDict): + service_accounts: List[ServiceAccount] + +def load_config() -> ConfigDict: + """Loads the configuration from the YAML file.""" + with open(CONFIG_FILE, 'r') as f: + config = yaml.safe_load(f) + + if not config: + raise ValueError("Configuration file is empty or invalid.") + + required_keys = ['project_id', 'rotation_interval', 'max_versions_to_keep', 'bucket_name', 'log_file_prefix'] + for key in required_keys: + if key not in config: + raise ValueError(f"Missing required configuration key: {key}") + + if not isinstance(config['rotation_interval'], int) or config['rotation_interval'] <= 0: + raise ValueError("Configuration 'rotation_interval' must be a positive integer.") + if not isinstance(config['max_versions_to_keep'], int) or config['max_versions_to_keep'] <= 0: + raise ValueError("Configuration 'max_versions_to_keep' must be a positive integer.") + if not isinstance(config['bucket_name'], str) or not config['bucket_name'].strip(): + raise ValueError("Configuration 'bucket_name' must be a non-empty string.") + if not isinstance(config['log_file_prefix'], str) or not config['log_file_prefix'].strip(): + raise ValueError("Configuration 'log_file_prefix' must be a non-empty string.") + if 'logging_level' in config: + if not 
isinstance(config['logging_level'], str) or config['logging_level'].strip() not in logging._nameToLevel: + raise ValueError("Configuration 'logging_level' must be one of: " + ", ".join(logging._nameToLevel.keys())) + else: + config['logging_level'] = 'INFO' + + return config + +def load_service_accounts_config() -> ServiceAccountsConfig: + """Loads the service accounts configuration from the YAML file.""" + with open(KEYS_FILE, 'r') as f: + service_accounts_config = yaml.safe_load(f) + + if not service_accounts_config or 'service_accounts' not in service_accounts_config: + raise ValueError("Service accounts configuration file is empty or invalid.") + + if not isinstance(service_accounts_config['service_accounts'], list): + raise ValueError("Service accounts configuration must be a list of service accounts.") + + for account in service_accounts_config['service_accounts']: + if 'account_id' not in account or 'display_name' not in account: + raise ValueError("Each service account must have 'account_id' and 'display_name'.") + if 'authorized_users' not in account or not isinstance(account['authorized_users'], list): + raise ValueError("Each service account must have a list of 'authorized_users'.") + + return service_accounts_config + +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="KeyService - GCP Service Account Key Management", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + python main.py # Initialize service accounts and setup + python main.py --cron # Run key rotation for accounts that need it + python main.py --get-key my-sa # Get the latest key for service account 'my-sa' + python main.py --generate-key my-sa # Manually rotate key for service account 'my-sa' + """ + ) + + group = parser.add_mutually_exclusive_group() + group.add_argument( + '--cron', + action='store_true', + help='Run the cron job to rotate keys that require rotation' + ) + group.add_argument( + 
'--get-key', + metavar='ACCOUNT_ID', + type=str, + help='Get the latest key for the specified service account ID' + ) + group.add_argument( + '--generate-key', + metavar='ACCOUNT_ID', + type=str, + help='Manually rotate (generate new) key for the specified service account ID' + ) + + return parser.parse_args() + +class KeyService: + """Service to manage GCP API keys rotation.""" + + # Configuration + project_id: str + service_accounts: List[ServiceAccount] + + # Clients + secret_manager_client: SecretManager + service_account_manager: ServiceAccountManager + logger: logging.Logger + + def __init__(self, config: ConfigDict, service_accounts_config: ServiceAccountsConfig) -> None: + """ + Initializes the KeyService with the provided configuration. + + Args: + config (ConfigDict): Configuration dictionary containing: + - project_id: GCP project ID + - rotation_interval: Interval in days for secret rotation + - max_versions_to_keep: Maximum number of secret versions to keep + - bucket_name: GCS bucket name for logging + - log_file_prefix: Prefix for log file names + - logging_level: Logging level (e.g., 'INFO', 'DEBUG') + Raises: + ValueError: If any required configuration parameter is missing. 
+ """ + + self.project_id = config['project_id'] + rotation_interval = config['rotation_interval'] + max_versions_to_keep = config['max_versions_to_keep'] + bucket_name = config['bucket_name'] + log_file_prefix = config['log_file_prefix'] + logging_level = config['logging_level'] + + self.service_accounts = service_accounts_config['service_accounts'] + + self.logger = GCPLogger("KeyService", logging_level, bucket_name, log_file_prefix, self.project_id) + self.secret_manager_client = SecretManager(self.project_id, self.logger, rotation_interval, max_versions_to_keep) + self.service_account_manager = ServiceAccountManager(self.project_id, self.logger) + + self._start_all_service_accounts() + self.logger.info(f"Initialized KeyService for project: {self.project_id}") + + def __del__(self) -> None: + """Manually flush all logs to Google Cloud Storage.""" + try: + for handler in self.logger.handlers: + if isinstance(handler, GCSLogHandler): + handler.flush_to_gcs() + except Exception: + pass + + def cleanup(self) -> None: + """Explicit cleanup method to flush logs and close resources.""" + try: + self.logger.info("KeyService cleanup: Flushing logs to Google Cloud Storage") + for handler in self.logger.handlers: + if isinstance(handler, GCSLogHandler): + handler.flush_to_gcs() + except Exception as e: + self.logger.error(f"Error during cleanup: {e}") + + def _start_all_service_accounts(self) -> None: + """ + Reads the service accounts configuration and checks for service accounts. + + 1. If a service account does not exist, it will be created. + 2. If a secret does not exist, it will be created. + 3. If a service account does not have a key, it will be created. 
+ """ + + self.logger.debug("Creating service accounts if they do not exist") + for account in self.service_accounts: + account_id = account['account_id'] + authorized_users = [user['email'] for user in account.get('authorized_users', [])] + + try: + # Check if the service account already exists + if not self.service_account_manager._service_account_exists(account_id): + self.logger.debug(f"Service account {account_id} does not exist, creating it") + display_name = account['display_name'] + self.logger.info(f"Creating service account: {account_id}") + self.service_account_manager.create_service_account(account_id, display_name) + + # Check if the secret for the service account exists + secret_name = f"{account_id}-key" + if not self.secret_manager_client._secret_exists(secret_name): + self.logger.debug(f"Secret {secret_name} does not exist, creating it") + self.logger.info(f"Creating secret for service account: {account_id}") + self.secret_manager_client.create_secret(secret_name) + + # Check if the secret has the correct access policy + if self.secret_manager_client.is_different_user_access(secret_name, authorized_users): + self.logger.info(f"Updating access policy for secret {secret_name}") + self.secret_manager_client.update_secret_access(secret_name, authorized_users) + + # Start the service account key creation process, ensuring at least one key exists + self._start_service_account_key(account_id) + + # Check if the service account key exists + except Exception as e: + self.logger.error(f"Error creating service account or secret for {account_id}: {e}") + + def _start_service_account_key(self, account_id: str) -> None: + """ + Creates a service account key for a given service account during initialization. + This method ensures that each service account has at least one key available. + During initialization, we always create a key if none exists in Secret Manager. + + Args: + account_id (str): The ID of the service account to create a key for. 
+ """ + self.logger.debug(f"Starting service account key for {account_id}") + try: + if not self.service_account_manager._service_account_exists(account_id): + self.logger.error(f"Service account {account_id} does not exist, cannot create key") + return + + # Check if a key exists in Secret Manager + secret_name = f"{account_id}-key" + try: + existing_secret = self.secret_manager_client.get_secret_version(secret_name, "latest") + if existing_secret: + self.logger.debug(f"Service account {account_id} already has a key in Secret Manager, skipping key creation.") + return + except Exception: + self.logger.debug(f"No existing key found in Secret Manager for {account_id}") + + # If no key exists in Secret Manager, create a new key + self.logger.debug(f"Creating service account key for {account_id}") + self.create_and_save_service_account_key(account_id) + self.logger.debug(f"Service account key for {account_id} created and saved successfully.") + + except Exception as e: + self.logger.error(f"Error creating service account key for {account_id}: {e}") + + def _cron_job(self) -> None: + """ + Cron job to rotate service account keys and secrets. + This method should be called periodically based on the rotation interval. + It will rotate keys that have not been rotated in the last rotation_interval days. 
+ """ + + self.logger.info("Starting cron job for service account key rotation") + for account in self.service_accounts: + account_id = account['account_id'] + try: + secret_name = f"{account_id}-key" + # Check if the service account is due for key rotation + if self.secret_manager_client._is_key_rotation_due(secret_name): + self.logger.info(f"Rotating service account key for {account_id}") + self.rotate_service_account_key(account_id) + else: + self.logger.debug(f"Service account key for {account_id} is not due for rotation") Review Comment: An idea to implement the 'grace period' in this code - currently it looks like this: ``` if _is_key_rotation_due(secret_name): self.rotate_service_account_key(account_id) ``` And `rotate_service_account_key` creates a new key + deletes the old. Instead, how about: ``` if _is_key_rotation_due(secret_name): # This function only creates a new key - it doesn't delete or disable the old one self.renew_service_account_key(account_id) # Later, check whether the old keys are due for deletion if _old_key_past_grace_period(account_id): self.delete_old_sa_key(account_id) ``` Where: ``` def _old_key_past_grace_period(account_id): # Age of the newest key = now - its creation time return time.now() - get_account(account_id).latest_creation_time > grace_period ``` That way, old keys are deleted only once the newest key has existed for longer than the grace period, which gives every consumer time to switch to it. Let me know if that makes sense. ########## infra/keys/README.md: ########## @@ -0,0 +1,93 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Service Account Management + +This module is used to manage Google Cloud service accounts, including creating, retrieving, enabling, and deleting service accounts and their keys. It uses the Google Cloud IAM API to perform these operations. + +## Features + +- Create service accounts and service account keys. +- Store service account keys securely in Google Secret Manager. +- Rotate service account keys regularly. +- Enable and disable service accounts. + +## Usage + +We use the main.py script to manage service account keys. The script can be run with different commands to create, rotate, or retrieve service account keys. + +### Prerequisites + +- Google Cloud SDK installed and configured. +- Appropriate permissions to manage service accounts and secrets in your Google Cloud project. +- Required Python packages installed (see requirements.txt). + +### Configuration + +#### config.yaml + +This file contains configuration settings for the service account management, including project ID, key rotation settings, and logging configuration. + +#### keys.yaml + +All the service accounts are managed through a configuration file in YAML format, `keys.yaml`. This file contains the necessary information about each service account, including its ID, display name, and authorized users. + +```yaml +service_accounts: + - account_id: my-service-account + display_name: My Service Account + authorized_users: + - email: us...@example.com + - email: us...@example.com +``` + +Where: + +- `account_id`: The unique identifier for the service account. 
+- `display_name`: A human-readable name for the service account. +- `authorized_users`: A list of users who will be granted access to the service account. Review Comment: ```suggestion - `authorized_users`: A list of users who will be granted access to the service account's keys in Secret Manager. This group of users will essentially be authorized to impersonate the service account in question. ``` ########## infra/keys/secret_manager.py: ########## @@ -0,0 +1,651 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import google_crc32c +import logging +import time +from datetime import datetime, timezone, timedelta +from google.cloud import secretmanager +from typing import List, Union + +class SecretManagerLoggerAdapter(logging.LoggerAdapter): + """Logger adapter that adds a prefix to all log messages.""" + + def process(self, msg, kwargs): + return f"[SecretManager] {msg}", kwargs + +class SecretManager: + """Service to manage GCP API keys rotation.""" + + project_id: str # The GCP project ID where secrets are managed + rotation_interval: int # The interval (in days) at which to rotate secrets + max_versions_to_keep: int # The maximum number of secret versions to keep + max_retries: int # The maximum number of retries for API calls + client: secretmanager.SecretManagerServiceClient # GCP Secret Manager client + logger: Union[logging.Logger, logging.LoggerAdapter] # Logger for logging messages + secrets_ids: List[str] # List of secret IDs managed by this service + + def __init__(self, project_id: str, logger: logging.Logger, rotation_interval: int = 30, max_versions_to_keep: int = 5, max_retries: int = 3) -> None: + self.project_id = project_id + self.rotation_interval = rotation_interval + self.max_versions_to_keep = max_versions_to_keep + self.max_retries = max_retries + self.client = secretmanager.SecretManagerServiceClient() + self.logger = SecretManagerLoggerAdapter(logger, {}) + self.logger.info(f"Initialized SecretManager for project '{self.project_id}'") + self.secrets_ids = self._get_secrets_ids() + + def _get_secrets_ids(self) -> List[str]: + """ + Retrieves the list of secrets from the Secret Manager and populates the `secrets_ids` list. + This method filters secrets based on a specific label indicating they were created by this service. + + Returns: + List[str]: A list of secret IDs that were created by this service. 
+ """ + self.logger.debug(f"Retrieving secrets with the label from project '{self.project_id}'") + secrets_ids = [] + + try: + for secret in self.client.list_secrets(request={"parent": f"projects/{self.project_id}"}): + secret_id = secret.name.split("/")[-1] + if "created_by" in secret.labels and secret.labels["created_by"] == "secretmanager-service": + secrets_ids.append(secret_id) + except Exception as e: + self.logger.error(f"Error retrieving secrets: {e}") + + self.logger.debug(f"Found {len(secrets_ids)} secrets created by secretmanager-service in project '{self.project_id}'") + return secrets_ids + + def _secret_exists(self, secret_id: str) -> bool: + """ + Checks if a secret with the given ID exists in the Secret Manager GCP. + + Args: + secret_id (str): The ID of the secret to check. + Returns: + bool: True if the secret exists, False otherwise. + """ + self.logger.debug(f"Checking if secret '{secret_id}' exists") + try: + name = self.client.secret_path(self.project_id, secret_id) + secret = self.client.get_secret(request={"name": name}) + + if "created_by" in secret.labels and secret.labels["created_by"] == "secretmanager-service": + self.logger.debug(f"Secret '{secret_id}' exists and is managed by secretmanager-service") + return True + else: + self.logger.debug(f"Secret '{secret_id}' exists but is not managed by secretmanager-service") + return False + except Exception as e: + self.logger.debug(f"Secret '{secret_id}' does not exist: {e}") + return False + + def create_secret(self, secret_id: str) -> str: + """ + Create a new secret with the given name. A secret is a logical wrapper + around a collection of secret versions. Secret versions hold the actual + secret material. This method creates a new secret with automatic replication + and labels for tracking. + + Args: + secret_id (str): The ID to assign to the new secret. This ID must be unique within the project. + Returns: + str: The secret path of the newly created secret. 
+ """ + if secret_id in self.secrets_ids: + self.logger.debug(f"Secret '{secret_id}' already exists, returning existing secret path") + name = self.client.secret_path(self.project_id, secret_id) + return name + + self.logger.info(f"Creating new secret '{secret_id}' with rotation interval of {self.rotation_interval} days") + response = self.client.create_secret( + request={ + "parent": f"projects/{self.project_id}", + "secret_id": f"{secret_id}", + "secret": { + "replication": { + "automatic": {} + }, + "labels": { + "created_by": "secretmanager-service", + "created_at": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"), + "rotation_interval_days": str(self.rotation_interval), + "last_version_created_at": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"), + } + } + } + ) + + # Wait for the secret to be created + self.logger.debug(f"Waiting for secret '{secret_id}' to be created") + delay = 1 + for _ in range(self.max_retries): + if self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' is now available") + self.secrets_ids.append(secret_id) + break + self.logger.debug(f"Secret '{secret_id}' not found, retrying in {delay} seconds") + time.sleep(delay) + delay *= 2 + else: + error_msg = f"Failed to create secret '{secret_id}' after {self.max_retries} retries." + self.logger.error(error_msg) + raise RuntimeError(error_msg) + + self.logger.info(f"Successfully created secret '{secret_id}'") + return response.name + + def get_secret(self, secret_id: str) -> secretmanager.Secret: + """ + Retrieves the specified secret by its ID. + + Args: + secret_id (str): The ID of the secret to retrieve. + Returns: + secretmanager.Secret: The requested secret. 
+ """ + self.logger.info(f"Retrieving secret '{secret_id}'") + + if secret_id not in self.secrets_ids: + self.logger.debug(f"Secret '{secret_id}' not in cached secrets, checking existence") + if not self._secret_exists(secret_id): + self.logger.error(f"Secret '{secret_id}' does not exist") + raise ValueError(f"Secret {secret_id} does not exist. Please create it first.") + else: + self.logger.debug(f"Secret '{secret_id}' exists but is not in cached secrets, updating cache") + self.secrets_ids.append(secret_id) + + name = self.client.secret_path(self.project_id, secret_id) + return self.client.get_secret(request={"name": name}) + + def delete_secret(self, secret_id: str) -> None: + """ + Deletes the specified secret and all its versions. + + Args: + secret_id (str): The ID of the secret to delete. + """ + if not self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' does not exist, nothing to delete") + return + + self.logger.info(f"Deleting secret '{secret_id}' and all its versions") + name = self.client.secret_path(self.project_id, secret_id) + self.client.delete_secret(request={"name": name}) + + # Wait for the secret to be deleted + self.logger.debug(f"Waiting for secret '{secret_id}' to be deleted") + delay = 1 + for _ in range(self.max_retries): + if not self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' is now deleted") + if secret_id in self.secrets_ids: + self.logger.debug(f"Removing '{secret_id}' from cached secrets") + self.secrets_ids.remove(secret_id) + break + self.logger.debug(f"Secret '{secret_id}' still exists, retrying in {delay} seconds") + time.sleep(delay) + delay *= 2 + else: + error_msg = f"Failed to delete secret '{secret_id}' after {self.max_retries} retries." 
+ self.logger.error(error_msg) + raise RuntimeError(error_msg) + + self.logger.info(f"Successfully deleted secret '{secret_id}'") + + def is_different_user_access(self, secret_id: str, allowed_users: List[str]) -> bool: + """ + Checks if the current access policy of a secret allows only the specified users to read it. + This is used to determine if an update is needed. + + Args: + secret_id (str): The ID of the secret to check access for. + allowed_users (List[str]): A list of user emails to check against the current access policy. + Returns: + bool: True if the current access policy is different from the specified users, False otherwise. + """ + self.logger.debug(f"Checking if access for secret '{secret_id}' differs from allowed users: {allowed_users}") + + if not self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' does not exist, cannot check access") + return True + + accessor_role = "roles/secretmanager.secretAccessor" + resource_name = self.client.secret_path(self.project_id, secret_id) + + try: + policy = self.client.get_iam_policy(request={"resource": resource_name}) + except Exception as e: + self.logger.error(f"Failed to get IAM policy for secret '{secret_id}': {e}") + return True + + current_members = set() + for binding in policy.bindings: + if binding.role == accessor_role: + current_members.update(binding.members) + + allowed_members = {f"user:{user_email}" for user_email in allowed_users} + + is_different = current_members != allowed_members + self.logger.debug(f"Current members: {current_members}") + self.logger.debug(f"Allowed members: {allowed_members}") + self.logger.debug(f"Access for secret '{secret_id}' differs: {is_different}") + return is_different + + def update_secret_access(self, secret_id: str, allowed_users: List[str]) -> None: + """ + Updates the access policy of a secret to allow only the specified users to read it. + Any existing users will be removed and replaced with the new list. 
+ + Args: + secret_id (str): The ID of the secret to update access for. + allowed_users (List[str]): A list of user emails to grant read access to. + """ + self.logger.debug(f"Updating access for secret '{secret_id}' to allow users: {allowed_users}") + + if not self._secret_exists(secret_id): + error_msg = f"Secret {secret_id} does not exist. Please create it first." + self.logger.error(error_msg) + raise ValueError(error_msg) + + accessor_role = "roles/secretmanager.secretAccessor" + resource_name = self.client.secret_path(self.project_id, secret_id) + policy = self.client.get_iam_policy(request={"resource": resource_name}) + + members = [f"user:{user_email}" for user_email in allowed_users] + + binding_found = False + for binding in policy.bindings: + if binding.role == accessor_role: + binding.members[:] = members + self.logger.debug(f"Replaced members for role '{accessor_role}' in secret '{secret_id}' with: {allowed_users}") + binding_found = True + break + + if not binding_found: + policy.bindings.add( + role=accessor_role, + members=members + ) + self.logger.debug(f"Created new binding for role '{accessor_role}' in secret '{secret_id}'") + + self.client.set_iam_policy( + request={ + "resource": resource_name, + "policy": policy + } + ) + + self.logger.info(f"Successfully updated access for secret '{secret_id}' to allow users: {allowed_users}") + + def _get_secret_versions(self, secret_id: str) -> List[secretmanager.SecretVersion]: + """ + Retrieves all versions of a secret. + + Args: + secret_id (str): The ID of the secret to list versions for. + Returns: + List[secretmanager.SecretVersion]: A list of secret versions. 
+ """ + self.logger.debug(f"Retrieving versions for secret '{secret_id}'") + + if not self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' does not exist, cannot retrieve versions") + return [] + + parent = self.client.secret_path(self.project_id, secret_id) + versions = list(self.client.list_secret_versions(request={"parent": parent})) + self.logger.debug(f"Found {len(versions)} versions for secret '{secret_id}'") + return versions + + def _get_enabled_secret_versions(self, secret_id: str) -> List[secretmanager.SecretVersion]: + """ + Retrieves all enabled versions of a secret. + + Args: + secret_id (str): The ID of the secret to list enabled versions for. + Returns: + List[secretmanager.SecretVersion]: A list of enabled secret versions. + """ + self.logger.debug(f"Retrieving enabled versions for secret '{secret_id}'") + versions = self._get_secret_versions(secret_id) + enabled_versions = [version for version in versions if version.state == secretmanager.SecretVersion.State.ENABLED] + self.logger.debug(f"Found {len(enabled_versions)} enabled versions for secret '{secret_id}'") + return enabled_versions + + def _secret_version_exists(self, secret_id: str, version_id: str) -> bool: + """ + Checks if a specific version of a secret exists. + + Args: + secret_id (str): The ID of the secret to check. + version_id (str): The ID of the version to check. + Returns: + bool: True if the version exists, False otherwise. 
+ """ + self.logger.debug(f"Checking if version '{version_id}' exists for secret '{secret_id}'") + if not self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' does not exist, version cannot exist") + return False + + versions = self._get_secret_versions(secret_id) + exists = any(version.name.split("/")[-1] == version_id for version in versions) + self.logger.debug(f"Version '{version_id}' exists: {exists}") + return exists + + def _secret_version_is_enabled(self, secret_id: str, version_id: str) -> bool: + """ + Checks if a specific version of a secret is enabled. + + Args: + secret_id (str): The ID of the secret to check. + version_id (str): The ID of the version to check. + Returns: + bool: True if the version is enabled, False otherwise. + """ + self.logger.debug(f"Checking if version '{version_id}' of secret '{secret_id}' is enabled") + if not self._secret_exists(secret_id): + self.logger.debug(f"Secret '{secret_id}' does not exist, version cannot be enabled") + return False + + versions = self._get_secret_versions(secret_id) + for version in versions: + if version.name.split("/")[-1] == version_id: + is_enabled = version.state == secretmanager.SecretVersion.State.ENABLED + self.logger.debug(f"Version '{version_id}' is enabled: {is_enabled}") + return is_enabled + self.logger.debug(f"Version '{version_id}' does not exist for secret '{secret_id}'") + return False + + def _purge_old_secret_versions(self, secret_id: str) -> None: + """ + Purges old secret versions that are not enabled and exceed the maximum allowed versions. + + Args: + secret_id (str): The ID of the secret to purge old versions from. 
+ """ + self.logger.debug(f"Purging old versions for secret '{secret_id}'") + versions = self._get_secret_versions(secret_id) + enabled_versions = [v for v in versions if v.state == secretmanager.SecretVersion.State.ENABLED] + + if len(enabled_versions) > self.max_versions_to_keep: + versions_to_purge = enabled_versions[:len(enabled_versions) - self.max_versions_to_keep] + self.logger.debug(f"Found {len(versions_to_purge)} versions to purge for secret '{secret_id}'") + for version in versions_to_purge: + version_id = version.name.split("/")[-1] + self.logger.debug(f"Disabling version '{version_id}' of secret '{secret_id}'") + self.disable_secret_version(secret_id, version_id=version_id) + else: + self.logger.debug(f"No versions to purge for secret '{secret_id}', current count: {len(enabled_versions)}") + + + def _get_latest_secret_version_id(self, secret_id: str) -> str: + """ + Retrieves the latest enabled version of a secret. + + Args: + secret_id (str): The ID of the secret to retrieve the latest version for. + Returns: + str: The name of the latest secret version. + """ + self.logger.debug(f"Retrieving latest enabled version of secret '{secret_id}'") + + for version in self._get_secret_versions(secret_id): + if version.state == secretmanager.SecretVersion.State.ENABLED: + version_id = version.name.split("/")[-1] + self.logger.debug(f"Found latest enabled version '{version_id}' for secret '{secret_id}'") + return version_id + error_msg = f"No enabled versions found for secret {secret_id}." + self.logger.error(error_msg) + raise ValueError(error_msg) + + def _get_oldest_secret_version_id(self, secret_id: str) -> str: + """ + Retrieves the oldest version of a secret. + + Args: + secret_id (str): The ID of the secret to retrieve the oldest version for. + Returns: + str: The name of the oldest secret version. 
+ """ + + self.logger.debug(f"Retrieving oldest version of secret '{secret_id}'") + for version in reversed(self._get_secret_versions(secret_id)): + if version.state == secretmanager.SecretVersion.State.ENABLED: + version_id = version.name.split("/")[-1] + self.logger.debug(f"Found oldest enabled version '{version_id}' for secret '{secret_id}'") + return version_id + error_msg = f"No enabled versions found for secret {secret_id}." + self.logger.error(error_msg) + raise ValueError(error_msg) + + def _is_key_rotation_due(self, secret_id: str) -> bool: + """ + Checks if the key rotation is due based on the last version created timestamp. + + Args: + secret_id (str): The ID of the secret to check. + Returns: + bool: True if the key rotation is due, False otherwise. + """ + self.logger.debug(f"Checking if key rotation is due for secret '{secret_id}'") + secret = self.get_secret(secret_id) + last_version_created_at = secret.labels.get("last_version_created_at") + + if not last_version_created_at: + self.logger.debug(f"No last version created timestamp found for secret '{secret_id}'") + return False Review Comment: Should this return `True` instead? If we don't know when the last version was created, we should probably force a rotation. Thoughts? ########## infra/keys/config.yaml: ########## @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is the configuration file for the secrets rotation service. +# It defines the parameters for the secrets rotation process. + +# GENERAL CONFIGURATION + +# The project ID where the secrets rotation service will run +project_id: "testing-me-460223" Review Comment: TODO: change this before merging ########## infra/keys/README.md: ########## @@ -0,0 +1,93 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Service Account Management + +This module is used to manage Google Cloud service accounts, including creating, retrieving, enabling, and deleting service accounts and their keys. It uses the Google Cloud IAM API to perform these operations. + +## Features + +- Create service accounts and service account keys. +- Store service account keys securely in Google Secret Manager. +- Rotate service account keys regularly. 
+- Enable and disable service accounts. + +## Usage + +We use the main.py script to manage service account keys. The script can be run with different commands to create, rotate, or retrieve service account keys. + +### Prerequisites + +- Google Cloud SDK installed and configured. +- Appropriate permissions to manage service accounts and secrets in your Google Cloud project. +- Required Python packages installed (see requirements.txt). + +### Configuration + +#### config.yaml + +This file contains configuration settings for the service account management, including project ID, key rotation settings, and logging configuration. + +#### keys.yaml + +All the service accounts are managed through a configuration file in YAML format, `keys.yaml`. This file contains the necessary information about each service account, including its ID, display name, and authorized users. + +```yaml +service_accounts: + - account_id: my-service-account + display_name: My Service Account + authorized_users: + - email: us...@example.com + - email: us...@example.com +``` + +Where: + +- `account_id`: The unique identifier for the service account. +- `display_name`: A human-readable name for the service account. +- `authorized_users`: A list of users who will be granted access to the service account. + +The accounts defined in this file will be created if they do not already exist when running the script. + +### Rotation Review Comment: It's good that you are explaining how the script works; however, most users should not need to run the command. Instead, they should only edit `keys.yaml`, and a GitHub Action can provision their changes (right?). So please move all of this documentation to an "Advanced usage" section, and make sure to mention something like: "A regular user interacts with this script only by editing `keys.yaml`. This section is for project administrators only." -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org