This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6f57ee1256 Add capability of passing a bucket to `example_emr` system test (#36192)
6f57ee1256 is described below
commit 6f57ee12564759892af1152606af5373d2b709c0
Author: Vincent <[email protected]>
AuthorDate: Tue Dec 12 16:11:36 2023 -0500
Add capability of passing a bucket to `example_emr` system test (#36192)
---
tests/system/providers/amazon/aws/example_emr.py | 66 ++++++++++++++--------
.../system/providers/amazon/aws/utils/__init__.py | 23 ++++++--
2 files changed, 61 insertions(+), 28 deletions(-)
diff --git a/tests/system/providers/amazon/aws/example_emr.py b/tests/system/providers/amazon/aws/example_emr.py
index 32f10f6f1b..e131b18f3c 100644
--- a/tests/system/providers/amazon/aws/example_emr.py
+++ b/tests/system/providers/amazon/aws/example_emr.py
@@ -42,6 +42,7 @@ from tests.system.providers.amazon.aws.utils import
ENV_ID_KEY, SystemTestContex
DAG_ID = "example_emr"
CONFIG_NAME = "EMR Runtime Role Security Configuration"
EXECUTION_ROLE_ARN_KEY = "EXECUTION_ROLE_ARN"
+BUCKET_NAME_KEY = "BUCKET_NAME"
SECURITY_CONFIGURATION = {
"AuthorizationConfiguration": {
@@ -123,7 +124,12 @@ def get_step_id(step_ids: list):
return step_ids[0]
-sys_test_context_task =
SystemTestContextBuilder().add_variable(EXECUTION_ROLE_ARN_KEY).build()
+sys_test_context_task = (
+ SystemTestContextBuilder()
+ .add_variable(EXECUTION_ROLE_ARN_KEY)
+ .add_variable(BUCKET_NAME_KEY, optional=True)
+ .build()
+)
with DAG(
dag_id=DAG_ID,
@@ -137,14 +143,12 @@ with DAG(
env_id = test_context[ENV_ID_KEY]
config_name = f"{CONFIG_NAME}-{env_id}"
execution_role_arn = test_context[EXECUTION_ROLE_ARN_KEY]
- s3_bucket = f"{env_id}-emr-bucket"
+ s3_bucket = test_context[BUCKET_NAME_KEY] or f"{env_id}-emr-bucket"
JOB_FLOW_OVERRIDES["LogUri"] = f"s3://{s3_bucket}/"
JOB_FLOW_OVERRIDES["SecurityConfiguration"] = config_name
JOB_FLOW_OVERRIDES["Instances"]["InstanceGroups"][0]["CustomAmiId"] =
get_ami_id()
- create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket",
bucket_name=s3_bucket)
-
create_security_configuration = configure_security_config(config_name)
# [START howto_operator_emr_create_job_flow]
@@ -196,28 +200,42 @@ with DAG(
delete_security_configuration = delete_security_config(config_name)
- delete_s3_bucket = S3DeleteBucketOperator(
- task_id="delete_s3_bucket",
- bucket_name=s3_bucket,
- force_delete=True,
- trigger_rule=TriggerRule.ALL_DONE,
- )
+ # There are two options:
+ # - Pass the bucket name as an argument to the system test. This bucket
will then be used to store
+ # the EMR-related logs.
+ # - The test itself creates and delete the S3 bucket needed for this test.
+ create_s3_bucket: S3CreateBucketOperator | None = None
+ delete_s3_bucket: S3DeleteBucketOperator | None = None
+ if not test_context[BUCKET_NAME_KEY]:
+ create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket",
bucket_name=s3_bucket)
+ delete_s3_bucket = S3DeleteBucketOperator(
+ task_id="delete_s3_bucket",
+ bucket_name=s3_bucket,
+ force_delete=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
chain(
- # TEST SETUP
- test_context,
- create_s3_bucket,
- create_security_configuration,
- # TEST BODY
- create_job_flow,
- modify_cluster,
- add_steps,
- wait_for_step,
- # TEST TEARDOWN
- remove_cluster,
- check_job_flow,
- delete_security_configuration,
- delete_s3_bucket,
+ *[
+ task
+ for task in [
+ # TEST SETUP
+ test_context,
+ create_s3_bucket,
+ create_security_configuration,
+ # TEST BODY
+ create_job_flow,
+ modify_cluster,
+ add_steps,
+ wait_for_step,
+ # TEST TEARDOWN
+ remove_cluster,
+ check_job_flow,
+ delete_security_configuration,
+ delete_s3_bucket,
+ ]
+ if task is not None
+ ]
)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/amazon/aws/utils/__init__.py b/tests/system/providers/amazon/aws/utils/__init__.py
index d86abd0486..1bdcbf656f 100644
--- a/tests/system/providers/amazon/aws/utils/__init__.py
+++ b/tests/system/providers/amazon/aws/utils/__init__.py
@@ -131,6 +131,7 @@ class Variable:
to_split: bool = False,
delimiter: str | None = None,
test_name: str | None = None,
+ optional: bool = False,
):
self.name = name
self.test_name = test_name
@@ -140,6 +141,8 @@ class Variable:
elif delimiter:
raise ValueError(f"Variable {name} has a delimiter but
split_string is set to False.")
+ self.optional = optional
+
def get_value(self):
if hasattr(self, "default_value"):
return self._format_value(
@@ -147,10 +150,13 @@ class Variable:
key=self.name,
default_value=self.default_value,
test_name=self.test_name,
+ optional=self.optional,
)
)
- return self._format_value(fetch_variable(key=self.name,
test_name=self.test_name))
+ return self._format_value(
+ fetch_variable(key=self.name, test_name=self.test_name,
optional=self.optional)
+ )
def set_default(self, default):
# Since 'None' is a potentially valid "default" value, we are only
creating this
@@ -182,6 +188,7 @@ class SystemTestContextBuilder:
variable_name: str,
split_string: bool = False,
delimiter: str | None = None,
+ optional: bool = False,
**kwargs,
):
"""Register a variable to fetch from environment or cloud parameter
store"""
@@ -193,6 +200,7 @@ class SystemTestContextBuilder:
to_split=split_string,
delimiter=delimiter,
test_name=self.test_name,
+ optional=optional,
)
# default_value is accepted via kwargs so that it is completely
optional and no
@@ -220,7 +228,12 @@ class SystemTestContextBuilder:
return variable_fetcher
-def fetch_variable(key: str, default_value: str | None = None, test_name: str
| None = None) -> str:
+def fetch_variable(
+ key: str,
+ default_value: str | None = None,
+ test_name: str | None = None,
+ optional: bool = False,
+) -> str | None:
"""
Given a Parameter name: first check for an existing Environment Variable,
then check SSM for a value. If neither are available, fall back on the
@@ -229,11 +242,13 @@ def fetch_variable(key: str, default_value: str | None =
None, test_name: str |
:param key: The name of the Parameter to fetch a value for.
:param default_value: The default value to use if no value can be found.
:param test_name: The system test name.
+ :param optional: Whether the variable is optional. If True, does not raise
`ValueError` if variable
+ does not exist
:return: The value of the parameter.
"""
value: str | None = os.getenv(key, _fetch_from_ssm(key, test_name)) or
default_value
- if not value:
+ if not optional and not value:
raise ValueError(NO_VALUE_MSG.format(key=key))
return value
@@ -249,7 +264,7 @@ def set_env_id() -> str:
:return: A valid System Test Environment ID.
"""
- env_id: str = fetch_variable(ENV_ID_ENVIRON_KEY, DEFAULT_ENV_ID)
+ env_id: str = str(fetch_variable(ENV_ID_ENVIRON_KEY, DEFAULT_ENV_ID))
env_id = _validate_env_id(env_id)
os.environ[ENV_ID_ENVIRON_KEY] = env_id