This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c4f0d9f004 Update system test example_emr to have logs (#30715)
c4f0d9f004 is described below
commit c4f0d9f0043a52f6b8cf3847ac24c524290c461b
Author: Vincent <[email protected]>
AuthorDate: Tue Apr 18 12:01:29 2023 -0600
Update system test example_emr to have logs (#30715)
---
tests/system/providers/amazon/aws/example_emr.py | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
diff --git a/tests/system/providers/amazon/aws/example_emr.py
b/tests/system/providers/amazon/aws/example_emr.py
index 792b2b9742..06d8b28f45 100644
--- a/tests/system/providers/amazon/aws/example_emr.py
+++ b/tests/system/providers/amazon/aws/example_emr.py
@@ -33,6 +33,7 @@ from airflow.providers.amazon.aws.operators.emr import (
EmrModifyClusterOperator,
EmrTerminateJobFlowOperator,
)
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator,
S3DeleteBucketOperator
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY,
SystemTestContextBuilder
@@ -126,12 +127,18 @@ with DAG(
tags=["example"],
) as dag:
test_context = sys_test_context_task()
+
env_id = test_context[ENV_ID_KEY]
config_name = f"{CONFIG_NAME}-{env_id}"
execution_role_arn = test_context[EXECUTION_ROLE_ARN_KEY]
+ s3_bucket = f"{env_id}-emr-bucket"
+
+ JOB_FLOW_OVERRIDES["LogUri"] = f"s3://{s3_bucket}/"
JOB_FLOW_OVERRIDES["SecurityConfiguration"] = config_name
JOB_FLOW_OVERRIDES["Instances"]["InstanceGroups"][0]["CustomAmiId"] =
get_ami_id()
+ create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket",
bucket_name=s3_bucket)
+
create_security_configuration = configure_security_config(config_name)
# [START howto_operator_emr_create_job_flow]
@@ -172,9 +179,17 @@ with DAG(
delete_security_configuration = delete_security_config(config_name)
+ delete_s3_bucket = S3DeleteBucketOperator(
+ task_id="delete_s3_bucket",
+ bucket_name=s3_bucket,
+ force_delete=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
chain(
# TEST SETUP
test_context,
+ create_s3_bucket,
create_security_configuration,
# TEST BODY
create_job_flow,
@@ -184,6 +199,7 @@ with DAG(
remove_cluster,
check_job_flow,
delete_security_configuration,
+ delete_s3_bucket,
)
from tests.system.utils.watcher import watcher