This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ef5eebdb26 Fix AWS system tests (#36091)
ef5eebdb26 is described below
commit ef5eebdb26ca9ddb49c529625660b72b6c9b55b4
Author: Vincent <[email protected]>
AuthorDate: Wed Dec 6 15:18:34 2023 -0500
Fix AWS system tests (#36091)
---
.../amazon/aws/operators/redshift_cluster.py | 29 ++++++++++
.../providers/amazon/aws/example_redshift.py | 60 +++++----------------
.../amazon/aws/example_redshift_s3_transfers.py | 60 +++++----------------
.../providers/amazon/aws/example_s3_to_sql.py | 55 +++++--------------
.../providers/amazon/aws/example_sql_to_s3.py | 61 ++++++----------------
5 files changed, 83 insertions(+), 182 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 3ad8c1ae66..fdeddf42a0 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -103,8 +103,37 @@ class RedshiftCreateClusterOperator(BaseOperator):
"cluster_identifier",
"cluster_type",
"node_type",
+ "master_username",
+ "master_user_password",
+ "cluster_type",
+ "db_name",
"number_of_nodes",
+ "cluster_security_groups",
"vpc_security_group_ids",
+ "cluster_subnet_group_name",
+ "availability_zone",
+ "preferred_maintenance_window",
+ "cluster_parameter_group_name",
+ "automated_snapshot_retention_period",
+ "manual_snapshot_retention_period",
+ "port",
+ "cluster_version",
+ "allow_version_upgrade",
+ "publicly_accessible",
+ "encrypted",
+ "hsm_client_certificate_identifier",
+ "hsm_configuration_identifier",
+ "elastic_ip",
+ "tags",
+ "kms_key_id",
+ "enhanced_vpc_routing",
+ "additional_info",
+ "iam_roles",
+ "maintenance_track_name",
+ "snapshot_schedule_identifier",
+ "availability_zone_relocation",
+ "aqua_configuration_status",
+ "default_iam_role_arn",
)
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"
diff --git a/tests/system/providers/amazon/aws/example_redshift.py b/tests/system/providers/amazon/aws/example_redshift.py
index 70da271fdf..84be4c702c 100644
--- a/tests/system/providers/amazon/aws/example_redshift.py
+++ b/tests/system/providers/amazon/aws/example_redshift.py
@@ -20,8 +20,6 @@ from __future__ import annotations
from datetime import datetime
-import boto3
-
from airflow import settings
from airflow.decorators import task
from airflow.models import Connection
@@ -41,22 +39,22 @@ from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftCluste
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
DAG_ID = "example_redshift"
+
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+ SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
+
DB_LOGIN = "adminuser"
DB_PASS = "MyAmazonPassword1"
DB_NAME = "dev"
POLL_INTERVAL = 10
-IP_PERMISSION = {
- "FromPort": -1,
- "IpProtocol": "All",
- "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
-sys_test_context_task = SystemTestContextBuilder().build()
-
@task
def create_connection(conn_id_name: str, cluster_id: str):
@@ -76,28 +74,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
session.commit()
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
- client = boto3.client("ec2")
- security_group = client.create_security_group(
- Description="Redshift-system-test", GroupName=sec_group_name, VpcId=vpc_id
- )
- client.get_waiter("security_group_exists").wait(
- GroupIds=[security_group["GroupId"]],
- GroupNames=[sec_group_name],
- WaiterConfig={"Delay": 15, "MaxAttempts": 4},
- )
- client.authorize_security_group_ingress(
- GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
- )
- return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
- boto3.client("ec2").delete_security_group(GroupId=sec_group_id, GroupName=sec_group_name)
-
-
with DAG(
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
@@ -107,21 +83,20 @@ with DAG(
) as dag:
test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY]
+ security_group_id = test_context[SECURITY_GROUP_KEY]
+ cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
redshift_cluster_identifier = f"{env_id}-redshift-cluster"
redshift_cluster_snapshot_identifier = f"{env_id}-snapshot"
conn_id_name = f"{env_id}-conn-id"
sg_name = f"{env_id}-sg"
- get_vpc_id = get_default_vpc_id()
-
- set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
# [START howto_operator_redshift_cluster]
create_cluster = RedshiftCreateClusterOperator(
task_id="create_cluster",
cluster_identifier=redshift_cluster_identifier,
- vpc_security_group_ids=[set_up_sg],
- publicly_accessible=True,
+ vpc_security_group_ids=[security_group_id],
+ cluster_subnet_group_name=cluster_subnet_group_name,
+ publicly_accessible=False,
cluster_type="single-node",
node_type="dc2.large",
master_username=DB_LOGIN,
@@ -249,14 +224,9 @@ with DAG(
)
# [END howto_operator_redshift_delete_cluster_snapshot]
- delete_sg = delete_security_group(
- sec_group_id=set_up_sg,
- sec_group_name=sg_name,
- )
chain(
# TEST SETUP
test_context,
- set_up_sg,
# TEST BODY
create_cluster,
wait_cluster_available,
@@ -272,8 +242,6 @@ with DAG(
drop_table,
delete_cluster_snapshot,
delete_cluster,
- # TEST TEARDOWN
- delete_sg,
)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
index 8d45390860..db6160d93f 100644
--- a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
+++ b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
@@ -18,8 +18,6 @@ from __future__ import annotations
from datetime import datetime
-import boto3
-
from airflow import settings
from airflow.decorators import task
from airflow.models import Connection
@@ -42,20 +40,21 @@ from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOp
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
DAG_ID = "example_redshift_to_s3"
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+ SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
+
DB_LOGIN = "adminuser"
DB_PASS = "MyAmazonPassword1"
DB_NAME = "dev"
-IP_PERMISSION = {
- "FromPort": -1,
- "IpProtocol": "All",
- "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
S3_KEY = "s3_output_"
S3_KEY_2 = "s3_key_2"
S3_KEY_PREFIX = "s3_k"
@@ -76,9 +75,6 @@ SQL_DROP_TABLE = f"DROP TABLE IF EXISTS {REDSHIFT_TABLE};"
DATA = "0, 'Airflow', 'testing'"
-sys_test_context_task = SystemTestContextBuilder().build()
-
-
@task
def create_connection(conn_id_name: str, cluster_id: str):
redshift_hook = RedshiftHook()
@@ -97,28 +93,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
session.commit()
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
- client = boto3.client("ec2")
- security_group = client.create_security_group(
- Description="Redshift-system-test", GroupName=sec_group_name, VpcId=vpc_id
- )
- client.get_waiter("security_group_exists").wait(
- GroupIds=[security_group["GroupId"]],
- GroupNames=[sec_group_name],
- WaiterConfig={"Delay": 15, "MaxAttempts": 4},
- )
- client.authorize_security_group_ingress(
- GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
- )
- return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
- boto3.client("ec2").delete_security_group(GroupId=sec_group_id, GroupName=sec_group_name)
-
-
with DAG(
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
@@ -128,15 +102,13 @@ with DAG(
) as dag:
test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY]
+ security_group_id = test_context[SECURITY_GROUP_KEY]
+ cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
redshift_cluster_identifier = f"{env_id}-redshift-cluster"
conn_id_name = f"{env_id}-conn-id"
sg_name = f"{env_id}-sg"
bucket_name = f"{env_id}-bucket"
- get_vpc_id = get_default_vpc_id()
-
- set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
create_bucket = S3CreateBucketOperator(
task_id="s3_create_bucket",
bucket_name=bucket_name,
@@ -145,8 +117,9 @@ with DAG(
create_cluster = RedshiftCreateClusterOperator(
task_id="create_cluster",
cluster_identifier=redshift_cluster_identifier,
- vpc_security_group_ids=[set_up_sg],
- publicly_accessible=True,
+ vpc_security_group_ids=[security_group_id],
+ cluster_subnet_group_name=cluster_subnet_group_name,
+ publicly_accessible=False,
cluster_type="single-node",
node_type="dc2.large",
master_username=DB_LOGIN,
@@ -235,11 +208,6 @@ with DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_sg = delete_security_group(
- sec_group_id=set_up_sg,
- sec_group_name=sg_name,
- )
-
delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
bucket_name=bucket_name,
@@ -250,7 +218,6 @@ with DAG(
chain(
# TEST SETUP
test_context,
- set_up_sg,
create_bucket,
create_cluster,
wait_cluster_available,
@@ -266,7 +233,6 @@ with DAG(
# TEST TEARDOWN
drop_table,
delete_cluster,
- delete_sg,
delete_bucket,
)
diff --git a/tests/system/providers/amazon/aws/example_s3_to_sql.py b/tests/system/providers/amazon/aws/example_s3_to_sql.py
index 42eb46d1b4..2312271ce8 100644
--- a/tests/system/providers/amazon/aws/example_s3_to_sql.py
+++ b/tests/system/providers/amazon/aws/example_s3_to_sql.py
@@ -18,8 +18,6 @@ from __future__ import annotations
from datetime import datetime
-import boto3
-
from airflow import settings
from airflow.decorators import task
from airflow.models import Connection
@@ -41,10 +39,15 @@ from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator, SQLTableCheckOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
from tests.system.utils.watcher import watcher
-sys_test_context_task = SystemTestContextBuilder().build()
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+ SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
DAG_ID = "example_s3_to_sql"
@@ -52,12 +55,6 @@ DB_LOGIN = "adminuser"
DB_PASS = "MyAmazonPassword1"
DB_NAME = "dev"
-IP_PERMISSION = {
- "FromPort": -1,
- "IpProtocol": "All",
- "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
SQL_TABLE_NAME = "cocktails"
SQL_COLUMN_LIST = ["cocktail_id", "cocktail_name", "base_spirit"]
SAMPLE_DATA = r"""1,Caipirinha,Cachaca
@@ -83,26 +80,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
session.commit()
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
- client = boto3.client("ec2")
- security_group = client.create_security_group(
- Description="Redshift-system-test", GroupName=sec_group_name, VpcId=vpc_id
- )
- client.get_waiter("security_group_exists").wait(
- GroupIds=[security_group["GroupId"]], GroupNames=[sec_group_name]
- )
- client.authorize_security_group_ingress(
- GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
- )
- return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
- boto3.client("ec2").delete_security_group(GroupId=sec_group_id, GroupName=sec_group_name)
-
-
with DAG(
dag_id=DAG_ID,
start_date=datetime(2023, 1, 1),
@@ -112,21 +89,20 @@ with DAG(
) as dag:
test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY]
+ security_group_id = test_context[SECURITY_GROUP_KEY]
+ cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
conn_id_name = f"{env_id}-conn-id"
redshift_cluster_identifier = f"{env_id}-redshift-cluster"
sg_name = f"{env_id}-sg"
s3_bucket_name = f"{env_id}-bucket"
s3_key = f"{env_id}/files/cocktail_list.csv"
- get_vpc_id = get_default_vpc_id()
-
- set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
create_cluster = RedshiftCreateClusterOperator(
task_id="create_cluster",
cluster_identifier=redshift_cluster_identifier,
- vpc_security_group_ids=[set_up_sg],
- publicly_accessible=True,
+ vpc_security_group_ids=[security_group_id],
+ cluster_subnet_group_name=cluster_subnet_group_name,
+ publicly_accessible=False,
cluster_type="single-node",
node_type="dc2.large",
master_username=DB_LOGIN,
@@ -250,15 +226,9 @@ with DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_sg = delete_security_group(
- sec_group_id=set_up_sg,
- sec_group_name=sg_name,
- )
-
chain(
# TEST SETUP
test_context,
- set_up_sg,
create_cluster,
wait_cluster_available,
set_up_connection,
@@ -274,7 +244,6 @@ with DAG(
delete_s3_objects,
delete_s3_bucket,
delete_cluster,
- delete_sg,
)
list(dag.tasks) >> watcher()
diff --git a/tests/system/providers/amazon/aws/example_sql_to_s3.py b/tests/system/providers/amazon/aws/example_sql_to_s3.py
index cff8072d74..813420ca15 100644
--- a/tests/system/providers/amazon/aws/example_sql_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_sql_to_s3.py
@@ -20,8 +20,6 @@ from __future__ import annotations
from datetime import datetime
-import boto3
-
from airflow import settings
from airflow.decorators import task
from airflow.models import Connection
@@ -38,19 +36,22 @@ from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
DAG_ID = "example_sql_to_s3"
+
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+ SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
+
+
DB_LOGIN = "adminuser"
DB_PASS = "MyAmazonPassword1"
DB_NAME = "dev"
-IP_PERMISSION = {
- "FromPort": -1,
- "IpProtocol": "All",
- "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
REDSHIFT_TABLE = "test_table"
SQL_QUERY = f"SELECT * FROM {REDSHIFT_TABLE}"
@@ -65,9 +66,6 @@ SQL_CREATE_TABLE = f"""
SQL_INSERT_DATA = f"INSERT INTO {REDSHIFT_TABLE} VALUES ( 1, 'Banana', 'Yellow');"
-sys_test_context_task = SystemTestContextBuilder().build()
-
-
@task
def create_connection(conn_id_name: str, cluster_id: str):
redshift_hook = RedshiftHook()
@@ -86,28 +84,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
session.commit()
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
- client = boto3.client("ec2")
- security_group = client.create_security_group(
- Description="Redshift-system-test", GroupName=sec_group_name, VpcId=vpc_id
- )
- client.get_waiter("security_group_exists").wait(
- GroupIds=[security_group["GroupId"]],
- GroupNames=[sec_group_name],
- WaiterConfig={"Delay": 15, "MaxAttempts": 4},
- )
- client.authorize_security_group_ingress(
- GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
- )
- return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
- boto3.client("ec2").delete_security_group(GroupId=sec_group_id, GroupName=sec_group_name)
-
-
with DAG(
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
@@ -117,6 +93,8 @@ with DAG(
) as dag:
test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY]
+ security_group_id = test_context[SECURITY_GROUP_KEY]
+ cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
redshift_cluster_identifier = f"{env_id}-redshift-cluster"
conn_id_name = f"{env_id}-conn-id"
sg_name = f"{env_id}-sg"
@@ -128,15 +106,12 @@ with DAG(
bucket_name=bucket_name,
)
- get_vpc_id = get_default_vpc_id()
-
- set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
create_cluster = RedshiftCreateClusterOperator(
task_id="create_cluster",
cluster_identifier=redshift_cluster_identifier,
- vpc_security_group_ids=[set_up_sg],
- publicly_accessible=True,
+ vpc_security_group_ids=[security_group_id],
+ cluster_subnet_group_name=cluster_subnet_group_name,
+ publicly_accessible=False,
cluster_type="single-node",
node_type="dc2.large",
master_username=DB_LOGIN,
@@ -201,15 +176,10 @@ with DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_sg = delete_security_group(
- sec_group_id=set_up_sg,
- sec_group_name=sg_name,
- )
chain(
# TEST SETUP
test_context,
create_bucket,
- set_up_sg,
create_cluster,
wait_cluster_available,
set_up_connection,
@@ -221,7 +191,6 @@ with DAG(
# TEST TEARDOWN
delete_bucket,
delete_cluster,
- delete_sg,
)
from tests.system.utils.watcher import watcher