This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ef5eebdb26 Fix AWS system tests (#36091)
ef5eebdb26 is described below

commit ef5eebdb26ca9ddb49c529625660b72b6c9b55b4
Author: Vincent <[email protected]>
AuthorDate: Wed Dec 6 15:18:34 2023 -0500

    Fix AWS system tests (#36091)
---
 .../amazon/aws/operators/redshift_cluster.py       | 29 ++++++++++
 .../providers/amazon/aws/example_redshift.py       | 60 +++++----------------
 .../amazon/aws/example_redshift_s3_transfers.py    | 60 +++++----------------
 .../providers/amazon/aws/example_s3_to_sql.py      | 55 +++++--------------
 .../providers/amazon/aws/example_sql_to_s3.py      | 61 ++++++----------------
 5 files changed, 83 insertions(+), 182 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 3ad8c1ae66..fdeddf42a0 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -103,8 +103,37 @@ class RedshiftCreateClusterOperator(BaseOperator):
         "cluster_identifier",
         "cluster_type",
         "node_type",
+        "master_username",
+        "master_user_password",
+        "cluster_type",
+        "db_name",
         "number_of_nodes",
+        "cluster_security_groups",
         "vpc_security_group_ids",
+        "cluster_subnet_group_name",
+        "availability_zone",
+        "preferred_maintenance_window",
+        "cluster_parameter_group_name",
+        "automated_snapshot_retention_period",
+        "manual_snapshot_retention_period",
+        "port",
+        "cluster_version",
+        "allow_version_upgrade",
+        "publicly_accessible",
+        "encrypted",
+        "hsm_client_certificate_identifier",
+        "hsm_configuration_identifier",
+        "elastic_ip",
+        "tags",
+        "kms_key_id",
+        "enhanced_vpc_routing",
+        "additional_info",
+        "iam_roles",
+        "maintenance_track_name",
+        "snapshot_schedule_identifier",
+        "availability_zone_relocation",
+        "aqua_configuration_status",
+        "default_iam_role_arn",
     )
     ui_color = "#eeaa11"
     ui_fgcolor = "#ffffff"
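
For context on the hunk above: every name listed in template_fields is
rendered with Jinja before execute(), so these additions make nearly all of
the operator's constructor arguments templatable. A minimal sketch of what
that enables (the DAG id and Variable names are illustrative assumptions, not
part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftCreateClusterOperator

    with DAG(dag_id="redshift_templating_sketch", start_date=datetime(2023, 1, 1), schedule=None) as dag:
        RedshiftCreateClusterOperator(
            task_id="create_cluster",
            cluster_identifier="redshift-{{ ds_nodash }}",
            cluster_type="single-node",
            node_type="dc2.large",
            master_username="adminuser",
            # Templatable thanks to the template_fields entries added above;
            # the Variable names used here are hypothetical.
            master_user_password="{{ var.value.redshift_password }}",
            cluster_subnet_group_name="{{ var.value.cluster_subnet_group }}",
            vpc_security_group_ids=["{{ var.value.security_group }}"],
            publicly_accessible=False,
        )
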
diff --git a/tests/system/providers/amazon/aws/example_redshift.py b/tests/system/providers/amazon/aws/example_redshift.py
index 70da271fdf..84be4c702c 100644
--- a/tests/system/providers/amazon/aws/example_redshift.py
+++ b/tests/system/providers/amazon/aws/example_redshift.py
@@ -20,8 +20,6 @@ from __future__ import annotations
 
 from datetime import datetime
 
-import boto3
-
 from airflow import settings
 from airflow.decorators import task
 from airflow.models import Connection
@@ -41,22 +39,22 @@ from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftCluste
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
 
 DAG_ID = "example_redshift"
+
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+    SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
+
 DB_LOGIN = "adminuser"
 DB_PASS = "MyAmazonPassword1"
 DB_NAME = "dev"
 POLL_INTERVAL = 10
 
-IP_PERMISSION = {
-    "FromPort": -1,
-    "IpProtocol": "All",
-    "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
-sys_test_context_task = SystemTestContextBuilder().build()
-
 
 @task
 def create_connection(conn_id_name: str, cluster_id: str):
@@ -76,28 +74,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
     session.commit()
 
 
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
-    client = boto3.client("ec2")
-    security_group = client.create_security_group(
-        Description="Redshift-system-test", GroupName=sec_group_name, 
VpcId=vpc_id
-    )
-    client.get_waiter("security_group_exists").wait(
-        GroupIds=[security_group["GroupId"]],
-        GroupNames=[sec_group_name],
-        WaiterConfig={"Delay": 15, "MaxAttempts": 4},
-    )
-    client.authorize_security_group_ingress(
-        GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
-    )
-    return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
-    boto3.client("ec2").delete_security_group(GroupId=sec_group_id, 
GroupName=sec_group_name)
-
-
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
@@ -107,21 +83,20 @@ with DAG(
 ) as dag:
     test_context = sys_test_context_task()
     env_id = test_context[ENV_ID_KEY]
+    security_group_id = test_context[SECURITY_GROUP_KEY]
+    cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
     redshift_cluster_identifier = f"{env_id}-redshift-cluster"
     redshift_cluster_snapshot_identifier = f"{env_id}-snapshot"
     conn_id_name = f"{env_id}-conn-id"
     sg_name = f"{env_id}-sg"
 
-    get_vpc_id = get_default_vpc_id()
-
-    set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
     # [START howto_operator_redshift_cluster]
     create_cluster = RedshiftCreateClusterOperator(
         task_id="create_cluster",
         cluster_identifier=redshift_cluster_identifier,
-        vpc_security_group_ids=[set_up_sg],
-        publicly_accessible=True,
+        vpc_security_group_ids=[security_group_id],
+        cluster_subnet_group_name=cluster_subnet_group_name,
+        publicly_accessible=False,
         cluster_type="single-node",
         node_type="dc2.large",
         master_username=DB_LOGIN,
@@ -249,14 +224,9 @@ with DAG(
     )
     # [END howto_operator_redshift_delete_cluster_snapshot]
 
-    delete_sg = delete_security_group(
-        sec_group_id=set_up_sg,
-        sec_group_name=sg_name,
-    )
     chain(
         # TEST SETUP
         test_context,
-        set_up_sg,
         # TEST BODY
         create_cluster,
         wait_cluster_available,
@@ -272,8 +242,6 @@ with DAG(
         drop_table,
         delete_cluster_snapshot,
         delete_cluster,
-        # TEST TEARDOWN
-        delete_sg,
     )
 
     from tests.system.utils.watcher import watcher
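
The SECURITY_GROUP and CLUSTER_SUBNET_GROUP values consumed above are expected
to exist before the test runs; the DAG no longer creates or deletes them. A
hedged one-time provisioning sketch (resource names are illustrative, and the
mechanism for publishing the values to the test context is not shown in this
commit):

    import boto3

    ec2 = boto3.client("ec2")
    redshift = boto3.client("redshift")

    # Reuse the account's default VPC instead of creating one per run.
    vpc_id = ec2.describe_vpcs(Filters=[{"Name": "is-default", "Values": ["true"]}])["Vpcs"][0]["VpcId"]

    # Security group to publish as the SECURITY_GROUP variable.
    sg_id = ec2.create_security_group(
        GroupName="redshift-system-tests",
        Description="Shared by the Redshift system tests",
        VpcId=vpc_id,
    )["GroupId"]

    # Cluster subnet group to publish as the CLUSTER_SUBNET_GROUP variable.
    subnet_ids = [s["SubnetId"] for s in ec2.describe_subnets(Filters=[{"Name": "vpc-id", "Values": [vpc_id]}])["Subnets"]]
    redshift.create_cluster_subnet_group(
        ClusterSubnetGroupName="redshift-system-tests",
        Description="Shared by the Redshift system tests",
        SubnetIds=subnet_ids,
    )
    print(sg_id, "redshift-system-tests")
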
diff --git a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
index 8d45390860..db6160d93f 100644
--- a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
+++ b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
@@ -18,8 +18,6 @@ from __future__ import annotations
 
 from datetime import datetime
 
-import boto3
-
 from airflow import settings
 from airflow.decorators import task
 from airflow.models import Connection
@@ -42,20 +40,21 @@ from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOp
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
 
 DAG_ID = "example_redshift_to_s3"
 
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+    SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
+
 DB_LOGIN = "adminuser"
 DB_PASS = "MyAmazonPassword1"
 DB_NAME = "dev"
 
-IP_PERMISSION = {
-    "FromPort": -1,
-    "IpProtocol": "All",
-    "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
 S3_KEY = "s3_output_"
 S3_KEY_2 = "s3_key_2"
 S3_KEY_PREFIX = "s3_k"
@@ -76,9 +75,6 @@ SQL_DROP_TABLE = f"DROP TABLE IF EXISTS {REDSHIFT_TABLE};"
 DATA = "0, 'Airflow', 'testing'"
 
 
-sys_test_context_task = SystemTestContextBuilder().build()
-
-
 @task
 def create_connection(conn_id_name: str, cluster_id: str):
     redshift_hook = RedshiftHook()
@@ -97,28 +93,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
     session.commit()
 
 
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
-    client = boto3.client("ec2")
-    security_group = client.create_security_group(
-        Description="Redshift-system-test", GroupName=sec_group_name, 
VpcId=vpc_id
-    )
-    client.get_waiter("security_group_exists").wait(
-        GroupIds=[security_group["GroupId"]],
-        GroupNames=[sec_group_name],
-        WaiterConfig={"Delay": 15, "MaxAttempts": 4},
-    )
-    client.authorize_security_group_ingress(
-        GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
-    )
-    return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
-    boto3.client("ec2").delete_security_group(GroupId=sec_group_id, 
GroupName=sec_group_name)
-
-
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
@@ -128,15 +102,13 @@ with DAG(
 ) as dag:
     test_context = sys_test_context_task()
     env_id = test_context[ENV_ID_KEY]
+    security_group_id = test_context[SECURITY_GROUP_KEY]
+    cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
     redshift_cluster_identifier = f"{env_id}-redshift-cluster"
     conn_id_name = f"{env_id}-conn-id"
     sg_name = f"{env_id}-sg"
     bucket_name = f"{env_id}-bucket"
 
-    get_vpc_id = get_default_vpc_id()
-
-    set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
     create_bucket = S3CreateBucketOperator(
         task_id="s3_create_bucket",
         bucket_name=bucket_name,
@@ -145,8 +117,9 @@ with DAG(
     create_cluster = RedshiftCreateClusterOperator(
         task_id="create_cluster",
         cluster_identifier=redshift_cluster_identifier,
-        vpc_security_group_ids=[set_up_sg],
-        publicly_accessible=True,
+        vpc_security_group_ids=[security_group_id],
+        cluster_subnet_group_name=cluster_subnet_group_name,
+        publicly_accessible=False,
         cluster_type="single-node",
         node_type="dc2.large",
         master_username=DB_LOGIN,
@@ -235,11 +208,6 @@ with DAG(
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_sg = delete_security_group(
-        sec_group_id=set_up_sg,
-        sec_group_name=sg_name,
-    )
-
     delete_bucket = S3DeleteBucketOperator(
         task_id="delete_bucket",
         bucket_name=bucket_name,
@@ -250,7 +218,6 @@ with DAG(
     chain(
         # TEST SETUP
         test_context,
-        set_up_sg,
         create_bucket,
         create_cluster,
         wait_cluster_available,
@@ -266,7 +233,6 @@ with DAG(
         # TEST TEARDOWN
         drop_table,
         delete_cluster,
-        delete_sg,
         delete_bucket,
     )
 
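Each of these DAGs keeps its create_connection task; only its first and last
lines appear in the hunks here. For readers without the full file, the body
follows this shape. This is a hedged reconstruction, not an excerpt from the
commit:

    from airflow import settings
    from airflow.decorators import task
    from airflow.models import Connection
    from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook

    DB_LOGIN, DB_PASS, DB_NAME = "adminuser", "MyAmazonPassword1", "dev"  # as in the files above

    @task
    def create_connection(conn_id_name: str, cluster_id: str):
        # Look up the live cluster endpoint and register it as an Airflow
        # connection for the SQLExecuteQueryOperator tasks to use.
        cluster = RedshiftHook().conn.describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
        session = settings.Session()
        session.add(
            Connection(
                conn_id=conn_id_name,
                conn_type="redshift",
                host=cluster["Endpoint"]["Address"],
                port=cluster["Endpoint"]["Port"],
                login=DB_LOGIN,
                password=DB_PASS,
                schema=DB_NAME,
            )
        )
        session.commit()
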
diff --git a/tests/system/providers/amazon/aws/example_s3_to_sql.py b/tests/system/providers/amazon/aws/example_s3_to_sql.py
index 42eb46d1b4..2312271ce8 100644
--- a/tests/system/providers/amazon/aws/example_s3_to_sql.py
+++ b/tests/system/providers/amazon/aws/example_s3_to_sql.py
@@ -18,8 +18,6 @@ from __future__ import annotations
 
 from datetime import datetime
 
-import boto3
-
 from airflow import settings
 from airflow.decorators import task
 from airflow.models import Connection
@@ -41,10 +39,15 @@ from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator, SQLTableCheckOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
 from tests.system.utils.watcher import watcher
 
-sys_test_context_task = SystemTestContextBuilder().build()
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+    SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
 
 DAG_ID = "example_s3_to_sql"
 
@@ -52,12 +55,6 @@ DB_LOGIN = "adminuser"
 DB_PASS = "MyAmazonPassword1"
 DB_NAME = "dev"
 
-IP_PERMISSION = {
-    "FromPort": -1,
-    "IpProtocol": "All",
-    "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
 SQL_TABLE_NAME = "cocktails"
 SQL_COLUMN_LIST = ["cocktail_id", "cocktail_name", "base_spirit"]
 SAMPLE_DATA = r"""1,Caipirinha,Cachaca
@@ -83,26 +80,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
     session.commit()
 
 
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
-    client = boto3.client("ec2")
-    security_group = client.create_security_group(
-        Description="Redshift-system-test", GroupName=sec_group_name, 
VpcId=vpc_id
-    )
-    client.get_waiter("security_group_exists").wait(
-        GroupIds=[security_group["GroupId"]], GroupNames=[sec_group_name]
-    )
-    client.authorize_security_group_ingress(
-        GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
-    )
-    return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
-    boto3.client("ec2").delete_security_group(GroupId=sec_group_id, 
GroupName=sec_group_name)
-
-
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime(2023, 1, 1),
@@ -112,21 +89,20 @@ with DAG(
 ) as dag:
     test_context = sys_test_context_task()
     env_id = test_context[ENV_ID_KEY]
+    security_group_id = test_context[SECURITY_GROUP_KEY]
+    cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
     conn_id_name = f"{env_id}-conn-id"
     redshift_cluster_identifier = f"{env_id}-redshift-cluster"
     sg_name = f"{env_id}-sg"
     s3_bucket_name = f"{env_id}-bucket"
     s3_key = f"{env_id}/files/cocktail_list.csv"
 
-    get_vpc_id = get_default_vpc_id()
-
-    set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
     create_cluster = RedshiftCreateClusterOperator(
         task_id="create_cluster",
         cluster_identifier=redshift_cluster_identifier,
-        vpc_security_group_ids=[set_up_sg],
-        publicly_accessible=True,
+        vpc_security_group_ids=[security_group_id],
+        cluster_subnet_group_name=cluster_subnet_group_name,
+        publicly_accessible=False,
         cluster_type="single-node",
         node_type="dc2.large",
         master_username=DB_LOGIN,
@@ -250,15 +226,9 @@ with DAG(
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_sg = delete_security_group(
-        sec_group_id=set_up_sg,
-        sec_group_name=sg_name,
-    )
-
     chain(
         # TEST SETUP
         test_context,
-        set_up_sg,
         create_cluster,
         wait_cluster_available,
         set_up_connection,
@@ -274,7 +244,6 @@ with DAG(
         delete_s3_objects,
         delete_s3_bucket,
         delete_cluster,
-        delete_sg,
     )
 
     list(dag.tasks) >> watcher()
diff --git a/tests/system/providers/amazon/aws/example_sql_to_s3.py b/tests/system/providers/amazon/aws/example_sql_to_s3.py
index cff8072d74..813420ca15 100644
--- a/tests/system/providers/amazon/aws/example_sql_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_sql_to_s3.py
@@ -20,8 +20,6 @@ from __future__ import annotations
 
 from datetime import datetime
 
-import boto3
-
 from airflow import settings
 from airflow.decorators import task
 from airflow.models import Connection
@@ -38,19 +36,22 @@ from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests.system.providers.amazon.aws.utils.ec2 import get_default_vpc_id
 
 DAG_ID = "example_sql_to_s3"
+
+# Externally fetched variables:
+SECURITY_GROUP_KEY = "SECURITY_GROUP"
+CLUSTER_SUBNET_GROUP_KEY = "CLUSTER_SUBNET_GROUP"
+
+sys_test_context_task = (
+    SystemTestContextBuilder().add_variable(SECURITY_GROUP_KEY).add_variable(CLUSTER_SUBNET_GROUP_KEY).build()
+)
+
+
 DB_LOGIN = "adminuser"
 DB_PASS = "MyAmazonPassword1"
 DB_NAME = "dev"
 
-IP_PERMISSION = {
-    "FromPort": -1,
-    "IpProtocol": "All",
-    "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "Test description"}],
-}
-
 REDSHIFT_TABLE = "test_table"
 SQL_QUERY = f"SELECT * FROM {REDSHIFT_TABLE}"
 
@@ -65,9 +66,6 @@ SQL_CREATE_TABLE = f"""
 SQL_INSERT_DATA = f"INSERT INTO {REDSHIFT_TABLE} VALUES ( 1, 'Banana', 
'Yellow');"
 
 
-sys_test_context_task = SystemTestContextBuilder().build()
-
-
 @task
 def create_connection(conn_id_name: str, cluster_id: str):
     redshift_hook = RedshiftHook()
@@ -86,28 +84,6 @@ def create_connection(conn_id_name: str, cluster_id: str):
     session.commit()
 
 
-@task
-def setup_security_group(sec_group_name: str, ip_permissions: list[dict], vpc_id: str):
-    client = boto3.client("ec2")
-    security_group = client.create_security_group(
-        Description="Redshift-system-test", GroupName=sec_group_name, 
VpcId=vpc_id
-    )
-    client.get_waiter("security_group_exists").wait(
-        GroupIds=[security_group["GroupId"]],
-        GroupNames=[sec_group_name],
-        WaiterConfig={"Delay": 15, "MaxAttempts": 4},
-    )
-    client.authorize_security_group_ingress(
-        GroupId=security_group["GroupId"], GroupName=sec_group_name, IpPermissions=ip_permissions
-    )
-    return security_group["GroupId"]
-
-
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sec_group_id: str, sec_group_name: str):
-    boto3.client("ec2").delete_security_group(GroupId=sec_group_id, 
GroupName=sec_group_name)
-
-
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
@@ -117,6 +93,8 @@ with DAG(
 ) as dag:
     test_context = sys_test_context_task()
     env_id = test_context[ENV_ID_KEY]
+    security_group_id = test_context[SECURITY_GROUP_KEY]
+    cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
     redshift_cluster_identifier = f"{env_id}-redshift-cluster"
     conn_id_name = f"{env_id}-conn-id"
     sg_name = f"{env_id}-sg"
@@ -128,15 +106,12 @@ with DAG(
         bucket_name=bucket_name,
     )
 
-    get_vpc_id = get_default_vpc_id()
-
-    set_up_sg = setup_security_group(sg_name, [IP_PERMISSION], get_vpc_id)
-
     create_cluster = RedshiftCreateClusterOperator(
         task_id="create_cluster",
         cluster_identifier=redshift_cluster_identifier,
-        vpc_security_group_ids=[set_up_sg],
-        publicly_accessible=True,
+        vpc_security_group_ids=[security_group_id],
+        cluster_subnet_group_name=cluster_subnet_group_name,
+        publicly_accessible=False,
         cluster_type="single-node",
         node_type="dc2.large",
         master_username=DB_LOGIN,
@@ -201,15 +176,10 @@ with DAG(
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_sg = delete_security_group(
-        sec_group_id=set_up_sg,
-        sec_group_name=sg_name,
-    )
     chain(
         # TEST SETUP
         test_context,
         create_bucket,
-        set_up_sg,
         create_cluster,
         wait_cluster_available,
         set_up_connection,
@@ -221,7 +191,6 @@ with DAG(
         # TEST TEARDOWN
         delete_bucket,
         delete_cluster,
-        delete_sg,
     )
 
     from tests.system.utils.watcher import watcher
