gemini-code-assist[bot] commented on code in PR #35848: URL: https://github.com/apache/beam/pull/35848#discussion_r2274237090
########## infra/enforcement/requirements.txt: ########## @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is used to install the dependencies for the infrastructure + +PyYAML==6.0.2 +google-cloud-iam==2.19.0 +google-cloud-resource-manager==1.14.1 +google-cloud-secret-manager==2.24.0 +google-cloud==0.34.0 Review Comment:  The `google-cloud` meta-package is deprecated and has not been updated since 2018. It can pull in outdated dependencies and is no longer recommended for use. Since you are already specifying the required `google-cloud-*` components individually, this package is redundant and should be removed to prevent potential dependency conflicts. ########## infra/enforcement/iam.py: ########## @@ -0,0 +1,339 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import datetime +import logging +import os +import requests +import sys +import yaml +from google.api_core import exceptions +from google.cloud import resourcemanager_v3 +from typing import Optional, List, Dict, Tuple + +CONFIG_FILE = "config.yml" + +class IAMPolicyComplianceChecker: + def __init__(self, project_id: str, users_file: str, logger: logging.Logger): + self.project_id = project_id + self.users_file = users_file + self.client = resourcemanager_v3.ProjectsClient() + self.logger = logger + + def _parse_member(self, member: str) -> tuple[str, Optional[str], str]: + """Parses an IAM member string to extract type, email, and a derived username. + + Args: + member: The IAM member string + Returns: + A tuple containing: + - username: The derived username from the member string. + - email: The email address if available, otherwise None. + - member_type: The type of the member (e.g., user, serviceAccount, group). 
+ """ + email = None + username = member + + # Split the member string to determine type and identifier + parts = member.split(':', 1) + member_type = parts[0] if len(parts) > 1 else "unknown" + identifier = parts[1] if len(parts) > 1 else member + + if member_type in ["user", "serviceAccount", "group"]: + email = identifier + if '@' in identifier: + username = identifier.split('@')[0] + else: + username = identifier + else: + username = identifier + member_type = "unknown" + email = None + + return username, email, member_type + + def _export_project_iam(self) -> List[Dict]: + """Exports the IAM policy for a given project to YAML format. + + Returns: + A list of dictionaries containing the IAM policy details. + """ + + try: + policy = self.client.get_iam_policy(resource=f"projects/{self.project_id}") + self.logger.debug(f"Retrieved IAM policy for project {self.project_id}") + except exceptions.NotFound as e: + self.logger.error(f"Project {self.project_id} not found: {e}") + raise + except exceptions.PermissionDenied as e: + self.logger.error(f"Permission denied for project {self.project_id}: {e}") + raise + except Exception as e: + self.logger.error(f"An error occurred while retrieving IAM policy for project {self.project_id}: {e}") + raise + + members_data = {} + + for binding in policy.bindings: + role = binding.role + + for member_str in binding.members: + if member_str not in members_data: + username, email_address, member_type = self._parse_member(member_str) + if member_type == "unknown": + self.logger.warning(f"Skipping member {member_str} with no email address") + continue # Skip if no email address is found, probably a malformed member + members_data[member_str] = { + "username": username, + "email": email_address, + "permissions": [] + } + + # Skip permissions that have a condition + if "withcond" in role: + continue + + permission_entry = {} + permission_entry["role"] = role + + members_data[member_str]["permissions"].append(permission_entry) + + 
output_list = [] + for data in members_data.values(): + data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"]) + output_list.append({ + "username": data["username"], + "email": data["email"], + "permissions": data["permissions"] + }) + + output_list.sort(key=lambda x: x["username"]) + return output_list + + def _read_project_iam_file(self) -> List[Dict]: + """Reads the IAM policy from a YAML file. + + Returns: + A list of dictionaries containing the IAM policy details. + """ + try: + with open(self.users_file, "r") as file: + iam_policy = yaml.safe_load(file) + + + self.logger.debug(f"Retrieved IAM policy from file for project {self.project_id}") + return iam_policy + except FileNotFoundError: + self.logger.error(f"IAM policy file not found for project {self.project_id}") + return [] + except Exception as e: + self.logger.error(f"An error occurred while reading IAM policy file for project {self.project_id}: {e}") + return [] + + def _to_yaml_file(self, data: List[Dict], output_file: str, header_info: str = "") -> None: + """ + Writes a list of dictionaries to a YAML file. + Include the apache license header on the files + + Args: + data: A list of dictionaries containing user permissions and details. + output_file: The file path where the YAML output will be written. + header_info: A string containing the header information to be included in the YAML file. + """ + + apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """ + + # Prepare the header with the Apache license + header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n" + + try: + with open(output_file, "w") as file: + file.write(header) + yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2) + self.logger.info(f"Successfully wrote IAM policy data to {output_file}") + except IOError as e: + self.logger.error(f"Failed to write to {output_file}: {e}") + raise + + def check_compliance(self) -> List[str]: + """ + Checks the compliance of the IAM policy against the defined policies. + + Returns: + A list of strings describing any compliance issues found. + """ + current_policy = self._export_project_iam() + existing_policy = self._read_project_iam_file() + + if not existing_policy: + error_msg = f"No IAM policy found in the {self.users_file}." + self.logger.info(error_msg) + raise RuntimeError(error_msg) + + differences = [] + + for current_user in current_policy: + found = False + for existing_user in existing_policy: + if current_user["email"] == existing_user["email"]: + found = True + if current_user["permissions"] != existing_user["permissions"]: + msg = f"\nPermissions for user {current_user['email']} differ." 
+ msg += f"\nIn GCP: {current_user['permissions']}" + msg += f"\nIn {self.users_file}: {existing_user['permissions']}" + self.logger.info(msg) + differences.append(msg) + break + if not found: + differences.append(f"User {current_user['email']} not found in existing policy.") Review Comment:  The `check_compliance` function currently performs a one-way check. It verifies whether the users in GCP exist in the policy file and whether their permissions match. However, it does not check for users that are listed in the policy file but have been removed from GCP, which can leave stale entries in the `users.yml` file. The check should be made bidirectional so that it also reports users that need to be removed from the policy file. You can add the following logic after the existing loop to find and report these users:

```python
existing_emails = {user['email'] for user in existing_policy}
current_emails = {user['email'] for user in current_policy}
removed_users = existing_emails - current_emails
for email in removed_users:
    differences.append(f"User {email} found in policy file but not in GCP.")
```

########## infra/enforcement/account_keys.py: ########## @@ -0,0 +1,460 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import datetime +import logging +import sys +import yaml +import argparse +import os +import requests +from typing import List, Dict, TypedDict +from google.cloud import secretmanager +from google.cloud import iam_admin_v1 +from google.cloud.iam_admin_v1 import types + +SECRET_MANAGER_LABEL = "beam-infra-secret-manager" + +class AuthorizedUser(TypedDict): + email: str + +class ServiceAccount(TypedDict): + account_id: str + display_name: str + authorized_users: List[AuthorizedUser] + +class ServiceAccountsConfig(TypedDict): + service_accounts: List[ServiceAccount] + +CONFIG_FILE = "config.yml" + +class AccountKeysPolicyComplianceCheck: + def __init__(self, project_id: str, service_account_keys_file: str, logger: logging.Logger): + self.project_id = project_id + self.service_account_keys_file = service_account_keys_file + self.logger = logger + self.secret_client = secretmanager.SecretManagerServiceClient() + self.service_account_client = iam_admin_v1.IAMClient() + + def _normalize_account_email(self, account_id: str) -> str: + """ + Normalizes the account identifier to a full email format. + + Args: + account_id (str): The unique identifier or email of the service account. + + Returns: + str: The full service account email address. + """ + if "@" in account_id and account_id.endswith(".iam.gserviceaccount.com"): + return account_id + else: + return f"{account_id}@{self.project_id}.iam.gserviceaccount.com" + + def _denormalize_account_email(self, email: str) -> str: + """ + Denormalizes the full service account email address to its unique identifier. + + Args: + email (str): The full service account email address. + + Returns: + str: The unique identifier for the service account. + """ + if email.endswith(f"@{self.project_id}.iam.gserviceaccount.com"): + return email.split("@")[0] + return email + + def _normalize_username(self, username: str) -> str: + """ + Normalizes the username to a consistent format. + + Args: + username (str): The username to normalize. 
+ + Returns: + str: The normalized username. + """ + if not username.startswith("user:"): + return f"user:{username.strip().lower()}" + return username + + def _denormalize_username(self, username: str) -> str: + """ + Denormalizes the username from the consistent format. + + Args: + username (str): The normalized username. + + Returns: + str: The denormalized username. + """ + if username.startswith("user:"): + return username.split(":", 1)[1].strip().lower() + return username + + def _get_all_live_service_accounts(self) -> List[str]: + """ + Retrieves all service accounts that are currently active (not disabled) in the project. + + Returns: + List[str]: A list of email addresses for all live service accounts. + """ + request = types.ListServiceAccountsRequest() + request.name = f"projects/{self.project_id}" + + try: + accounts = self.service_account_client.list_service_accounts(request=request) + self.logger.debug(f"Retrieved {len(accounts.accounts)} service accounts for project {self.project_id}") + + if not accounts: + self.logger.warning(f"No service accounts found in project {self.project_id}.") + return [] + + return [self._normalize_account_email(account.email) for account in accounts.accounts if not account.disabled] + except Exception as e: + self.logger.error(f"Failed to retrieve service accounts for project {self.project_id}: {e}") + raise + + def _get_all_live_managed_secrets(self) -> List[str]: + """ + Retrieves the list of secrets from the Secret Manager that where created by the beam-secret-service + + Returns: + List[str]: A list of secret ids + """ + try: + secrets = list(self.secret_client.list_secrets(request={"parent": f"projects/{self.project_id}"})) + self.logger.debug(f"Retrieved {len(secrets)} secrets for project {self.project_id}") + + if not secrets: + self.logger.warning(f"No secrets found in project {self.project_id}.") + return [] + + return [secret.name.split("/")[-1] for secret in secrets if "created_by" in secret.labels and 
secret.labels["created_by"] == SECRET_MANAGER_LABEL] + except Exception as e: + self.logger.error(f"Failed to retrieve secrets for project {self.project_id}: {e}") + raise + + def _get_all_secret_authorized_users(self, secret_id: str) -> List[str]: + """ + Retrieves a list of all users who have access to the secrets in the Secret Manager. + + Args: + secret_id (str): The ID of the secret to check access for. + Returns: + List[str]: A list of email addresses for all users authorized to access the secrets. + """ + accessor_role = "roles/secretmanager.secretAccessor" + resource_name = self.secret_client.secret_path(self.project_id, secret_id) + + try: + policy = self.secret_client.get_iam_policy(request={"resource": resource_name}) + self.logger.debug(f"Retrieved IAM policy for secret '{secret_id}': {policy}") + + if not policy.bindings: + self.logger.warning(f"No IAM bindings found for secret '{secret_id}'.") + return [] + + authorized_users = [] + for binding in policy.bindings: + if binding.role == accessor_role: + for user in binding.members: + authorized_users.append(self._normalize_username(user)) + + return authorized_users + except Exception as e: + self.logger.error(f"Failed to get IAM policy for secret '{secret_id}': {e}") + raise + + def _read_service_account_keys(self) -> ServiceAccountsConfig: + """ + Reads the service account keys from a YAML file and returns a list of ServiceAccount objects. + + Returns: + List[ServiceAccount]: A list of service account declarations. 
+ """ + try: + with open(self.service_account_keys_file, "r") as file: + keys = yaml.safe_load(file) + + if not keys: + raise ValueError("No service account keys found.") + + return keys + except IOError as e: + error_msg = f"Failed to read service account keys from {self.service_account_keys_file}: {e}" + self.logger.error(error_msg) + raise + + def _to_yaml_file(self, data: List[ServiceAccount], output_file: str, header_info: str = "") -> None: + """ + Writes a list of dictionaries to a YAML file. + Include the apache license header on the files + + Args: + data: A list of dictionaries containing user permissions and details. + output_file: The file path where the YAML output will be written. + header_info: A string containing the header information to be included in the YAML file. + """ + + apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ """ + + # Prepare the header with the Apache license + header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n" + + try: + with open(output_file, "w") as file: + file.write(header) + yaml_data = {"service_accounts": data} + yaml.dump(yaml_data, file, sort_keys=False, default_flow_style=False, indent=2) + self.logger.info(f"Successfully wrote Service Account Keys policy data to {output_file}") + except IOError as e: + self.logger.error(f"Failed to write to {output_file}: {e}") + + + def check_compliance(self) -> List[str]: + """ + Checks the compliance of service account keys with the defined policies. + + Returns: + List[str]: A list of compliance issue messages. + """ + + file_service_accounts = self._read_service_account_keys()["service_accounts"] + + if not file_service_accounts: + error_msg = f"No service account keys found in the {self.service_account_keys_file}." + self.logger.info(error_msg) + raise RuntimeError(error_msg) + + compliance_issues = [] + + # Check that all service accounts that exist are declared + for service_account in self._get_all_live_service_accounts(): + if self._denormalize_account_email(service_account) not in [account["account_id"] for account in file_service_accounts]: + msg = f"Service account '{service_account}' is not declared in the service account keys file." + compliance_issues.append(msg) + self.logger.warning(msg) + + managed_secrets = self._get_all_live_managed_secrets() + extracted_secrets = [f"{self._denormalize_account_email(account['account_id'])}-key" for account in file_service_accounts] + + # Check for managed secrets that are not declared + for secret in managed_secrets: + if secret not in extracted_secrets: + msg = f"Managed secret '{secret}' is not declared in the service account keys file." 
+ compliance_issues.append(msg) + self.logger.warning(msg) + + # Check for each managed secret if it has the correct permissions + for account in file_service_accounts: + secret_name = f"{self._denormalize_account_email(account['account_id'])}-key" + if secret_name not in managed_secrets: + # Skip accounts that don't have managed secrets + continue + + authorized_users = [user["email"] for user in account["authorized_users"]] + actual_users = [self._denormalize_username(user) for user in self._get_all_secret_authorized_users(secret_name)] + + # Sort both lists for proper comparison + authorized_users.sort() + actual_users.sort() + + if authorized_users != actual_users: + msg = f"Managed secret '{account['account_id']}' does not have the correct permissions. Expected: {authorized_users}, Actual: {actual_users}" + compliance_issues.append(msg) + self.logger.warning(msg) + + return compliance_issues + + def create_github_issue(self) -> None: + # Implement GitHub issue creation logic here + diff = self.check_compliance() + + if not diff: + self.logger.info("No compliance issues found.") + return + + title = f"Account Keys Compliance Issue Detected" + body = f"Account keys for project {self.project_id} are not compliant with the defined policies on {self.service_account_keys_file}\n\n" + for issue in diff: + body += f"- {issue}\n" + + repo = os.getenv("GITHUB_REPOSITORY", "apache/beam") + token = os.getenv("GITHUB_TOKEN") + + url = f"https://api.github.com/repos/{repo}/issues" + + headers = { + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json" + } + + payload = { + "title": title, + "body": body + } + + response = requests.post(url, headers=headers, json=payload) + if response.status_code == 201: + self.logger.info(f"Successfully created GitHub issue: {title}") + else: + self.logger.error(f"Failed to create GitHub issue: {response.content}") + raise RuntimeError("Failed to create GitHub issue.") + + def generate_compliance(self) -> None: + 
""" + Modifies the service account keys file to match the current state of service accounts and secrets. + It will just add the non managed service accounts. + """ + + file_service_accounts = self._read_service_account_keys()["service_accounts"] + + if not file_service_accounts: + error_msg = f"No service account keys found in the {self.service_account_keys_file}." + self.logger.info(error_msg) + raise RuntimeError(error_msg) + + compliance_issues = [] + + # Check that all service accounts that exist are declared, if not, add them + for service_account in self._get_all_live_service_accounts(): + if self._denormalize_account_email(service_account) not in [account["account_id"] for account in file_service_accounts]: + self.logger.info(f"Service account '{service_account}' is not declared in the service account keys file, adding it") + file_service_accounts.append({ + "account_id": self._denormalize_account_email(service_account), + "display_name": service_account, + "authorized_users": [] + }) + + managed_secrets = self._get_all_live_managed_secrets() + extracted_secrets = [f"{self._denormalize_account_email(account['account_id'])}-key" for account in file_service_accounts] + + # Check for managed secrets that are not declared, if not, add them + for secret in managed_secrets: + if secret not in extracted_secrets: + self.logger.info(f"Managed secret '{secret}' is not declared in the service account keys file, adding it") + file_service_accounts.append({ + "account_id": secret.strip("-key"), + "display_name": self._normalize_account_email(secret.strip("-key")), + "authorized_users": [] + }) + + # Check for each managed secret if it has the correct permissions + for account in file_service_accounts: + authorized_users = [user["email"] for user in account["authorized_users"]] + # If no authorized users, its new, skip + if not authorized_users: + self.logger.info(f"Managed secret '{account}' is new, skipping permission check") + continue + + users = 
self._get_all_secret_authorized_users(f"{self._denormalize_account_email(account['account_id'])}-key") + users = [self._denormalize_username(user) for user in users] + if authorized_users != users: + self.logger.info(f"Managed secret '{account}' does not have the correct permissions, updating it") + for user in users: + if user not in authorized_users: + self.logger.info(f"Adding user '{user}' to managed secret '{account}'") + account["authorized_users"].append({"email": user}) Review Comment:  The logic for updating permissions in `generate_compliance` is incorrect. It only appends users who are present in GCP but missing from the file; it never removes users whose access has been revoked in GCP but who are still listed in the file. Additionally, the list comparison `authorized_users != users` is unreliable because it is order-sensitive. The logic should be updated to fully synchronize the permissions from GCP to the file, replacing the existing list of users:

```python
for account in file_service_accounts:
    secret_name = f"{self._denormalize_account_email(account['account_id'])}-key"
    if secret_name not in managed_secrets:
        continue
    authorized_users = sorted([user["email"] for user in account["authorized_users"]])
    actual_users_normalized = self._get_all_secret_authorized_users(secret_name)
    actual_users = sorted([self._denormalize_username(user) for user in actual_users_normalized])
    if authorized_users != actual_users:
        self.logger.info(f"Managed secret '{account['account_id']}' does not have the correct permissions, updating it")
        account["authorized_users"] = [{"email": user} for user in actual_users]
```

########## infra/enforcement/account_keys.py: ########## @@ -0,0 +1,460 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import logging +import sys +import yaml +import argparse +import os +import requests +from typing import List, Dict, TypedDict +from google.cloud import secretmanager +from google.cloud import iam_admin_v1 +from google.cloud.iam_admin_v1 import types + +SECRET_MANAGER_LABEL = "beam-infra-secret-manager" + +class AuthorizedUser(TypedDict): + email: str + +class ServiceAccount(TypedDict): + account_id: str + display_name: str + authorized_users: List[AuthorizedUser] + +class ServiceAccountsConfig(TypedDict): + service_accounts: List[ServiceAccount] + +CONFIG_FILE = "config.yml" + +class AccountKeysPolicyComplianceCheck: + def __init__(self, project_id: str, service_account_keys_file: str, logger: logging.Logger): + self.project_id = project_id + self.service_account_keys_file = service_account_keys_file + self.logger = logger + self.secret_client = secretmanager.SecretManagerServiceClient() + self.service_account_client = iam_admin_v1.IAMClient() + + def _normalize_account_email(self, account_id: str) -> str: + """ + Normalizes the account identifier to a full email format. + + Args: + account_id (str): The unique identifier or email of the service account. + + Returns: + str: The full service account email address. 
+ """ + if "@" in account_id and account_id.endswith(".iam.gserviceaccount.com"): + return account_id + else: + return f"{account_id}@{self.project_id}.iam.gserviceaccount.com" + + def _denormalize_account_email(self, email: str) -> str: + """ + Denormalizes the full service account email address to its unique identifier. + + Args: + email (str): The full service account email address. + + Returns: + str: The unique identifier for the service account. + """ + if email.endswith(f"@{self.project_id}.iam.gserviceaccount.com"): + return email.split("@")[0] + return email + + def _normalize_username(self, username: str) -> str: + """ + Normalizes the username to a consistent format. + + Args: + username (str): The username to normalize. + + Returns: + str: The normalized username. + """ + if not username.startswith("user:"): + return f"user:{username.strip().lower()}" + return username + + def _denormalize_username(self, username: str) -> str: + """ + Denormalizes the username from the consistent format. + + Args: + username (str): The normalized username. + + Returns: + str: The denormalized username. + """ + if username.startswith("user:"): + return username.split(":", 1)[1].strip().lower() + return username + + def _get_all_live_service_accounts(self) -> List[str]: + """ + Retrieves all service accounts that are currently active (not disabled) in the project. + + Returns: + List[str]: A list of email addresses for all live service accounts. 
+ """ + request = types.ListServiceAccountsRequest() + request.name = f"projects/{self.project_id}" + + try: + accounts = self.service_account_client.list_service_accounts(request=request) + self.logger.debug(f"Retrieved {len(accounts.accounts)} service accounts for project {self.project_id}") + + if not accounts: + self.logger.warning(f"No service accounts found in project {self.project_id}.") + return [] + + return [self._normalize_account_email(account.email) for account in accounts.accounts if not account.disabled] + except Exception as e: + self.logger.error(f"Failed to retrieve service accounts for project {self.project_id}: {e}") + raise + + def _get_all_live_managed_secrets(self) -> List[str]: + """ + Retrieves the list of secrets from the Secret Manager that where created by the beam-secret-service + + Returns: + List[str]: A list of secret ids + """ + try: + secrets = list(self.secret_client.list_secrets(request={"parent": f"projects/{self.project_id}"})) + self.logger.debug(f"Retrieved {len(secrets)} secrets for project {self.project_id}") + + if not secrets: + self.logger.warning(f"No secrets found in project {self.project_id}.") + return [] + + return [secret.name.split("/")[-1] for secret in secrets if "created_by" in secret.labels and secret.labels["created_by"] == SECRET_MANAGER_LABEL] + except Exception as e: + self.logger.error(f"Failed to retrieve secrets for project {self.project_id}: {e}") + raise + + def _get_all_secret_authorized_users(self, secret_id: str) -> List[str]: + """ + Retrieves a list of all users who have access to the secrets in the Secret Manager. + + Args: + secret_id (str): The ID of the secret to check access for. + Returns: + List[str]: A list of email addresses for all users authorized to access the secrets. 
+ """ + accessor_role = "roles/secretmanager.secretAccessor" + resource_name = self.secret_client.secret_path(self.project_id, secret_id) + + try: + policy = self.secret_client.get_iam_policy(request={"resource": resource_name}) + self.logger.debug(f"Retrieved IAM policy for secret '{secret_id}': {policy}") + + if not policy.bindings: + self.logger.warning(f"No IAM bindings found for secret '{secret_id}'.") + return [] + + authorized_users = [] + for binding in policy.bindings: + if binding.role == accessor_role: + for user in binding.members: + authorized_users.append(self._normalize_username(user)) + + return authorized_users + except Exception as e: + self.logger.error(f"Failed to get IAM policy for secret '{secret_id}': {e}") + raise + + def _read_service_account_keys(self) -> ServiceAccountsConfig: + """ + Reads the service account keys from a YAML file and returns a list of ServiceAccount objects. + + Returns: + List[ServiceAccount]: A list of service account declarations. + """ + try: + with open(self.service_account_keys_file, "r") as file: + keys = yaml.safe_load(file) + + if not keys: + raise ValueError("No service account keys found.") + + return keys + except IOError as e: + error_msg = f"Failed to read service account keys from {self.service_account_keys_file}: {e}" + self.logger.error(error_msg) + raise + + def _to_yaml_file(self, data: List[ServiceAccount], output_file: str, header_info: str = "") -> None: + """ + Writes a list of dictionaries to a YAML file. + Include the apache license header on the files + + Args: + data: A list of dictionaries containing user permissions and details. + output_file: The file path where the YAML output will be written. + header_info: A string containing the header information to be included in the YAML file. + """ + + apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """ + + # Prepare the header with the Apache license + header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n" + + try: + with open(output_file, "w") as file: + file.write(header) + yaml_data = {"service_accounts": data} + yaml.dump(yaml_data, file, sort_keys=False, default_flow_style=False, indent=2) + self.logger.info(f"Successfully wrote Service Account Keys policy data to {output_file}") + except IOError as e: + self.logger.error(f"Failed to write to {output_file}: {e}") + + + def check_compliance(self) -> List[str]: + """ + Checks the compliance of service account keys with the defined policies. + + Returns: + List[str]: A list of compliance issue messages. + """ + + file_service_accounts = self._read_service_account_keys()["service_accounts"] + + if not file_service_accounts: + error_msg = f"No service account keys found in the {self.service_account_keys_file}." 
+ self.logger.info(error_msg) + raise RuntimeError(error_msg) + + compliance_issues = [] + + # Check that all service accounts that exist are declared + for service_account in self._get_all_live_service_accounts(): + if self._denormalize_account_email(service_account) not in [account["account_id"] for account in file_service_accounts]: + msg = f"Service account '{service_account}' is not declared in the service account keys file." + compliance_issues.append(msg) + self.logger.warning(msg) + + managed_secrets = self._get_all_live_managed_secrets() + extracted_secrets = [f"{self._denormalize_account_email(account['account_id'])}-key" for account in file_service_accounts] + + # Check for managed secrets that are not declared + for secret in managed_secrets: + if secret not in extracted_secrets: + msg = f"Managed secret '{secret}' is not declared in the service account keys file." + compliance_issues.append(msg) + self.logger.warning(msg) + + # Check for each managed secret if it has the correct permissions + for account in file_service_accounts: + secret_name = f"{self._denormalize_account_email(account['account_id'])}-key" + if secret_name not in managed_secrets: + # Skip accounts that don't have managed secrets + continue + + authorized_users = [user["email"] for user in account["authorized_users"]] + actual_users = [self._denormalize_username(user) for user in self._get_all_secret_authorized_users(secret_name)] + + # Sort both lists for proper comparison + authorized_users.sort() + actual_users.sort() + + if authorized_users != actual_users: + msg = f"Managed secret '{account['account_id']}' does not have the correct permissions. 
Expected: {authorized_users}, Actual: {actual_users}" + compliance_issues.append(msg) + self.logger.warning(msg) + + return compliance_issues + + def create_github_issue(self) -> None: + # Implement GitHub issue creation logic here + diff = self.check_compliance() + + if not diff: + self.logger.info("No compliance issues found.") + return + + title = f"Account Keys Compliance Issue Detected" + body = f"Account keys for project {self.project_id} are not compliant with the defined policies on {self.service_account_keys_file}\n\n" + for issue in diff: + body += f"- {issue}\n" + + repo = os.getenv("GITHUB_REPOSITORY", "apache/beam") + token = os.getenv("GITHUB_TOKEN") + + url = f"https://api.github.com/repos/{repo}/issues" + + headers = { + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json" + } + + payload = { + "title": title, + "body": body + } + + response = requests.post(url, headers=headers, json=payload) + if response.status_code == 201: + self.logger.info(f"Successfully created GitHub issue: {title}") + else: + self.logger.error(f"Failed to create GitHub issue: {response.content}") + raise RuntimeError("Failed to create GitHub issue.") + + def generate_compliance(self) -> None: + """ + Modifies the service account keys file to match the current state of service accounts and secrets. + It will just add the non managed service accounts. + """ + + file_service_accounts = self._read_service_account_keys()["service_accounts"] + + if not file_service_accounts: + error_msg = f"No service account keys found in the {self.service_account_keys_file}." + self.logger.info(error_msg) + raise RuntimeError(error_msg) + + compliance_issues = [] Review Comment:  The variable `compliance_issues` is initialized but never used within the `generate_compliance` method. It should be removed to improve code clarity. 
########## infra/enforcement/requirements.txt: ########## @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is used to install the dependencies for the infrastructure + +PyYAML==6.0.2 Review Comment:  The `PyYAML==6.0.2` package version does not exist on PyPI, which will cause `pip install` to fail. The latest available version is `6.0.1`. Please correct the version number. ``` PyYAML==6.0.1 ``` ########## infra/enforcement/README.md: ########## @@ -0,0 +1,165 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Infrastructure rules enforcement + +This module is used to check that the infrastructure roles are being used. Review Comment:  The word "roles" appears to be a typo and should likely be "rules" to match the title of this document, "Infrastructure rules enforcement", and to improve clarity. ```suggestion This module is used to check that the infrastructure rules are being used. ``` ########## infra/enforcement/requirements.txt: ########## @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is used to install the dependencies for the infrastructure + +PyYAML==6.0.2 +google-cloud-iam==2.19.0 +google-cloud-resource-manager==1.14.1 +google-cloud-secret-manager==2.24.0 +google-cloud==0.34.0 +google-crc32c==1.7.1 Review Comment:  This file is missing a newline character at the end. While many tools handle this gracefully, it's a standard convention to end files with a newline to ensure compatibility with various command-line tools and editors. ``` google-crc32c==1.7.1 ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org