pabloem commented on code in PR #38992:
URL: https://github.com/apache/beam/pull/38992#discussion_r3425470258
##########
infra/enforcement/account_keys.py:
##########
@@ -259,15 +306,28 @@ def check_compliance(self) -> List[str]:
self.logger.info(f"No service account keys found in the
{self.service_account_keys_file}.")
compliance_issues = []
+ live_service_accounts = self._get_all_live_service_accounts()
+ managed_secrets = self._get_all_live_managed_secrets()
# Check that all service accounts that exist are declared
- for service_account in self._get_all_live_service_accounts():
+ for service_account in live_service_accounts:
if self._denormalize_account_email(service_account) not in
[account["account_id"] for account in file_service_accounts]:
msg = f"Service account '{service_account}' is not declared in
the service account keys file."
compliance_issues.append(msg)
self.logger.warning(msg)
+ else:
+ iam_keys =
self._get_user_managed_keys_from_iam(service_account)
+ if iam_keys:
+ secret_name =
f"{self._denormalize_account_email(service_account)}-key"
+ legal_keys = []
+ if secret_name in managed_secrets:
+ legal_keys =
self._get_legal_keys_from_secret_manager(secret_name)
+ rogue_keys = set(iam_keys) - set(legal_keys)
+ for rogue_key in rogue_keys:
+ msg = f"SECURITY ALERT: Rogue key '{rogue_key}'
detected on account '{service_account}'. this key was created outside the
rotation system."
Review Comment:
Let's not call it 'rogue' - let's call it 'unmanaged'. And instead of
'rotation system', let's call it "Beam's service account management system (in
`infra/keys/`)".
##########
infra/enforcement/account_keys.py:
##########
@@ -106,6 +106,53 @@ def _denormalize_username(self, username: str) -> str:
return username.split(":", 1)[1].strip().lower()
return username
+ def _get_user_managed_keys_from_iam(self, account_email: str) -> List[str]:
+ """"
+ Retrieves the list of user-managed keys for a given service account
from IAM.
+
+ Args:
+ account_email (str): The email of the service account to retrieve
keys for.
+
+ Returns:
+ List[str]: A list of key IDs for the user-managed keys associated
with the service account
+ """
+ request = types.ListServiceAccountKeysRequest()
+ request.name =
f"projects/{self.project_id}/serviceAccounts/{account_email}"
+ request.key_types =
[types.ListServiceAccountKeysRequest.KeyType.USER_MANAGED]
+
+ try:
+ response =
self.service_account_client.list_service_account_keys(request=request)
+ return [key.name.split("/")[-1] for key in response.keys]
+ except Exception as e:
+ self.logger.error(f"Failed to retrieve keys for service account
'{account_email}': {e}")
+ return []
+
+ def _get_legal_keys_from_secret_manager(self, secret_name: str) ->
List[str]:
+ """
+ Retrieves the list of legal keys for a given service account from
Secret Manager.
Review Comment:
What does 'legal' mean here? Can you define 'legal' or improve the naming?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]