This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 025119e9f3 AWS system test example_emr_eks: create OIDC identity
provider (#31652)
025119e9f3 is described below
commit 025119e9f3cfba66bb2cf2735e5859e9a7a17d7a
Author: Vincent <[email protected]>
AuthorDate: Wed May 31 16:55:11 2023 -0400
AWS system test example_emr_eks: create OIDC identity provider (#31652)
---
.../system/providers/amazon/aws/example_emr_eks.py | 48 ++++++++++++++++++----
1 file changed, 40 insertions(+), 8 deletions(-)
diff --git a/tests/system/providers/amazon/aws/example_emr_eks.py
b/tests/system/providers/amazon/aws/example_emr_eks.py
index 16d1f3bc24..b6118c8520 100644
--- a/tests/system/providers/amazon/aws/example_emr_eks.py
+++ b/tests/system/providers/amazon/aws/example_emr_eks.py
@@ -70,6 +70,22 @@ print(s)
"""
+@task
+def create_launch_template(template_name: str):
+ # This launch template enables IMDSv2.
+ boto3.client("ec2").create_launch_template(
+ LaunchTemplateName=template_name,
+ LaunchTemplateData={
+ "MetadataOptions": {"HttpEndpoint": "enabled", "HttpTokens":
"required"},
+ },
+ )
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_launch_template(template_name: str):
+
boto3.client("ec2").delete_launch_template(LaunchTemplateName=template_name)
+
+
@task
def enable_access_emr_on_eks(cluster, ns):
# Install eksctl and enable access for EMR on EKS
@@ -93,6 +109,24 @@ def enable_access_emr_on_eks(cluster, ns):
raise RuntimeError(err)
+@task
+def create_iam_oidc_identity_provider(cluster):
+ # Create an IAM OIDC identity provider
+ # See
https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/setting-up-enable-IAM.html
+ command = f"eksctl utils associate-iam-oidc-provider --cluster {cluster}
--approve"
+
+ build = subprocess.Popen(
+ command,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ _, err = build.communicate()
+
+ if build.returncode != 0:
+ raise RuntimeError(err)
+
+
@task
def get_execution_role_name() -> str:
return boto3.client("sts").get_caller_identity()["Arn"].split("/")[-2]
@@ -162,6 +196,7 @@ with DAG(
virtual_cluster_name = f"{env_id}-virtual-cluster"
nodegroup_name = f"{env_id}-nodegroup"
eks_namespace = "default"
+ launch_template_name = f"{env_id}-launch-template"
# [START howto_operator_emr_eks_config]
job_driver_arg = {
@@ -213,8 +248,6 @@ with DAG(
poke_interval=10,
)
- emr_access_on_eks = enable_access_emr_on_eks(eks_cluster_name,
eks_namespace)
-
# [START howto_operator_emr_eks_create_cluster]
create_emr_eks_cluster = EmrEksCreateClusterOperator(
task_id="create_emr_eks_cluster",
@@ -224,10 +257,6 @@ with DAG(
)
# [END howto_operator_emr_eks_create_cluster]
- trust_policy_update = update_trust_policy_execution_role(
- eks_cluster_name, eks_namespace, get_execution_role_name()
- )
-
# [START howto_operator_emr_container]
job_starter = EmrContainerOperator(
task_id="start_job",
@@ -276,10 +305,12 @@ with DAG(
test_context,
create_bucket,
upload_s3_file,
+ create_launch_template(launch_template_name),
create_cluster_and_nodegroup,
await_create_nodegroup,
- emr_access_on_eks,
- trust_policy_update,
+ enable_access_emr_on_eks(eks_cluster_name, eks_namespace),
+ create_iam_oidc_identity_provider(eks_cluster_name),
+ update_trust_policy_execution_role(eks_cluster_name, eks_namespace,
get_execution_role_name()),
# TEST BODY
create_emr_eks_cluster,
job_starter,
@@ -288,6 +319,7 @@ with DAG(
delete_virtual_cluster(str(create_emr_eks_cluster.output)),
delete_eks_cluster,
await_delete_eks_cluster,
+ delete_launch_template(launch_template_name),
delete_bucket,
)