This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bb63475748c Add infra policy compliance checkers (#35848)
bb63475748c is described below
commit bb63475748c5e10d0988b8b779e5ca2724b044c2
Author: Enrique Calderon <[email protected]>
AuthorDate: Mon Aug 18 13:46:57 2025 -0600
Add infra policy compliance checkers (#35848)
* Add IAM policy compliance checker and configuration files
`infra/iam/generate` has been removed as it has been assimilated into
`iam.py`
* fix typo
* standardize the keys yaml
* Fix error on IAM not found
* Add service account keys compliance checker
* Improve readme by adding details, formats and Account Keys info
* Modified account keys compliance check class to allow reading an empty
keys.yaml
* Change the project id to apache beam
* Allow service accounts on terraform
* Proposed changes to make the policy compliant
It is adding some user permission changes and the service accounts roles.
* Proposed account keys changes to make it compliant
**Warning**: This commit modifies service account keys, clearing them and
starting them as custom, modified the generated file to just include the keys
you want to manage
* Solution for issues found by gemini-code-assitant
* Implement SendingClient for GitHub issue notifications and email alerts
* Restore original configuration files
* Add license to SendingClient
* Implement a print announcement to avoid issues and email all together
* Added announcement functionality in AccountKeys and IAM checkers
- Added SendingClient integration for creating and printing compliance
announcements.
- Updated main execution flow to support new announcement actions.
- Improved logging and error handling for announcement processes.
---
infra/enforcement/README.md | 165 ++++++++++++
infra/enforcement/account_keys.py | 523 +++++++++++++++++++++++++++++++++++++
infra/enforcement/config.yml | 37 +++
infra/enforcement/iam.py | 400 ++++++++++++++++++++++++++++
infra/enforcement/requirements.txt | 23 ++
infra/enforcement/sending.py | 179 +++++++++++++
infra/iam/generate.py | 212 ---------------
infra/iam/users.tf | 2 +-
infra/iam/users.yml | 2 +-
infra/keys/keys.yaml | 30 +--
10 files changed, 1342 insertions(+), 231 deletions(-)
diff --git a/infra/enforcement/README.md b/infra/enforcement/README.md
new file mode 100644
index 00000000000..c9b2bda8cab
--- /dev/null
+++ b/infra/enforcement/README.md
@@ -0,0 +1,165 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Infrastructure rules enforcement
+
+This module checks that the project's infrastructure complies with the defined rules.
+
+## IAM Policies
+
+Enforcement works by validating the project's current IAM policy against the policies declared in the users file.
+The tool monitors and enforces compliance for user permissions, service account roles, and group memberships across your GCP project.
+
+### Usage
+
+You can specify the action either through the configuration file (`config.yml`) or via command-line arguments:
+
+```bash
+# Check compliance and report issues (default)
+python iam.py --action check
+
+# Create GitHub issue if compliance violations are found
+python iam.py --action issue
+
+# Generate new compliance file based on current IAM policy
+python iam.py --action generate
+```
+
+### Actions
+
+- **check**: Validates IAM policies against defined policies and reports any differences (default behavior)
+- **issue**: Creates a GitHub issue when IAM policies differ from the defined ones, including detailed permission discrepancies
+- **generate**: Updates the compliance file to match the current GCP IAM policy, creating a new baseline from existing permissions
+
+### Features
+
+The IAM Policy enforcement tool provides the following capabilities:
+
+- **Comprehensive Policy Export**: Automatically exports all IAM bindings and roles from the GCP project
+- **Member Type Recognition**: Handles users, service accounts, and groups with proper parsing and identification
+- **Permission Comparison**: Detailed comparison between expected and actual permissions for each user
+- **Conditional Role Filtering**: Automatically excludes conditional roles (roles with conditions) from compliance checks
+- **Sorted Output**: Provides consistent, sorted output for easy comparison and review
+- **Detailed Reporting**: Comprehensive reporting of permission differences with clear before/after comparisons
+- **GitHub Integration**: Automatic issue creation with detailed compliance violation reports
+
+### Configuration
+
+The `config.yml` file supports the following parameters for IAM policies:
+
+- `project_id`: GCP project ID to check (default: `apache-beam-testing`)
+- `users_file`: Path to the YAML file containing expected IAM policies (default: `../iam/users.yml`)
+- `action`: Default action to perform (`check`, `issue`, or `generate`)
+- `logging`: Logging configuration (level and format)
+
+### IAM Policy File Format
+
+The IAM policy file should follow this YAML structure:
+
+```yaml
+- username: john.doe
+  email: [email protected]
+  permissions:
+  - role: roles/viewer
+  - role: roles/storage.objectViewer
+- username: service-account-name
+  email: [email protected]
+  permissions:
+  - role: roles/compute.instanceAdmin
+  - role: roles/iam.serviceAccountUser
+```
+
+Each user entry includes:
+- `username`: The derived username (typically the part before @ in email addresses)
+- `email`: The full email address of the user or service account
+- `permissions`: List of IAM roles assigned to this member
+  - `role`: The full GCP IAM role name (e.g., `roles/viewer`, `roles/editor`)
+
+### Compliance Checking Process
+
+1. **Policy Extraction**: Retrieves current IAM policy from the GCP project
+2. **Member Parsing**: Parses all IAM members and extracts usernames, emails, and types
+3. **Role Processing**: Processes all roles while filtering out conditional bindings
+4. **Comparison**: Compares current permissions with expected permissions from the policy file
+5. **Reporting**: Generates detailed reports of any discrepancies found
+
+Command-line arguments take precedence over configuration file settings.
+
+## Account Keys
+
+Enforcement also validates service account keys and their access permissions against the defined policies.
+The tool supports the four actions described below.
+
+### Usage
+
+You can specify the action either through the configuration file (`config.yml`) or via command-line arguments:
+
+```bash
+# Check compliance and report issues (default)
+python account_keys.py --action check
+
+# Send an announcement (GitHub issue and email) if compliance violations are found
+python account_keys.py --action announce
+
+# Print the announcement instead of sending it
+python account_keys.py --action print
+
+# Generate new compliance file based on current service account keys policy
+python account_keys.py --action generate
+```
+
+### Actions
+
+- **check**: Validates service account keys and their permissions against defined policies and reports any differences (default behavior)
+- **announce**: Creates an announcement through `SendingClient` (a GitHub issue plus an email notification) when service account keys policies differ from the defined ones
+- **print**: Prints the same announcement instead of sending it, which is useful for testing
+- **generate**: Updates the compliance file to match the current GCP service account keys and Secret Manager permissions
+
+### Features
+
+The Account Keys enforcement tool provides the following capabilities:
+
+- **Service Account Discovery**: Automatically discovers all active (non-disabled) service accounts in the project
+- **Secret Manager Integration**: Monitors secrets labeled `created_by: beam-infra-secret-manager`
+- **Permission Validation**: Ensures that the `roles/secretmanager.secretAccessor` members of each managed secret match the declared authorized users
+- **Compliance Reporting**: Identifies undeclared service accounts, undeclared managed secrets, and permission mismatches
+- **Automatic Remediation**: Can automatically update the compliance file to match current infrastructure state
+
+### Configuration
+
+The `config.yml` file supports the following parameters for account keys:
+
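+- `project_id`: GCP project ID to check (default: `apache-beam-testing`)
+- `service_account_keys_file`: Path to the YAML file containing expected service account keys policies (default: `../keys/keys.yaml`)
+- `action`: Default action to perform (`check`, `announce`, `print`, or `generate`)
+- `logging`: Logging configuration (level and format)
+
+The `announce` action reads its `SendingClient` settings from environment variables: `GITHUB_TOKEN`, `GITHUB_REPOSITORY` (default: `apache/beam`), `SMTP_SERVER`, `SMTP_PORT` (default: `587`), `EMAIL_ADDRESS`, `EMAIL_PASSWORD`, and `EMAIL_RECIPIENT`. The `print` action falls back to placeholder values when they are unset.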
+
+### Service Account Keys File Format
+
+The service account keys file should follow this YAML structure:
+
+```yaml
+service_accounts:
+- account_id: example-service-account
+  display_name: [email protected]
+  authorized_users:
+  - email: [email protected]
+  - email: [email protected]
+```
+
+Each service account entry includes:
+- `account_id`: The unique identifier for the service account (without the full email domain)
+- `display_name`: The full service account email address or any custom display name
+- `authorized_users`: List of users who should hold the Secret Accessor role on the service account's managed secret (`<account_id>-key`)
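The IAM comparison described in the README (group the policy's unconditional roles per member, then compare them with the entries in `users.yml`) can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the code in `iam.py`: policy bindings are modelled as hypothetical `(role, members, has_condition)` tuples instead of Resource Manager policy objects, members are keyed by email, and `actual_roles_by_email` / `diff_permissions` are illustrative names.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical stand-in for one binding of a project IAM policy:
# (role, members, has_condition)
Binding = Tuple[str, List[str], bool]


def actual_roles_by_email(bindings: List[Binding]) -> Dict[str, Set[str]]:
    """Group unconditional roles by member email."""
    roles: Dict[str, Set[str]] = {}
    for role, members, has_condition in bindings:
        if has_condition:
            continue  # conditional bindings are left out of the comparison
        for member in members:
            # "user:alice@example.com" -> "alice@example.com"
            email = member.split(":", 1)[-1]
            roles.setdefault(email, set()).add(role)
    return roles


def diff_permissions(expected: List[dict], bindings: List[Binding]) -> List[str]:
    """List members whose actual roles differ from the roles declared for them."""
    declared = {u["email"]: {p["role"] for p in u.get("permissions", [])} for u in expected}
    actual = actual_roles_by_email(bindings)
    issues = []
    for email in sorted(declared.keys() | actual.keys()):
        want, have = declared.get(email, set()), actual.get(email, set())
        if want != have:
            issues.append(f"{email}: expected {sorted(want)}, actual {sorted(have)}")
    return issues


# Entries follow the users.yml format shown in the README
expected = [{"username": "alice", "email": "alice@example.com",
             "permissions": [{"role": "roles/viewer"}]}]
bindings = [("roles/viewer", ["user:alice@example.com"], False),
            ("roles/editor", ["user:alice@example.com"], True)]
print(diff_permissions(expected, bindings))  # []
```

Because the conditional `roles/editor` binding is skipped, Alice's actual roles match her declared ones and no issue is reported.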
diff --git a/infra/enforcement/account_keys.py b/infra/enforcement/account_keys.py
new file mode 100644
index 00000000000..4c3a8190d23
--- /dev/null
+++ b/infra/enforcement/account_keys.py
@@ -0,0 +1,523 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import sys
+import yaml
+import argparse
+import os
+from typing import List, Dict, TypedDict, Optional
+from google.cloud import secretmanager
+from google.cloud import iam_admin_v1
+from google.cloud.iam_admin_v1 import types
+from sending import SendingClient
+
+SECRET_MANAGER_LABEL = "beam-infra-secret-manager"
+
+class AuthorizedUser(TypedDict):
+    email: str
+
+class ServiceAccount(TypedDict):
+    account_id: str
+    display_name: str
+    authorized_users: List[AuthorizedUser]
+
+class ServiceAccountsConfig(TypedDict):
+    service_accounts: List[ServiceAccount]
+
+CONFIG_FILE = "config.yml"
+
+class AccountKeysPolicyComplianceCheck:
+    def __init__(self, project_id: str, service_account_keys_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None):
+        self.project_id = project_id
+        self.service_account_keys_file = service_account_keys_file
+        self.logger = logger
+        self.sending_client = sending_client
+        self.secret_client = secretmanager.SecretManagerServiceClient()
+        self.service_account_client = iam_admin_v1.IAMClient()
+
+    def _normalize_account_email(self, account_id: str) -> str:
+        """
+        Normalizes the account identifier to a full email format.
+
+        Args:
+            account_id (str): The unique identifier or email of the service account.
+
+        Returns:
+            str: The full service account email address.
+        """
+        if "@" in account_id:
+            return account_id
+        else:
+            return f"{account_id}@{self.project_id}.iam.gserviceaccount.com"
+
+    def _denormalize_account_email(self, email: str) -> str:
+        """
+        Denormalizes the full service account email address to its unique identifier.
+
+        Args:
+            email (str): The full service account email address.
+
+        Returns:
+            str: The unique identifier for the service account.
+        """
+        if email.endswith(f"@{self.project_id}.iam.gserviceaccount.com"):
+            return email.split("@")[0]
+        return email
+
+    def _normalize_username(self, username: str) -> str:
+        """
+        Normalizes the username to a consistent format.
+
+        Args:
+            username (str): The username to normalize.
+
+        Returns:
+            str: The normalized username.
+        """
+        if not username.startswith("user:"):
+            return f"user:{username.strip().lower()}"
+        return username
+
+    def _denormalize_username(self, username: str) -> str:
+        """
+        Denormalizes the username from the consistent format.
+
+        Args:
+            username (str): The normalized username.
+
+        Returns:
+            str: The denormalized username.
+        """
+        if username.startswith("user:"):
+            return username.split(":", 1)[1].strip().lower()
+        return username
+
+    def _get_all_live_service_accounts(self) -> List[str]:
+        """
+        Retrieves all service accounts that are currently active (not disabled) in the project.
+
+        Returns:
+            List[str]: A list of email addresses for all live service accounts.
+        """
+        request = types.ListServiceAccountsRequest()
+        request.name = f"projects/{self.project_id}"
+
+        try:
+            accounts = list(self.service_account_client.list_service_accounts(request=request))  # iterate the pager so every page is included
+            self.logger.debug(f"Retrieved {len(accounts)} service accounts for project {self.project_id}")
+
+            if not accounts:
+                self.logger.warning(f"No service accounts found in project {self.project_id}.")
+                return []
+
+            return [self._normalize_account_email(account.email) for account in accounts if not account.disabled]
+        except Exception as e:
+            self.logger.error(f"Failed to retrieve service accounts for project {self.project_id}: {e}")
+            raise
+
+    def _get_all_live_managed_secrets(self) -> List[str]:
+        """
+        Retrieves the secrets in Secret Manager whose `created_by` label is SECRET_MANAGER_LABEL (beam-infra-secret-manager).
+
+        Returns:
+            List[str]: A list of secret ids
+        """
+        try:
+            secrets = list(self.secret_client.list_secrets(request={"parent": f"projects/{self.project_id}"}))
+            self.logger.debug(f"Retrieved {len(secrets)} secrets for project {self.project_id}")
+
+            if not secrets:
+                self.logger.warning(f"No secrets found in project {self.project_id}.")
+                return []
+
+            return [secret.name.split("/")[-1] for secret in secrets if "created_by" in secret.labels and secret.labels["created_by"] == SECRET_MANAGER_LABEL]
+        except Exception as e:
+            self.logger.error(f"Failed to retrieve secrets for project {self.project_id}: {e}")
+            raise
+
+    def _get_all_secret_authorized_users(self, secret_id: str) -> List[str]:
+        """
+        Retrieves the members that hold the Secret Accessor role on the given secret.
+
+        Args:
+            secret_id (str): The ID of the secret to check access for.
+        Returns:
+            List[str]: Normalized member identifiers (`user:<email>`) authorized to access the secret.
+        """
+        accessor_role = "roles/secretmanager.secretAccessor"
+        resource_name = self.secret_client.secret_path(self.project_id, secret_id)
+
+        try:
+            policy = self.secret_client.get_iam_policy(request={"resource": resource_name})
+            self.logger.debug(f"Retrieved IAM policy for secret '{secret_id}': {policy}")
+
+            if not policy.bindings:
+                self.logger.warning(f"No IAM bindings found for secret '{secret_id}'.")
+                return []
+
+            authorized_users = []
+            for binding in policy.bindings:
+                if binding.role == accessor_role:
+                    for user in binding.members:
+                        authorized_users.append(self._normalize_username(user))
+
+            return authorized_users
+        except Exception as e:
+            self.logger.error(f"Failed to get IAM policy for secret '{secret_id}': {e}")
+            raise
+
+    def _read_service_account_keys(self) -> ServiceAccountsConfig:
+        """
+        Reads the service account keys declarations from the YAML file.
+
+        Returns:
+            ServiceAccountsConfig: The parsed file, or {"service_accounts": []} if the file is missing or empty.
+        """
+        try:
+            with open(self.service_account_keys_file, "r") as file:
+                keys = yaml.safe_load(file)
+
+            if not keys or keys.get("service_accounts") is None:
+                return {"service_accounts": []}
+
+            return keys
+        except FileNotFoundError:
+            self.logger.info(f"Service account keys file {self.service_account_keys_file} not found, starting with empty configuration")
+            return {"service_accounts": []}
+        except IOError as e:
+            error_msg = f"Failed to read service account keys from {self.service_account_keys_file}: {e}"
+            self.logger.error(error_msg)
+            raise
+
+    def _to_yaml_file(self, data: List[ServiceAccount], output_file: str, header_info: str = "") -> None:
+        """
+        Writes the service account declarations to a YAML file,
+        prefixed with the Apache license header.
+
+        Args:
+            data: The service account declarations to write.
+            output_file: The file path where the YAML output will be written.
+            header_info: A string containing the header information to be included in the YAML file.
+        """
+
+        apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+        """
+
+        # Prepare the header with the Apache license
+        header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"
+
+        try:
+            with open(output_file, "w") as file:
+                file.write(header)
+                yaml_data = {"service_accounts": data}
+                yaml.dump(yaml_data, file, sort_keys=False, default_flow_style=False, indent=2)
+            self.logger.info(f"Successfully wrote Service Account Keys policy data to {output_file}")
+        except IOError as e:
+            self.logger.error(f"Failed to write to {output_file}: {e}")
+
+
+    def check_compliance(self) -> List[str]:
+        """
+        Checks the compliance of service account keys with the defined policies.
+
+        Returns:
+            List[str]: A list of compliance issue messages.
+        """
+
+        service_account_data = self._read_service_account_keys()
+        file_service_accounts = service_account_data.get("service_accounts")
+
+        if not file_service_accounts:
+            file_service_accounts = []
+            self.logger.info(f"No service account keys found in {self.service_account_keys_file}.")
+
+        compliance_issues = []
+
+        # Check that all service accounts that exist are declared
+        for service_account in self._get_all_live_service_accounts():
+            if self._denormalize_account_email(service_account) not in [account["account_id"] for account in file_service_accounts]:
+                msg = f"Service account '{service_account}' is not declared in the service account keys file."
+                compliance_issues.append(msg)
+                self.logger.warning(msg)
+
+        managed_secrets = self._get_all_live_managed_secrets()
+        extracted_secrets = [f"{self._denormalize_account_email(account['account_id'])}-key" for account in file_service_accounts]
+
+        # Check for managed secrets that are not declared
+        for secret in managed_secrets:
+            if secret not in extracted_secrets:
+                msg = f"Managed secret '{secret}' is not declared in the service account keys file."
+                compliance_issues.append(msg)
+                self.logger.warning(msg)
+
+        # Check for each managed secret if it has the correct permissions
+        for account in file_service_accounts:
+            secret_name = f"{self._denormalize_account_email(account['account_id'])}-key"
+            if secret_name not in managed_secrets:
+                # Skip accounts that don't have managed secrets
+                continue
+
+            authorized_users = [user["email"] for user in account["authorized_users"]]
+            actual_users = [self._denormalize_username(user) for user in self._get_all_secret_authorized_users(secret_name)]
+
+            # Sort both lists for proper comparison
+            authorized_users.sort()
+            actual_users.sort()
+
+            if authorized_users != actual_users:
+                msg = f"Managed secret '{account['account_id']}' does not have the correct permissions. Expected: {authorized_users}, Actual: {actual_users}"
+                compliance_issues.append(msg)
+                self.logger.warning(msg)
+
+        return compliance_issues
+
+    def create_announcement(self, recipient: str) -> None:
+        """
+        Creates an announcement about compliance issues using the SendingClient.
+
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for creating announcements")
+
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will be created.")
+            return
+
+        title = "Account Keys Compliance Issue Detected"
+        body = f"Account keys for project {self.project_id} are not compliant with the defined policies in {self.service_account_keys_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the Account Keys policy for project {self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
+        announcement += "\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.create_announcement(title, body, recipient, announcement)
+
+    def print_announcement(self, recipient: str) -> None:
+        """
+        Prints announcement details instead of sending them (for testing purposes).
+
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for printing announcements")
+
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will be printed.")
+            return
+
+        title = "Account Keys Compliance Issue Detected"
+        body = f"Account keys for project {self.project_id} are not compliant with the defined policies in {self.service_account_keys_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the Account Keys policy for project {self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
+        announcement += "\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.print_announcement(title, body, recipient, announcement)
+
+    def generate_compliance(self) -> None:
+        """
+        Modifies the service account keys file to match the current state of service accounts and secrets.
+        Undeclared service accounts and managed secrets are appended with an empty `authorized_users` list;
+        declared managed secrets with a non-empty `authorized_users` list are updated to match their current accessors.
+        """
+
+        service_account_data = self._read_service_account_keys()
+        file_service_accounts = service_account_data.get("service_accounts", [])
+
+        # Ensure file_service_accounts is a list
+        if file_service_accounts is None:
+            file_service_accounts = []
+
+        self.logger.info(f"Found {len(file_service_accounts)} existing service accounts in the keys file")
+
+        # Add every live service account that is not declared yet
+        for service_account in self._get_all_live_service_accounts():
+            if self._denormalize_account_email(service_account) not in [account["account_id"] for account in file_service_accounts]:
+                self.logger.info(f"Service account '{service_account}' is not declared in the service account keys file, adding it")
+                file_service_accounts.append({
+                    "account_id": self._denormalize_account_email(service_account),
+                    "display_name": service_account,
+                    "authorized_users": []
+                })
+
+        managed_secrets = self._get_all_live_managed_secrets()
+        extracted_secrets = [f"{self._denormalize_account_email(account['account_id'])}-key" for account in file_service_accounts]
+
+        # Add every managed secret that is not declared yet
+        for secret in managed_secrets:
+            if secret not in extracted_secrets:
+                self.logger.info(f"Managed secret '{secret}' is not declared in the service account keys file, adding it")
+                file_service_accounts.append({
+                    "account_id": secret.removesuffix("-key"),
+                    "display_name": self._normalize_account_email(secret.removesuffix("-key")),
+                    "authorized_users": []
+                })
+
+        # Check for each managed secret if it has the correct permissions
+        for account in file_service_accounts:
+            secret_name = f"{self._denormalize_account_email(account['account_id'])}-key"
+            if secret_name not in managed_secrets:
+                continue
+
+            authorized_users = sorted([user["email"] for user in account["authorized_users"]])
+
+            if not authorized_users:
+                self.logger.info(f"Managed secret '{secret_name}' is new, skipping permission check")
+                continue
+
+            actual_users_normalized = sorted(self._get_all_secret_authorized_users(secret_name))
+            actual_users = sorted([self._denormalize_username(user) for user in actual_users_normalized])
+
+            if authorized_users != actual_users:
+                self.logger.info(f"Managed secret '{secret_name}' does not have the correct permissions, updating it")
+                account["authorized_users"] = [{"email": user} for user in actual_users]
+
+        # Remove duplicates based on account_id
+        seen_accounts = set()
+        deduplicated_accounts = []
+        for account in file_service_accounts:
+            if account["account_id"] not in seen_accounts:
+                seen_accounts.add(account["account_id"])
+                deduplicated_accounts.append(account)
+            else:
+                self.logger.info(f"Removing duplicate entry for account '{account['account_id']}'")
+
+        self._to_yaml_file(deduplicated_accounts, self.service_account_keys_file, header_info="Service Account Keys")
+
+def config_process() -> Dict[str, str]:
+    with open(CONFIG_FILE, "r") as file:
+        config = yaml.safe_load(file)
+
+    if not config:
+        raise ValueError("Configuration file is empty or invalid.")
+
+    config_res = dict()
+
+    config_res["project_id"] = config.get("project_id", "apache-beam-testing")
+    config_res["logging_level"] = config.get("logging", {}).get("level", "INFO")
+    config_res["logging_format"] = config.get("logging", {}).get("format", "[%(asctime)s] %(levelname)s: %(message)s")
+    config_res["service_account_keys_file"] = config.get("service_account_keys_file", "../keys/keys.yaml")
+    config_res["action"] = config.get("action", "check")
+
+    # SendingClient configuration
+    config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
+    config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
+    config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
+    config_res["smtp_port"] = os.getenv("SMTP_PORT", 587)
+    config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
+    config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
+    config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")
+
+    return config_res
+
+def main():
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(description="Account Keys Compliance Checker")
+    parser.add_argument("--action", choices=["check", "announce", "print", "generate"],
+                        help="Action to perform: check compliance, create announcement, print announcement, or generate new compliance")
+    args = parser.parse_args()
+
+    config = config_process()
+
+    # Command line argument takes precedence over config file
+    action = args.action if args.action else config.get("action", "check")
+
+    logging.basicConfig(level=getattr(logging, config["logging_level"].upper(), logging.INFO),
+                        format=config["logging_format"])
+    logger = logging.getLogger("AccountKeysPolicyComplianceCheck")
+
+    # Create SendingClient if needed for announcement actions
+    sending_client = None
+    if action in ["announce", "print"]:
+        try:
+            # Provide default values for testing, especially for print action
+            github_token = config["github_token"] or "dummy-token"
+            github_repo = config["github_repo"] or "dummy/repo"
+            smtp_server = config["smtp_server"] or "dummy-server"
+            smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
+            email = config["email"] or "[email protected]"
+            password = config["password"] or "dummy-password"
+
+            sending_client = SendingClient(
+                logger=logger,
+                github_token=github_token,
+                github_repo=github_repo,
+                smtp_server=smtp_server,
+                smtp_port=smtp_port,
+                email=email,
+                password=password
+            )
+        except Exception as e:
+            logger.error(f"Failed to initialize SendingClient: {e}")
+            return 1
+
+    logger.info(f"Starting Account Keys policy compliance check with action: {action}")
+    account_keys_checker = AccountKeysPolicyComplianceCheck(config["project_id"], config["service_account_keys_file"], logger, sending_client)
+
+    try:
+        if action == "check":
+            compliance_issues = account_keys_checker.check_compliance()
+            if compliance_issues:
+                logger.warning("Account Keys policy compliance issues found:")
+                for issue in compliance_issues:
+                    logger.warning(issue)
+            else:
+                logger.info("Account Keys policy is compliant.")
+        elif action == "announce":
+            logger.info("Creating announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            account_keys_checker.create_announcement(recipient)
+        elif action == "print":
+            logger.info("Printing announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            account_keys_checker.print_announcement(recipient)
+        elif action == "generate":
+            logger.info("Generating new compliance based on current Account Keys policy...")
+            account_keys_checker.generate_compliance()
+        else:
+            logger.error(f"Unknown action: {action}")
+            return 1
+    except Exception as e:
+        logger.error(f"Error executing action '{action}': {e}")
+        return 1
+
+    return 0
+
+if __name__ == "__main__":
+    sys.exit(main())
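The three checks in `check_compliance` above, and the `<account_id>-key` secret naming they rely on, can be reduced to a pure function for illustration. This is a minimal sketch under stated assumptions: it works on plain account ids (no `@...iam.gserviceaccount.com` normalization), replaces the Secret Manager IAM lookups with a hypothetical `accessors` mapping, and uses illustrative names (`secret_name`, `account_id_from_secret`, `keys_issues`) that are not part of `account_keys.py`. It also shows why a suffix-aware removal is needed when turning a secret id back into an account id: `str.strip` treats its argument as a set of characters, not as a suffix.

```python
from typing import Dict, List

KEY_SUFFIX = "-key"  # managed secrets are named "<account_id>-key"


def secret_name(account_id: str) -> str:
    """Secret id that holds the key for a given service account id."""
    return f"{account_id}{KEY_SUFFIX}"


def account_id_from_secret(secret_id: str) -> str:
    """Inverse of secret_name(); str.removesuffix needs Python 3.9+."""
    # Unlike str.strip("-key"), which removes any of the characters
    # '-', 'k', 'e', 'y' from both ends, removesuffix drops only the suffix.
    return secret_id.removesuffix(KEY_SUFFIX)


def keys_issues(live_accounts: List[str], managed_secrets: List[str],
                declared: List[dict], accessors: Dict[str, List[str]]) -> List[str]:
    """Report undeclared accounts, undeclared secrets, and accessor mismatches."""
    issues = []
    declared_ids = {entry["account_id"] for entry in declared}

    # 1. Every live service account must be declared
    for account_id in live_accounts:
        if account_id not in declared_ids:
            issues.append(f"undeclared service account: {account_id}")

    # 2. Every managed secret must belong to a declared account
    declared_secrets = {secret_name(account_id) for account_id in declared_ids}
    for secret in managed_secrets:
        if secret not in declared_secrets:
            issues.append(f"undeclared managed secret: {secret}")

    # 3. Accessors of each managed secret must match authorized_users
    for entry in declared:
        name = secret_name(entry["account_id"])
        if name not in managed_secrets:
            continue  # no managed secret for this account
        want = sorted(user["email"] for user in entry["authorized_users"])
        have = sorted(accessors.get(name, []))
        if want != have:
            issues.append(f"{name}: expected {want}, actual {have}")
    return issues


declared = [{"account_id": "kafka", "display_name": "kafka",
             "authorized_users": [{"email": "alice@example.com"}]}]
issues = keys_issues(
    live_accounts=["kafka", "runner"],
    managed_secrets=["kafka-key"],
    declared=declared,
    accessors={"kafka-key": ["alice@example.com", "bob@example.com"]},
)
print(account_id_from_secret("kafka-key"))  # kafka
print("kafka-key".strip("-key"))            # afka
for issue in issues:
    print(issue)
```

Here `runner` is live but undeclared, and `kafka-key` has an extra accessor (`bob@example.com`), so two issues are reported.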
diff --git a/infra/enforcement/config.yml b/infra/enforcement/config.yml
new file mode 100644
index 00000000000..7fae0253b8f
--- /dev/null
+++ b/infra/enforcement/config.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Project ID
+project_id: apache-beam-testing
+
+# Logging
+logging:
+  level: DEBUG
+  format: "[%(asctime)s] %(levelname)s: %(message)s"
+
+# IAM
+
+# Working users file
+users_file: ../iam/users.yml
+
+# Service Account Keys
+service_account_keys_file: ../keys/keys.yaml
+
+# Action to perform when running the script
+# Options:
+# - check: Check compliance and report issues (default)
+# - issue: Create GitHub issue if compliance violations are found
+# - generate: Generate new compliance file based on current IAM policy
+action: check
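The precedence used by `account_keys.py` (a `--action` flag overrides the `action` value in this file, which overrides the built-in `check` default) can be sketched as follows. Two details of that script are worth keeping in mind: argparse only validates the flag, so a value read from this file is not checked against `account_keys.py`'s choices (`check`, `announce`, `print`, `generate`), and the SendingClient settings come from environment variables rather than from this file. The early `ValueError` below is a hypothetical design choice (`account_keys.py` itself logs `Unknown action` and returns 1), and `resolve_action` / `sending_settings` are illustrative names.

```python
import argparse
from typing import Dict, List, Mapping

# Choices accepted by the --action flag of account_keys.py
ACCOUNT_KEYS_ACTIONS = ["check", "announce", "print", "generate"]


def resolve_action(argv: List[str], file_config: Dict[str, str]) -> str:
    """--action flag first, then the config file value, then "check"."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--action", choices=ACCOUNT_KEYS_ACTIONS)
    args = parser.parse_args(argv)
    action = args.action or file_config.get("action", "check")
    # argparse validates only the flag; a value from config.yml bypasses it,
    # so this hypothetical check rejects it up front
    if action not in ACCOUNT_KEYS_ACTIONS:
        raise ValueError(f"unsupported action in config file: {action!r}")
    return action


def sending_settings(env: Mapping[str, str]) -> Dict[str, object]:
    """A subset of the SendingClient settings config_process() reads from the environment."""
    return {
        "github_repo": env.get("GITHUB_REPOSITORY", "apache/beam"),
        "smtp_port": int(env.get("SMTP_PORT", 587)),
        "recipient": env.get("EMAIL_RECIPIENT", ""),
    }


print(resolve_action([], {"action": "generate"}))                     # generate
print(resolve_action(["--action", "print"], {"action": "generate"}))  # print
print(sending_settings({"SMTP_PORT": "465"})["smtp_port"])           # 465
```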
diff --git a/infra/enforcement/iam.py b/infra/enforcement/iam.py
new file mode 100644
index 00000000000..92246aa7c62
--- /dev/null
+++ b/infra/enforcement/iam.py
@@ -0,0 +1,400 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import datetime
+import logging
+import os
+import sys
+import yaml
+from google.api_core import exceptions
+from google.cloud import resourcemanager_v3
+from typing import Optional, List, Dict, Tuple
+from sending import SendingClient
+
+CONFIG_FILE = "config.yml"
+
+class IAMPolicyComplianceChecker:
+    def __init__(self, project_id: str, users_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None):
+        self.project_id = project_id
+        self.users_file = users_file
+        self.client = resourcemanager_v3.ProjectsClient()
+        self.logger = logger
+        self.sending_client = sending_client
+
+    def _parse_member(self, member: str) -> tuple[str, Optional[str], str]:
+        """Parses an IAM member string to extract its type, email, and a derived username.
+
+        Args:
+            member: The IAM member string
+        Returns:
+            A tuple containing:
+            - username: The derived username from the member string.
+            - email: The email address if available, otherwise None.
+            - member_type: The type of the member (e.g., user, serviceAccount, group).
+        """
+        email = None
+        username = member
+
+        # Split the member string to determine type and identifier
+        parts = member.split(':', 1)
+        member_type = parts[0] if len(parts) > 1 else "unknown"
+        identifier = parts[1] if len(parts) > 1 else member
+
+        if member_type in ["user", "serviceAccount", "group"]:
+            email = identifier
+            if '@' in identifier:
+                username = identifier.split('@')[0]
+            else:
+                username = identifier
+        else:
+            username = identifier
+            member_type = "unknown"
+            email = None
+
+        return username, email, member_type
+
+    def _export_project_iam(self) -> List[Dict]:
+        """Exports the project's IAM policy as a list of per-member dictionaries.
+
+        Returns:
+            A list of dictionaries containing the IAM policy details.
+        """
+
+        try:
+            policy = self.client.get_iam_policy(resource=f"projects/{self.project_id}")
+            self.logger.debug(f"Retrieved IAM policy for project {self.project_id}")
+        except exceptions.NotFound as e:
+            self.logger.error(f"Project {self.project_id} not found: {e}")
+            raise
+        except exceptions.PermissionDenied as e:
+            self.logger.error(f"Permission denied for project {self.project_id}: {e}")
+            raise
+        except Exception as e:
+            self.logger.error(f"An error occurred while retrieving IAM policy for project {self.project_id}: {e}")
+            raise
+
+        members_data = {}
+
+        for binding in policy.bindings:
+            role = binding.role
+
+            for member_str in binding.members:
+                if member_str not in members_data:
+                    username, email_address, member_type = self._parse_member(member_str)
+                    if member_type == "unknown":
+                        self.logger.warning(f"Skipping member {member_str} with no email address")
+                        continue  # Skip if no email address is found, probably a malformed member
+                    members_data[member_str] = {
+                        "username": username,
+                        "email": email_address,
+                        "permissions": []
+                    }
+
+                # Skip permissions that have a condition
+                if "withcond" in role:
+                    continue
+
+                permission_entry = {}
+                permission_entry["role"] = role
+
+                members_data[member_str]["permissions"].append(permission_entry)
+
+        output_list = []
+        for data in members_data.values():
+            data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"])
+            output_list.append({
+                "username": data["username"],
+                "email": data["email"],
+                "permissions": data["permissions"]
+            })
+
+        output_list.sort(key=lambda x: x["username"])
+        return output_list
+
+    def _read_project_iam_file(self) -> List[Dict]:
+        """Reads the IAM policy from a YAML file.
+
+        Returns:
+            A list of dictionaries containing the IAM policy details.
+        """
+        try:
+            with open(self.users_file, "r") as file:
+                iam_policy = yaml.safe_load(file)
+
+
+            self.logger.debug(f"Retrieved IAM policy from file for project {self.project_id}")
+            return iam_policy or []  # yaml.safe_load returns None for an empty file
+        except FileNotFoundError:
+            self.logger.error(f"IAM policy file not found for project {self.project_id}")
+            return []
+        except Exception as e:
+            self.logger.error(f"An error occurred while reading IAM policy file for project {self.project_id}: {e}")
+            return []
+
+    def _to_yaml_file(self, data: List[Dict], output_file: str, header_info: str = "") -> None:
+        """
+        Writes a list of dictionaries to a YAML file.
+        The Apache license header is written at the top of the file.
+
+        Args:
+            data: A list of dictionaries containing user permissions and details.
+            output_file: The file path where the YAML output will be written.
+            header_info: A string containing the header information to be included in the YAML file.
+        """
+
+        apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ """
+
+        # Prepare the header with the Apache license
+        header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"
+
+        try:
+            with open(output_file, "w") as file:
+                file.write(header)
+                yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2)
+            self.logger.info(f"Successfully wrote IAM policy data to {output_file}")
+        except IOError as e:
+            self.logger.error(f"Failed to write to {output_file}: {e}")
+            raise
+
+    def check_compliance(self) -> List[str]:
+        """
+        Compares the project's live IAM policy with the policy recorded in the users file.
+
+        Returns:
+            A list of strings describing any compliance issues found.
+        """
+        current_users = {user['email']: user for user in self._export_project_iam()}
+        existing_users = {user['email']: user for user in self._read_project_iam_file()}
+
+        if not existing_users:
+            error_msg = f"No IAM policy found in {self.users_file}."
+            self.logger.info(error_msg)
+            raise RuntimeError(error_msg)
+
+        differences = []
+
+        all_emails = set(current_users.keys()) | set(existing_users.keys())
+
+        for email in sorted(list(all_emails)):
+            current_user = current_users.get(email)
+            existing_user = existing_users.get(email)
+
+            if current_user and not existing_user:
+                differences.append(f"User {email} not found in existing policy.")
+            elif not current_user and existing_user:
+                differences.append(f"User {email} found in policy file but not in GCP.")
+            elif current_user and existing_user:
+                if current_user["permissions"] != existing_user["permissions"]:
+                    msg = f"\nPermissions for user {email} differ."
+                    msg += f"\nIn GCP: {current_user['permissions']}"
+                    msg += f"\nIn {self.users_file}: {existing_user['permissions']}"
+                    self.logger.info(msg)
+                    differences.append(msg)
+
+        return differences
+
+    def create_announcement(self, recipient: str) -> None:
+        """
+        Creates an announcement about compliance issues using the SendingClient.
+
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for creating announcements")
+
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will be created.")
+            return
+
+        title = "IAM Policy Non-Compliance Detected"
+        body = f"IAM policy for project {self.project_id} is not compliant with the defined policies in {self.users_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
+        announcement += "\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.create_announcement(title, body, recipient, announcement)
+
+    def print_announcement(self, recipient: str) -> None:
+        """
+        Prints announcement details instead of sending them (for testing purposes).
+
+        Args:
+            recipient (str): The email address of the announcement recipient.
+        """
+        if not self.sending_client:
+            raise ValueError("SendingClient is required for printing announcements")
+
+        diff = self.check_compliance()
+
+        if not diff:
+            self.logger.info("No compliance issues found, no announcement will be printed.")
+            return
+
+        title = "IAM Policy Non-Compliance Detected"
+        body = f"IAM policy for project {self.project_id} is not compliant with the defined policies in {self.users_file}\n\n"
+        for issue in diff:
+            body += f"- {issue}\n"
+
+        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
+        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
+        announcement += "\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."
+
+        self.sending_client.print_announcement(title, body, recipient, announcement)
+
+    def generate_compliance(self) -> None:
+        """
+        Modifies the users file to match the current IAM policy.
+        If no changes are needed, no file will be written.
+        """
+
+        try:
+            diff = self.check_compliance()
+        except RuntimeError:
+            self.logger.info("No existing IAM policy found.")
+            diff = ["No existing policy found"]
+
+        if not diff:
+            self.logger.info("No compliance issues found, no changes will be made.")
+            return
+
+        current_policy = self._export_project_iam()
+        header_info = f"IAM policy for project {self.project_id}"
+
+        self._to_yaml_file(current_policy, self.users_file, header_info)
+        self.logger.info(f"Generated new compliance file: {self.users_file}")
+
+def config_process() -> Dict[str, str]:
+    with open(CONFIG_FILE, "r") as file:
+        config = yaml.safe_load(file)
+
+    if not config:
+        raise ValueError("Configuration file is empty or invalid.")
+
+    config_res = dict()
+
+    config_res["project_id"] = config.get("project_id", "apache-beam-testing")
+    config_res["logging_level"] = config.get("logging", {}).get("level", "INFO")
+    config_res["logging_format"] = config.get("logging", {}).get("format", "[%(asctime)s] %(levelname)s: %(message)s")
+    config_res["users_file"] = config.get("users_file", "../iam/users.yml")
+    config_res["action"] = config.get("action", "check")
+
+    # SendingClient configuration
+    config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
+    config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
+    config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
+    config_res["smtp_port"] = os.getenv("SMTP_PORT", 587)
+    config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
+    config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
+    config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")
+
+    return config_res
+
+def main():
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(description="IAM Policy Compliance Checker")
+    parser.add_argument("--action", choices=["check", "announce", "print", "generate"],
+                        help="Action to perform: check compliance, create announcement, print announcement, or generate new compliance")
+    args = parser.parse_args()
+
+    config = config_process()
+
+    # Command line argument takes precedence over config file
+    action = args.action if args.action else config.get("action", "check")
+
+    logging.basicConfig(level=getattr(logging, config["logging_level"].upper(), logging.INFO),
+                        format=config["logging_format"])
+    logger = logging.getLogger("IAMPolicyComplianceChecker")
+
+    # Create SendingClient if needed for announcement actions
+    sending_client = None
+    if action in ["announce", "print"]:
+        try:
+            # Provide default values for testing, especially for print action
+            github_token = config["github_token"] or "dummy-token"
+            github_repo = config["github_repo"] or "dummy/repo"
+            smtp_server = config["smtp_server"] or "dummy-server"
+            smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
+            email = config["email"] or "[email protected]"
+            password = config["password"] or "dummy-password"
+
+            sending_client = SendingClient(
+                logger=logger,
+                github_token=github_token,
+                github_repo=github_repo,
+                smtp_server=smtp_server,
+                smtp_port=smtp_port,
+                email=email,
+                password=password
+            )
+        except Exception as e:
+            logger.error(f"Failed to initialize SendingClient: {e}")
+            return 1
+
+    logger.info(f"Starting IAM policy compliance check with action: {action}")
+    iam_checker = IAMPolicyComplianceChecker(config["project_id"], config["users_file"], logger, sending_client)
+
+    try:
+        if action == "check":
+            compliance_issues = iam_checker.check_compliance()
+            if compliance_issues:
+                logger.warning("IAM policy compliance issues found:")
+                for issue in compliance_issues:
+                    logger.warning(issue)
+            else:
+                logger.info("IAM policy is compliant.")
+        elif action == "announce":
+            logger.info("Creating announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            iam_checker.create_announcement(recipient)
+        elif action == "print":
+            logger.info("Printing announcement for compliance violations...")
+            recipient = config["recipient"] or "[email protected]"
+            iam_checker.print_announcement(recipient)
+        elif action == "generate":
+            logger.info("Generating new compliance based on current IAM policy...")
+            iam_checker.generate_compliance()
+        else:
+            logger.error(f"Unknown action: {action}")
+            return 1
+    except Exception as e:
+        logger.error(f"Error executing action '{action}': {e}")
+        return 1
+
+    return 0
+
+if __name__ == "__main__":
+
+    sys.exit(main())
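Taken together, _parse_member(), _export_project_iam() and check_compliance() turn the live policy bindings into per-member role lists and diff them against users.yml by email. The standalone sketch below reproduces that flow on plain data: a dict of role to member strings stands in for policy.bindings, so no Google Cloud client is needed. The function names are hypothetical, the sketch keys members by email directly (the checker keys by member string and re-keys by email), and, like the checker, it assumes conditional bindings are recognizable by "withcond" in the role name.

```python
from typing import Dict, List, Optional, Tuple

# Member types the checker keeps; everything else is treated as "unknown".
KNOWN_TYPES = ("user", "serviceAccount", "group")


def parse_member(member: str) -> Tuple[str, Optional[str], str]:
    """Splits 'type:identifier' into (username, email, member_type)."""
    member_type, sep, identifier = member.partition(":")
    if not sep:
        return member, None, "unknown"  # e.g. allUsers
    if member_type not in KNOWN_TYPES:
        return identifier, None, "unknown"  # e.g. domain:example.com
    return identifier.split("@")[0], identifier, member_type


def export_members(bindings: Dict[str, List[str]]) -> Dict[str, List[Dict[str, str]]]:
    """Groups role bindings by member email, dropping unknown members and conditional roles."""
    members: Dict[str, List[Dict[str, str]]] = {}
    for role, role_members in bindings.items():
        for member in role_members:
            _, email, member_type = parse_member(member)
            if member_type == "unknown":
                continue
            roles = members.setdefault(email, [])
            if "withcond" not in role:
                roles.append({"role": role})
    # Sorting makes the comparison independent of binding order.
    return {email: sorted(roles, key=lambda p: p["role"]) for email, roles in members.items()}


def compare(current: Dict[str, list], recorded: Dict[str, list]) -> List[str]:
    """Reports members missing on either side and members whose roles differ."""
    issues = []
    for email in sorted(current.keys() | recorded.keys()):
        if email not in recorded:
            issues.append(f"User {email} not found in existing policy.")
        elif email not in current:
            issues.append(f"User {email} found in policy file but not in GCP.")
        elif current[email] != recorded[email]:
            issues.append(f"Permissions for user {email} differ.")
    return issues
```

Sorting roles before comparing matters because the checker compares lists, so two policies with the same roles in a different order would otherwise be reported as differing.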
diff --git a/infra/enforcement/requirements.txt b/infra/enforcement/requirements.txt
new file mode 100644
index 00000000000..fa7fd181f9e
--- /dev/null
+++ b/infra/enforcement/requirements.txt
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Python dependencies for the infrastructure policy compliance checkers
+
+PyYAML==6.0.2
+google-cloud-iam==2.19.0
+google-cloud-resource-manager==1.14.1
+google-cloud-secret-manager==2.24.0
+google-crc32c==1.7.1
diff --git a/infra/enforcement/sending.py b/infra/enforcement/sending.py
new file mode 100644
index 00000000000..961674ca2f1
--- /dev/null
+++ b/infra/enforcement/sending.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import requests
+import logging
+import smtplib, ssl
+from typing import List, Optional
+from dataclasses import dataclass
+
+@dataclass
+class GitHubIssue:
+    """
+    Represents a GitHub issue.
+    """
+    number: int
+    title: str
+    body: str
+    state: str
+    html_url: str
+    created_at: str
+    updated_at: str
+
+class SendingClient:
+    """
+    Sends notifications about GitHub issues.
+    """
+    def __init__(self, logger: logging.Logger, github_token: str, github_repo: str,
+                 smtp_server: str, smtp_port: int, email: str, password: str):
+
+        required_keys = [github_token, github_repo, smtp_server, smtp_port, email, password]
+
+        if not all(required_keys):
+            raise ValueError("All parameters must be provided.")
+
+        self.github_repo = github_repo
+        self.headers = {
+            "Authorization": f"Bearer {github_token}",
+            "X-GitHub-Api-Version": "2022-11-28",
+            "Accept": "application/vnd.github+json"
+        }
+
+        self.smtp_server = smtp_server
+        self.smtp_port = smtp_port
+        self.email = email
+        self.password = password
+
+        self.logger = logger
+        self.github_api_url = "https://api.github.com"
+
+    def _make_github_request(self, method: str, endpoint: str, json: Optional[dict] = None) -> requests.Response:
+        """
+        Makes a request to the GitHub API.
+
+        Args:
+            method (str): The HTTP method to use (e.g., "GET", "POST", "PATCH").
+            endpoint (str): The API endpoint to call.
+            json (Optional[dict]): The JSON payload to send with the request.
+
+        Returns:
+            requests.Response: The response from the API.
+        """
+        url = f"{self.github_api_url}/{endpoint}"
+        response = requests.request(method, url, headers=self.headers, json=json)
+
+        if not response.ok:
+            self.logger.error(f"Failed GitHub API request to {endpoint}: {response.status_code} - {response.text}")
+            response.raise_for_status()
+
+        return response
+
+    def _send_email(self, title: str, body: str, recipient: str) -> None:
+        """
+        Sends an email notification.
+
+        Args:
+            title (str): The title of the email.
+            body (str): The body content of the email.
+            recipient (str): The email address of the recipient.
+        """
+        message = f"Subject: {title}\n\n{body}"
+        context = ssl.create_default_context()
+        with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port, context=context) as server:
+            server.login(self.email, self.password)
+            server.sendmail(self.email, recipient, message)
+
+    def _get_open_issues(self, title: str) -> List[GitHubIssue]:
+        """
+        Retrieves the open GitHub issues whose titles match the given title.
+
+        Args:
+            title (str): The title of the GitHub issue.
+        """
+        endpoint = f"search/issues/?q=is:issue+repo:{self.github_repo}+in:title+{title}+is:open"
+        response = self._make_github_request("GET", endpoint)
+        issues = response.json().get('items', [])
+        return [GitHubIssue(**{f: issue.get(f) for f in GitHubIssue.__dataclass_fields__}) for issue in issues]
+
+    def create_issue(self, title: str, body: str) -> GitHubIssue:
+        """
+        Creates a GitHub issue in the specified repository.
+
+        Args:
+            title (str): The title of the GitHub issue.
+            body (str): The body content of the GitHub issue.
+        """
+        endpoint = f"repos/{self.github_repo}/issues"
+        payload = {"title": title, "body": body}
+        data = self._make_github_request("POST", endpoint, json=payload).json()
+        self.logger.info(f"Successfully created GitHub issue: {title}")
+        return GitHubIssue(**{f: data.get(f) for f in GitHubIssue.__dataclass_fields__})
+
+    def update_issue_body(self, issue_number: int, new_body: str) -> None:
+        """
+        Updates the body of a GitHub issue in the specified repository.
+
+        Args:
+            issue_number (int): The number of the GitHub issue to update.
+            new_body (str): The new body content for the GitHub issue.
+        """
+        endpoint = f"repos/{self.github_repo}/issues/{issue_number}"
+        payload = {"body": new_body}
+        self._make_github_request("PATCH", endpoint, json=payload)
+        self.logger.info(f"Successfully updated body on GitHub issue: #{issue_number}")
+
+    def create_announcement(self, title: str, body: str, recipient: str, announcement: str) -> None:
+        """
+        Sends an email announcement that links to a GitHub issue.
+
+        Creates the GitHub issue in the specified repository if no open issue with this title exists.
+        If several matching open issues exist, the most recently updated one is updated.
+
+        Args:
+            title (str): The title of the GitHub issue.
+            body (str): The body content of the GitHub issue.
+            recipient (str): The email address of the recipient.
+            announcement (str): The announcement message to include in the email.
+        """
+        open_issues = self._get_open_issues(title)
+        open_issues.sort(key=lambda x: x.updated_at, reverse=True)
+        if open_issues:
+            self.logger.info(f"Issue with title '{title}' already exists: #{open_issues[0].number}")
+            announcement += f"\n\nRelated GitHub Issue: {open_issues[0].html_url}"
+
+            if open_issues[0].body != body:
+                self.logger.info(f"Updating body of issue #{open_issues[0].number}")
+                self.update_issue_body(open_issues[0].number, body)
+            else:
+                self.logger.info(f"No changes detected for issue #{open_issues[0].number}")
+            self._send_email(title, announcement, recipient)
+        else:
+            new_issue = self.create_issue(title, body)
+            announcement += f"\n\nRelated GitHub Issue: {new_issue.html_url}"
+            self._send_email(title, announcement, recipient)
+
+    def print_announcement(self, title: str, body: str, recipient: str, announcement: str) -> None:
+        """
+        Prints the announcement details instead of sending the email or creating an issue.
+        Intended for testing.
+        """
+        self.logger.info("Printing announcement...")
+        print("Simulating email sending...")
+        print(f"Recipient: {recipient}")
+        print(f"Announcement: {announcement}")
+
+        print("\nSimulating GitHub issue creation...")
+        print(f"Title: {title}")
+        print(f"Body: {body}")
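SendingClient.create_announcement() follows a find-or-create pattern: search for open issues with the same title, reuse the most recently updated one (patching its body only when the findings changed), otherwise open a new issue, and email a link in either case. The sketch below isolates that decision logic from HTTP and SMTP so it can be exercised offline. Issue, InMemoryStore, search_path and announce are hypothetical names, and two details are assumptions of this sketch rather than part of the commit: the exact-title filter (GitHub's in:title search matches words, not whole titles) and percent-encoding the search query with urlencode.

```python
from dataclasses import dataclass
from typing import Dict, List
from urllib.parse import urlencode


@dataclass
class Issue:
    """Only the fields the announcement flow needs (a subset of GitHubIssue)."""
    number: int
    title: str
    body: str
    html_url: str
    updated_at: str  # ISO 8601 UTC timestamps sort correctly as plain strings


def search_path(repo: str, title: str) -> str:
    """Builds a search/issues path with the query percent-encoded."""
    query = f'is:issue is:open repo:{repo} in:title "{title}"'
    return "search/issues?" + urlencode({"q": query})


class InMemoryStore:
    """Offline stand-in for the GitHub calls made by SendingClient."""

    def __init__(self) -> None:
        self.issues: Dict[int, Issue] = {}

    def search_open(self, title: str) -> List[Issue]:
        # Like a word-based search, this may also return issues with other titles.
        return list(self.issues.values())

    def create(self, title: str, body: str) -> Issue:
        number = len(self.issues) + 1
        url = f"https://github.com/example/repo/issues/{number}"
        issue = Issue(number, title, body, url, "2025-01-01T00:00:00Z")
        self.issues[number] = issue
        return issue

    def update_body(self, number: int, body: str) -> None:
        self.issues[number].body = body


def announce(store: InMemoryStore, title: str, body: str, announcement: str) -> str:
    """Reuses or creates the tracking issue and returns the email text to send."""
    # Keep exact title matches only, most recently updated first.
    matches = [i for i in store.search_open(title) if i.title == title]
    matches.sort(key=lambda i: i.updated_at, reverse=True)
    if matches:
        issue = matches[0]
        if issue.body != body:  # patch only when the findings changed
            store.update_body(issue.number, body)
    else:
        issue = store.create(title, body)
    return f"{announcement}\n\nRelated GitHub Issue: {issue.html_url}"
```

Reusing one open issue per title keeps repeated scheduled runs from opening a new issue every time, while the email still goes out on each run.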
diff --git a/infra/iam/generate.py b/infra/iam/generate.py
deleted file mode 100644
index 71f6379710e..00000000000
--- a/infra/iam/generate.py
+++ /dev/null
@@ -1,212 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# THIS IS NOT SUPPOSED TO RUN AFTER THE MIGRATION.
-# This script is used to export the IAM policy of a Google Cloud project to a YAML format.
-# It retrieves the IAM policy bindings, parses the members, and formats the output in a structured
-# YAML format, excluding service accounts and groups. The output includes usernames, emails, and
-# their associated permissions, with optional conditions for roles that have conditions attached.
-# You need to have the Google Cloud SDK installed and authenticated to run this script.
-
-import argparse
-import datetime
-import yaml
-import logging
-from typing import Optional, List, Dict
-from google.cloud import resourcemanager_v3
-from google.api_core import exceptions
-
-# Configure logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-logger = logging.getLogger(__name__)
-
-def parse_member(member: str) -> tuple[str, Optional[str], str]:
-    """Parses an IAM member string to extract type, email, and a derived username.
-
-    Args:
-        member: The IAM member string
-    Returns:
-        A tuple containing:
-        - username: The derived username from the member string.
-        - email: The email address if available, otherwise None.
-        - member_type: The type of the member (e.g., user, serviceAccount, group).
-    """
-    email = None
-    username = member
-
-    # Split the member string to determine type and identifier
-    parts = member.split(':', 1)
-    member_type = parts[0] if len(parts) > 1 else "unknown"
-    identifier = parts[1] if len(parts) > 1 else member
-
-    if member_type in ["user", "serviceAccount", "group"]:
-        email = identifier
-        if '@' in identifier:
-            username = identifier.split('@')[0]
-        else:
-            username = identifier
-    else:
-        username = identifier
-        email = None
-
-    return username, email, member_type
-
-def export_project_iam(project_id: str) -> List[Dict]:
-    """Exports the IAM policy for a given project to YAML format.
-
-    Args:
-        project_id: The ID of the Google Cloud project.
-    Returns:
-        A list of dictionaries containing the IAM policy details.
-    """
-
-    try:
-        client = resourcemanager_v3.ProjectsClient()
-        policy = client.get_iam_policy(resource=f"projects/{project_id}")
-        logger.info(f"Successfully retrieved IAM policy for project {project_id}")
-    except exceptions.NotFound as e:
-        logger.error(f"Project {project_id} not found: {e}")
-        raise
-    except exceptions.PermissionDenied as e:
-        logger.error(f"Permission denied for project {project_id}: {e}")
-        raise
-    except Exception as e:
-        logger.error(f"An error occurred while retrieving IAM policy for project {project_id}: {e}")
-        raise
-
-    members_data = {}
-
-    for binding in policy.bindings:
-        role = binding.role
-
-        for member_str in binding.members:
-            if member_str not in members_data:
-                username, email_address, member_type = parse_member(member_str)
-                if member_type == "serviceAccount":
-                    continue  # Skip service accounts
-                if member_type == "group":
-                    continue  # Skip groups
-                if not email_address:
-                    continue  # Skip if no email address is found, probably a malformed member
-                members_data[member_str] = {
-                    "username": username,
-                    "email": email_address,
-                    "permissions": []
-                }
-
-            # Skip permissions that have a condition
-            if "withcond" in role:
-                continue
-
-            permission_entry = {}
-            permission_entry["role"] = role
-
-            members_data[member_str]["permissions"].append(permission_entry)
-
-    output_list = []
-    for data in members_data.values():
-        data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"])
-        output_list.append({
-            "username": data["username"],
-            "email": data["email"],
-            "permissions": data["permissions"]
-        })
-
-    output_list.sort(key=lambda x: x["username"])
-    return output_list
-
-def to_yaml_file(data: List[Dict], output_file: str, header_info: str = "") -> None:
-    """
-    Writes a list of dictionaries to a YAML file.
-    Include the apache license header on the files
-
-    Args:
-        data: A list of dictionaries containing user permissions and details.
-        output_file: The file path where the YAML output will be written.
-        header_info: A string containing the header information to be included in the YAML file.
-    """
-
-    apache_license_header = """#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-
-    # Prepare the header with the Apache license
-    header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"
-
-    try:
-        with open(output_file, "w") as file:
-            file.write(header)
-            yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2)
-        logger.info(f"Successfully wrote IAM policy data to {output_file}")
-    except IOError as e:
-        logger.error(f"Failed to write to {output_file}: {e}")
-        raise
-
-def main():
-    """
-    Main function to run the script.
-
-    This function parses command-line arguments to either export IAM policies
-    or generate permission differences for a specified GCP project.
-    """
-    parser = argparse.ArgumentParser(
-        description="Export IAM policies or generate permission differences for a GCP project."
-    )
-    parser.add_argument(
-        "project_id",
-        help="The Google Cloud project ID."
-    )
-    parser.add_argument(
-        "output_file",
-        help="Defaults to 'users.yml' if not specified. The file where the IAM policy will be saved in YAML format.",
-        nargs='?',
-        default="users.yml"
-    )
-    parser.add_argument(
-        "--yes-i-know-what-i-am-doing",
-        action="store_true",
-        help="If set, the script will proceed"
-    )
-
-    args = parser.parse_args()
-    project_id = args.project_id
-    output_file = args.output_file
-
-    if not args.yes_i_know_what_i_am_doing:
-        logger.error("You must use the --yes-i-know-what-i-am-doing flag to proceed.")
-        return
-
-    # Export the IAM policy for the specified project
-    iam_data = export_project_iam(project_id)
-
-    # Write the exported data to the specified output file in YAML format
-    to_yaml_file(iam_data, output_file, header_info=f"Exported IAM policy for project {project_id}")
-
-if __name__ == "__main__":
-    main()
\ No newline at end of file
diff --git a/infra/iam/users.tf b/infra/iam/users.tf
index 32c26b8bcaa..30d5bfddf8f 100644
--- a/infra/iam/users.tf
+++ b/infra/iam/users.tf
@@ -46,7 +46,7 @@ resource "google_project_iam_member" "project_members" {
}
project = var.project_id
role = each.value.role
- member = "user:${each.value.email}"
+ member = can(regex(".*\\.gserviceaccount\\.com$", each.value.email)) ? "serviceAccount:${each.value.email}" : "user:${each.value.email}"
dynamic "condition" {
# Condition is only created if expiry_date is set
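The new member expression picks the IAM member prefix from the email address: addresses ending in .gserviceaccount.com are bound as serviceAccount:, everything else as user:. A Python equivalent, which could be used to sanity-check users.yml entries before running Terraform, might look like the sketch below. member_for is a hypothetical helper; like the Terraform regex, it treats any address with that suffix as a service account.

```python
import re

# Same pattern as the Terraform expression. Terraform's regex() looks for a match
# anywhere in the string, which corresponds to re.search here.
SERVICE_ACCOUNT_RE = re.compile(r".*\.gserviceaccount\.com$")


def member_for(email: str) -> str:
    """Returns the IAM member string the Terraform expression would build for this email."""
    if SERVICE_ACCOUNT_RE.search(email):
        return f"serviceAccount:{email}"
    return f"user:{email}"
```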
diff --git a/infra/iam/users.yml b/infra/iam/users.yml
index 06e9cea65e7..d76eb5ae267 100644
--- a/infra/iam/users.yml
+++ b/infra/iam/users.yml
@@ -544,4 +544,4 @@
- username: zhoufek
email: [email protected]
permissions:
- - role: roles/editor
+ - role: roles/editor
\ No newline at end of file
diff --git a/infra/keys/keys.yaml b/infra/keys/keys.yaml
index ca0e170cb21..4e56770b546 100644
--- a/infra/keys/keys.yaml
+++ b/infra/keys/keys.yaml
@@ -1,30 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
+# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
+# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-# Key management service for Apache Beam
-# This service manages the creation, rotation, and deletion of secrets used in Apache Beam.
-# It uses Google Cloud Secret Manager to store and manage secrets securely.
-# This file contains the list of service accounts and the users who can access the secrets.
+
+# Service Account Keys
+# This file lists the project's service accounts, each with its account ID
+# and the users authorized to use it, in the following format:
+# service_accounts:
+#   - account_id: account_id
+#     display_name: account_@project_id.iam.gserviceaccount.com
+#     authorized_users:
+#       - email: "[email protected]"
+#       - email: "[email protected]"
service_accounts:
-  - account_id: "test-service-account"
-    display_name: "Service account for Beam secrets rotation"
-    authorized_users:
-      - email: "[email protected]"
-      - email: "[email protected]"
-  - account_id: "test-service-account-2"
-    display_name: "Another service account for Beam secrets rotation"
-    authorized_users:
-      - email: "[email protected]"
\ No newline at end of file
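The comment block in keys.yaml describes the expected shape of the file. After this commit the service_accounts key has no entries, and YAML loads that (like a completely empty file) as None, so a reader of the file has to tolerate both cases. The sketch below validates an already-parsed document against the commented shape. validate_keys is a hypothetical helper, and the rules it enforces (a required account_id, unique account IDs, an email on every authorized user) are inferred from the example comment rather than taken from account_keys.py.

```python
from typing import Any, Dict, List, Optional


def validate_keys(doc: Optional[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Maps each account_id to its authorized emails, raising ValueError on malformed entries."""
    # An empty file, or "service_accounts:" with no entries, both load as None.
    accounts = (doc or {}).get("service_accounts") or []
    result: Dict[str, List[str]] = {}
    for entry in accounts:
        account_id = entry.get("account_id")
        if not account_id:
            raise ValueError(f"Entry without account_id: {entry}")
        if account_id in result:
            raise ValueError(f"Duplicate account_id: {account_id}")
        users = entry.get("authorized_users") or []
        emails = [user.get("email") for user in users]
        if not all(emails):
            raise ValueError(f"Authorized user without email for {account_id}")
        result[account_id] = emails
    return result
```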