vincbeck commented on code in PR #66736:
URL: https://github.com/apache/airflow/pull/66736#discussion_r3237046729
##########
providers/amazon/tests/system/amazon/aws/example_emr_eks.py:
##########
@@ -59,15 +60,13 @@
JOB_ROLE_ARN_KEY = "JOB_ROLE_ARN"
JOB_ROLE_NAME_KEY = "JOB_ROLE_NAME"
SUBNETS_KEY = "SUBNETS"
-UPDATE_TRUST_POLICY_WAIT_TIME_KEY = "UPDATE_TRUST_POLICY_WAIT_TIME"
sys_test_context_task = (
SystemTestContextBuilder()
.add_variable(ROLE_ARN_KEY)
.add_variable(JOB_ROLE_ARN_KEY)
.add_variable(JOB_ROLE_NAME_KEY)
.add_variable(SUBNETS_KEY, split_string=True)
- .add_variable(UPDATE_TRUST_POLICY_WAIT_TIME_KEY, optional=True,
default_value="10")
Review Comment:
Once merged, do not forget to remove this parameter from our internal
configuration as well
##########
providers/amazon/tests/system/amazon/aws/example_emr_eks.py:
##########
@@ -177,8 +176,71 @@ def update_trust_policy_execution_role(cluster_name,
cluster_namespace, role_nam
if build.returncode != 0:
raise RuntimeError(err)
- # Wait for IAM changes to propagate to avoid authentication failures
- time.sleep(int(wait_time))
+
+class TrustPolicyNotPropagatedError(Exception):
+ """Raised when the IAM trust policy has not yet propagated."""
+
+
+@task
+def wait_for_trust_policy_propagation(cluster_name, role_name):
+ """Validate that the IAM trust policy has propagated by checking the role's
+ trust policy contains the expected OIDC provider.
+
+ Uses exponential backoff retries (up to 5 minutes) instead of a fixed
sleep,
+ which avoids both wasting time when propagation is fast and failing when
it's slow.
+ """
+ log = logging.getLogger(__name__)
+
+ # Determine the expected OIDC provider ARN from the EKS cluster
+ eks_client = boto3.client("eks")
+ oidc_issuer_url =
eks_client.describe_cluster(name=cluster_name)["cluster"]["identity"]["oidc"]["issuer"]
+ oidc_issuer_endpoint = oidc_issuer_url.replace("https://", "")
+ account_id = boto3.client("sts").get_caller_identity()["Account"]
+ expected_oidc_provider_arn =
f"arn:aws:iam::{account_id}:oidc-provider/{oidc_issuer_endpoint}"
+
+ @retry(
+ retry=retry_if_exception_type(TrustPolicyNotPropagatedError),
+ wait=wait_exponential(multiplier=1, min=5, max=30),
+ stop=stop_after_delay(300),
+ reraise=True,
+ )
+ def _validate_trust_policy():
+ iam_client = boto3.client("iam")
+
+ # Step 1: Verify the trust policy document contains the expected OIDC
provider
+ role = iam_client.get_role(RoleName=role_name)["Role"]
+ trust_policy = role["AssumeRolePolicyDocument"]
+
+ has_oidc_statement = False
+ for statement in trust_policy.get("Statement", []):
+ if statement.get("Action") != "sts:AssumeRoleWithWebIdentity":
+ continue
+ principal = statement.get("Principal", {})
+ federated = principal.get("Federated", "")
+ if oidc_issuer_endpoint in federated:
+ has_oidc_statement = True
+ break
+
+ if not has_oidc_statement:
+ log.info(
+ "Trust policy does not yet contain OIDC provider %s,
retrying...",
+ expected_oidc_provider_arn,
+ )
+ raise TrustPolicyNotPropagatedError(
+ f"Trust policy for role {role_name} does not yet contain "
+ f"the expected OIDC provider: {expected_oidc_provider_arn}"
+ )
+
+ log.info("Trust policy document confirmed for role %s", role_name)
+
+ _validate_trust_policy()
+
+ # Brief buffer after IAM confirms the trust policy document — cross-service
+ # caches (EKS/EMR) may still serve the old policy for a few seconds.
+ import time
+
+ time.sleep(15)
Review Comment:
No longer needed then?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]