This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 372b25b7571 Add GitHub Actions workflow for managing GCP Service 
Account keys (#35911)
372b25b7571 is described below

commit 372b25b75718376040a2a4eadfbf719022979940
Author: Enrique Calderon <[email protected]>
AuthorDate: Fri Aug 29 13:03:46 2025 -0600

    Add GitHub Actions workflow for managing GCP Service Account keys (#35911)
    
    * Add GitHub Actions workflow for managing GCP Service Account keys
    
    * Update GCP Service Account keys workflow to allow queueing and run the 
workflow on Mondays
    
    * Refactor key rotation service to remove GCS logging and add dry run 
option for cron job
---
 .../beam_Infrastructure_ServiceAccountKeys.yml     |  68 +++++++++
 infra/keys/README.md                               |  14 +-
 infra/keys/config.yaml                             |   4 -
 infra/keys/gcp_logger.py                           | 153 ---------------------
 infra/keys/keys.py                                 | 101 +++++++-------
 infra/keys/keys.yaml                               |   2 +-
 6 files changed, 128 insertions(+), 214 deletions(-)

diff --git a/.github/workflows/beam_Infrastructure_ServiceAccountKeys.yml 
b/.github/workflows/beam_Infrastructure_ServiceAccountKeys.yml
new file mode 100644
index 00000000000..cd5eb2a0698
--- /dev/null
+++ b/.github/workflows/beam_Infrastructure_ServiceAccountKeys.yml
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This workflow modifies the GCP Service Account keys and manages the
+# storage, saving them onto Google Cloud Secret Manager. It also handles
+# the rotation of the keys.
+
+name: Service Account Keys Management
+
+on:
+  workflow_dispatch:
+  # Trigger when the keys.yaml file is modified on the default (master) branch
+  push:
+    branches:
+      - master
+    paths:
+      - 'infra/keys/keys.yaml'
+  schedule:
+    # Once a week at 9:00 AM on Monday
+    - cron: '0 9 * * 1'
+
+# This ensures that only one workflow run is running at a time, and others are 
queued.
+concurrency:
+  group: ${{ github.workflow }}
+  cancel-in-progress: false
+
+# Setting explicit, minimal permissions for the workflow instead of relying on 
the broader default token permissions
+permissions:
+  contents: read
+
+jobs:
+  beam_ServiceAccountKeys:
+    name: Manage GCP Service Account keys
+    runs-on: [self-hosted, ubuntu-20.04, main]
+    timeout-minutes: 30
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup gcloud
+        uses: google-github-actions/setup-gcloud@v2
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.13'
+
+      - name: Install Python dependencies
+        working-directory: ./infra/keys
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+      
+      - name: Run Service Account Key Management
+        working-directory: ./infra/keys
+        run: python keys.py --cron-dry-run
diff --git a/infra/keys/README.md b/infra/keys/README.md
index 17d66f0eb0e..f7bf1d927c1 100644
--- a/infra/keys/README.md
+++ b/infra/keys/README.md
@@ -44,7 +44,17 @@ This section is intended for developers who need to manage 
service accounts and
 
 ### How it works
 
-This module provide a script `keys.py` that allows you to manage the service 
accounts and their keys. This script is run as a cron job daily to ensure that 
service account keys are rotated regularly and that the latest keys are 
available for authorized users. It is also run every time a PR is merged over 
the `keys.yaml` file to ensure that the service accounts, their keys and 
authorized users are up to date.
+This module provides a script `keys.py` that allows you to manage the service 
accounts and their keys. This script is run automatically by a GitHub Action to 
ensure that service account keys are rotated regularly and that the latest keys 
are available for authorized users. It is also run every time a PR is merged 
over the `keys.yaml` file to ensure that the service accounts, their keys and 
authorized users are up to date.
+
+### Automation with GitHub Actions
+
+A GitHub Actions workflow is set up to automate the execution of the `keys.py` 
script. This workflow is defined in 
`.github/workflows/beam_Infrastructure_ServiceAccountKeys.yml`.
+
+The workflow is triggered automatically on the following events:
+- A push to the default branch that includes changes to the 
`infra/keys/keys.yaml` file.
+- A weekly schedule (9:00 AM UTC every Monday).
+- A manual trigger (`workflow_dispatch`) by a developer.
+
+When triggered, the workflow runs the `python keys.py --cron-dry-run` command, 
which handles the creation and rotation of service account keys based on the 
configuration in `keys.yaml` and `config.yaml`.
 
 ### Files
 
@@ -88,6 +98,6 @@ This will rotate keys for all service accounts defined in the 
`keys.yaml` file t
 To retrieve the latest service account key, use the `--get-key` flag:
 
 ```bash
-python main.py --get-key my-service-account
+python keys.py --get-key my-service-account
 ```
 
diff --git a/infra/keys/config.yaml b/infra/keys/config.yaml
index 7b3e4d8e31d..e28c3a58953 100644
--- a/infra/keys/config.yaml
+++ b/infra/keys/config.yaml
@@ -30,9 +30,5 @@ grace_period: 2
 
 # LOGGING
 
-# The secret rotation logs will be stored in a GCP bucket.
-bucket_name: "beam-terraform-infra-state"
-# The log file will be stored in the bucket with the following prefix
-log_file_prefix: "beam-secrets-rotation"
 # Logging level for the secrets rotation service
 logging_level: "DEBUG" # Options: DEBUG, INFO, WARNING, ERROR, CRITICAL
diff --git a/infra/keys/gcp_logger.py b/infra/keys/gcp_logger.py
deleted file mode 100644
index ad34995c6ec..00000000000
--- a/infra/keys/gcp_logger.py
+++ /dev/null
@@ -1,153 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import io
-import logging
-from google.cloud import storage
-from typing import List
-from datetime import datetime
-
-class GCSLogHandler(logging.Handler):
-    """Custom logging handler that writes logs to Google Cloud Storage."""
-
-    bucket_name: str
-    log_file_prefix: str
-    project_id: str
-    storage_client: storage.Client
-    bucket: storage.Bucket
-    log_buffer: io.StringIO
-    log_entries_count: int
-    max_buffer_size: int
-    session_logs: List[str]
-    blob_name: str
-    
-    def __init__(self, bucket_name: str, log_file_prefix: str, project_id: 
str) -> None:
-        """
-        Initialize the GCS log handler.
-        
-        Args:
-            bucket_name (str): Name of the GCS bucket
-            log_file_prefix (str): Prefix for log file names
-            project_id (str): Google Cloud project ID
-        """
-        super().__init__()
-        self.bucket_name = bucket_name
-        self.log_file_prefix = log_file_prefix
-        self.project_id = project_id
-        self.storage_client = storage.Client(project=project_id)
-        self.bucket = self.storage_client.bucket(bucket_name)
-        self.log_buffer = io.StringIO()
-        self.log_entries_count = 0
-        self.max_buffer_size = 100
-        
-        # Create a session-based filename that stays consistent
-        session_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-        self.blob_name = 
f"{self.log_file_prefix}/secret_service_{session_timestamp}.log"
-        self.session_logs = []  # Store all logs for the session
-
-    def emit(self, record: logging.LogRecord) -> None:
-        """
-        Emit a log record to the buffer and flush to GCS when buffer is full.
-        
-        Args:
-            record: Log record to emit
-        """
-        try:
-            if self.log_buffer.closed:
-                return
-                
-            log_entry = self.format(record)
-            self.log_buffer.write(log_entry + '\n')
-            self.session_logs.append(log_entry)
-            self.log_entries_count += 1
-            
-            # Flush to GCS when buffer reaches max size
-            if self.log_entries_count >= self.max_buffer_size:
-                self.flush_to_gcs()
-                
-        except Exception:
-            self.handleError(record)
-
-    def flush_to_gcs(self) -> None:
-        """Flush the log buffer to Google Cloud Storage."""
-        if self.log_entries_count == 0:
-            return
-            
-        try:
-            # Upload all session logs to GCS
-            blob = self.bucket.blob(self.blob_name)
-            complete_log_content = '\n'.join(self.session_logs) + '\n'
-            blob.upload_from_string(complete_log_content, 
content_type='text/plain')
-            
-            if not self.log_buffer.closed:
-                self.log_buffer = io.StringIO()
-            self.log_entries_count = 0
-            
-        except Exception as e:
-            # If GCS upload fails, print to stderr to avoid losing logs
-            import sys
-            print(f"Failed to upload logs to GCS: {e}", file=sys.stderr)
-            print(f"Log file would be: {self.blob_name}", file=sys.stderr)
-    
-    def close(self) -> None:
-        """Close the handler and flush remaining logs to GCS."""
-        self.flush_to_gcs()
-        super().close()
-
-    def get_log_file_path(self) -> str:
-        """Get the GCS path for the current log file."""
-        return f"gs://{self.bucket_name}/{self.blob_name}"
-
-class GCPLogger(logging.Logger):
-    """Custom logger that uses GCSLogHandler to log messages to Google Cloud 
Storage."""
-
-    def __init__(self, name: str, logging_level: str, bucket_name: str, 
log_file_prefix: str, project_id: str) -> None:
-        """
-        Initialize the GCP logger.
-        
-        Args:
-            name (str): Name of the logger
-            bucket_name (str): Name of the GCS bucket
-            log_file_prefix (str): Prefix for log file names
-            project_id (str): Google Cloud project ID
-        """
-        super().__init__(name)
-
-        # Set the logging level, defaulting to INFO if not provided
-        logging_level = logging_level.upper() if isinstance(logging_level, 
str) else logging_level
-        if not logging_level or logging_level not in logging._nameToLevel:
-            logging_level = 'INFO'
-        self.setLevel(logging_level)
-        
-        # Clear any existing handlers to avoid duplicates
-        self.handlers.clear()
-
-        # Add console handler
-        console_handler = logging.StreamHandler()
-        console_formatter = logging.Formatter('%(asctime)s - %(name)s - 
%(levelname)s - %(message)s')
-        console_handler.setFormatter(console_formatter)
-        self.addHandler(console_handler)
-
-        # Add GCS handler for persistent logging
-        try:
-            gcs_handler = GCSLogHandler(bucket_name, log_file_prefix, 
project_id)
-            gcs_formatter = logging.Formatter('%(asctime)s - %(name)s - 
%(levelname)s - %(funcName)s:%(lineno)d - %(message)s')
-            gcs_handler.setFormatter(gcs_formatter)
-            self.addHandler(gcs_handler)
-            self.info("GCS logging handler initialized successfully")
-            self.info(f"Logs will be stored at: 
{gcs_handler.get_log_file_path()}")
-        except Exception as e:
-            self.warning(f"Failed to initialize GCS logging handler: {e}. Logs 
will only be written to console.")
-
diff --git a/infra/keys/keys.py b/infra/keys/keys.py
index 2a01f34b8cc..c06307ecb24 100644
--- a/infra/keys/keys.py
+++ b/infra/keys/keys.py
@@ -21,7 +21,6 @@ import sys
 from typing import List, TypedDict
 from google.api_core.exceptions import PermissionDenied
 # Importing custom modules
-from gcp_logger import GCSLogHandler, GCPLogger
 from secret_manager import SecretManager
 from service_account import ServiceAccountManager
 
@@ -34,8 +33,6 @@ class ConfigDict(TypedDict):
     project_id: str
     rotation_interval: int
     grace_period: int
-    bucket_name: str
-    log_file_prefix: str
     logging_level: str
 
 class AuthorizedUser(TypedDict):
@@ -57,7 +54,7 @@ def load_config() -> ConfigDict:
     if not config:
         raise ValueError("Configuration file is empty or invalid.")
 
-    required_keys = set(['project_id', 'rotation_interval', 'grace_period', 
'bucket_name', 'log_file_prefix'])
+    required_keys = set(['project_id', 'rotation_interval', 'grace_period'])
     missing_keys = required_keys - config.keys()
     if missing_keys:
         raise ValueError(f"Missing required configuration keys: {', 
'.join(missing_keys)}")
@@ -66,10 +63,6 @@ def load_config() -> ConfigDict:
         raise ValueError("Configuration 'rotation_interval' must be a positive 
integer.")
     if not isinstance(config['grace_period'], int) or config['grace_period'] < 
0:
         raise ValueError("Configuration 'grace_period' must be a non-negative 
integer.")
-    if not isinstance(config['bucket_name'], str) or not 
config['bucket_name'].strip():
-        raise ValueError("Configuration 'bucket_name' must be a non-empty 
string.")
-    if not isinstance(config['log_file_prefix'], str) or not 
config['log_file_prefix'].strip():
-        raise ValueError("Configuration 'log_file_prefix' must be a non-empty 
string.")
     if 'logging_level' in config:
         if not isinstance(config['logging_level'], str) or 
config['logging_level'].strip() not in logging._nameToLevel:
             raise ValueError("Configuration 'logging_level' must be one of: " 
+ ", ".join(logging._nameToLevel.keys()))
@@ -105,6 +98,7 @@ def parse_arguments():
         epilog="""
 Examples:
   python keys.py --cron                 # Run key rotation for accounts that 
need it, ran only by cron job
+  python keys.py --cron-dry-run         # Run a dry run of the key rotation 
cron job
   python keys.py --get-key my-sa        # Get the latest key for service 
account 'my-sa', ran by users
         """
     )
@@ -115,6 +109,11 @@ Examples:
         action='store_true',
         help='Run the cron job to rotate keys that require rotation'
     )
+    group.add_argument(
+        '--cron-dry-run',
+        action='store_true',
+        help='Run a dry run of the cron job to see what would be rotated'
+    )
     group.add_argument(
         '--get-key',
         metavar='ACCOUNT_ID',
@@ -159,18 +158,20 @@ class KeyService:
         self.project_id = config['project_id']
         rotation_interval = config['rotation_interval']
         grace_period = config['grace_period']
-        bucket_name = config['bucket_name']
-        log_file_prefix = config['log_file_prefix']
         logging_level = config['logging_level']
 
         self.service_accounts = service_accounts_config['service_accounts']
         self.enable_logging = enable_logging
 
+        self.logger = logging.getLogger("KeyService")
         if self.enable_logging:
-            self.logger = GCPLogger("KeyService", logging_level, bucket_name, 
log_file_prefix, self.project_id)
+            self.logger.setLevel(logging_level)
+            handler = logging.StreamHandler(sys.stdout)
+            formatter = logging.Formatter('%(asctime)s - %(name)s - 
%(levelname)s - %(message)s')
+            handler.setFormatter(formatter)
+            self.logger.addHandler(handler)
         else:
             # Create a null logger that doesn't actually log anything
-            self.logger = logging.getLogger("KeyService")
             self.logger.setLevel(logging.CRITICAL + 1)  # Set to a level 
higher than CRITICAL to disable all logging
 
         self.secret_manager_client = SecretManager(self.project_id, 
self.logger, rotation_interval, grace_period)
@@ -179,28 +180,6 @@ class KeyService:
         if self.enable_logging:
             self.logger.info(f"Initialized KeyService for project: 
{self.project_id}")
 
-    def __del__(self) -> None:
-        """Manually flush all logs to Google Cloud Storage."""
-        try:
-            if self.enable_logging:
-                for handler in self.logger.handlers:
-                    if isinstance(handler, GCSLogHandler):
-                        handler.flush_to_gcs()
-        except Exception:
-            pass
-
-    def cleanup(self) -> None:
-        """Explicit cleanup method to flush logs and close resources."""
-        try:
-            if self.enable_logging:
-                self.logger.info("KeyService cleanup: Flushing logs to Google 
Cloud Storage")
-                for handler in self.logger.handlers:
-                    if isinstance(handler, GCSLogHandler):
-                        handler.flush_to_gcs()
-        except Exception as e:
-            if self.enable_logging:
-                self.logger.error(f"Error during cleanup: {e}")
-
     def _start_all_service_accounts(self) -> None:
         """
         Reads the service accounts configuration and checks for service 
accounts.
@@ -208,7 +187,7 @@ class KeyService:
         1. If a service account exists and is managed, it checks if the secret 
exists and updates access if needed.
         2. If the service account exists but the secret does not, it creates 
the secret and clears the service account
               keys as now keys will be managed by the Secret Manager.
-        3. If neither the service account nor the secret exists , it creates 
both.
+        3. If neither the service account nor the secret exists, it creates 
and initializes both.
         4. If any other case is encountered, it logs an error and skips the 
account.
         """
 
@@ -257,7 +236,7 @@ class KeyService:
             except Exception as e:
                 self.logger.error(f"Error creating service account or secret 
for {account_id}: {e}")
 
-    def cron(self) -> None:
+    def cron(self, dry_run: bool = False) -> None:
         """
         Cron job to rotate service account keys and secrets.
 
@@ -268,20 +247,31 @@ class KeyService:
             1.1. If the key is due for rotation, it will rotate the key and 
update the secret in Secret Manager.
             1.2. If the key is not due for rotation, it will log that no 
action is needed.
         2. Check for keys that have expired the grace period and delete them 
from both the service account and Secret Manager.
+        
+        Args:
+            dry_run (bool): If True, the method will only log the actions that 
would be taken.
         """
 
-        self.logger.info("Starting cron job for service account key rotation")
-        self._start_all_service_accounts()
+        if dry_run:
+            self.logger.info("Starting cron job DRY RUN for service account 
key rotation")
+        else:
+            self.logger.info("Starting cron job for service account key 
rotation")
+        
+        if not dry_run:
+            self._start_all_service_accounts()
 
         for account in self.service_accounts:
             account_id = account['account_id']
             secret_name = f"{account_id}-key"
             try:
                 if 
self.secret_manager_client._is_key_rotation_due(secret_name):
-                    self.logger.info(f"Service account key for {account_id} is 
due for rotation, rotating key")
-                    new_key = 
self.service_account_manager.create_service_account_key(account_id)
-                    new_key_id = new_key.name.split('/')[-1]
-                    self.secret_manager_client.add_secret_version(secret_name, 
new_key_id, new_key.private_key_data)
+                    if dry_run:
+                        self.logger.info(f"[DRY RUN] Service account key for 
{account_id} is due for rotation, would rotate key.")
+                    else:
+                        self.logger.info(f"Service account key for 
{account_id} is due for rotation, rotating key")
+                        new_key = 
self.service_account_manager.create_service_account_key(account_id)
+                        new_key_id = new_key.name.split('/')[-1]
+                        
self.secret_manager_client.add_secret_version(secret_name, new_key_id, 
new_key.private_key_data)
                 else:
                     self.logger.debug(f"Service account key for {account_id} 
is not due for rotation")
             except Exception as e:
@@ -294,13 +284,19 @@ class KeyService:
         for secret_id, key_ids in keys_to_delete:
             try:
                 for key_id in key_ids:
-                    self.logger.info(f"Deleting expired key {key_id} for 
secret {secret_id}")
-                    
self.service_account_manager.delete_service_account_key(secret_id, key_id)
+                    if dry_run:
+                        self.logger.info(f"[DRY RUN] Would delete expired key 
{key_id} for secret {secret_id}")
+                    else:
+                        self.logger.info(f"Deleting expired key {key_id} for 
secret {secret_id}")
+                        
self.service_account_manager.delete_service_account_key(secret_id, key_id)
             except Exception as e:
                 self.logger.error(f"Error deleting expired keys for secret 
{secret_id}: {e}")
                 continue
 
-        self.logger.info("Cron job for service account key rotation completed")
+        if dry_run:
+            self.logger.info("Cron job DRY RUN for service account key 
rotation completed")
+        else:
+            self.logger.info("Cron job for service account key rotation 
completed")
 
     def get_latest_service_account_key(self, account_id: str) -> str:
         """
@@ -339,11 +335,13 @@ def main():
         config = load_config()
         service_accounts_config = load_service_accounts_config()
         
-        if args.cron:
-            print("Running cron job for key rotation...")
+        if args.cron or args.cron_dry_run:
+            is_dry_run = args.cron_dry_run
+            run_type = "dry run" if is_dry_run else "job"
+            print(f"Running cron {run_type} for key rotation...")
             key_service = KeyService(config, service_accounts_config)
-            key_service.cron()
-            print("Cron job completed successfully.")
+            key_service.cron(dry_run=is_dry_run)
+            print(f"Cron {run_type} completed successfully.")
             
         elif args.get_key:
             account_id = args.get_key
@@ -379,11 +377,6 @@ def main():
         print(f"An error occurred: {e}")
         logging.error(f"An error occurred: {e}")
         logging.error(f"Full traceback: {traceback.format_exc()}")
-        if key_service is not None:
-            try:
-                key_service.cleanup()
-            except:
-                pass
         sys.exit(1)
 
 if __name__ == "__main__":
diff --git a/infra/keys/keys.yaml b/infra/keys/keys.yaml
index 4e56770b546..269a2841d91 100644
--- a/infra/keys/keys.yaml
+++ b/infra/keys/keys.yaml
@@ -23,4 +23,4 @@
 #     - email: "[email protected]"
 #     - email: "[email protected]"
 
-service_accounts:
+service_accounts: []

Reply via email to