This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b4e82cf66f Update `example_redshift` and `example_redshift_s3_transfers` to use `RedshiftDataHook` instead of `RedshiftSQLHook` (#40970)
b4e82cf66f is described below

commit b4e82cf66fb4d833a25de5e2688b44e7b4ddf4bb
Author: Vincent <[email protected]>
AuthorDate: Wed Jul 24 14:08:06 2024 -0400

    Update `example_redshift` and `example_redshift_s3_transfers` to use `RedshiftDataHook` instead of `RedshiftSQLHook` (#40970)
---
 .../amazon/aws/transfers/redshift_to_s3.py         |  1 +
 .../amazon/aws/transfers/s3_to_redshift.py         |  1 +
 .../amazon/aws/transfers/test_redshift_to_s3.py    | 11 -----
 .../amazon/aws/transfers/test_s3_to_redshift.py    | 13 ------
 .../providers/amazon/aws/example_redshift.py       | 35 ----------------
 .../amazon/aws/example_redshift_s3_transfers.py    | 47 +++++++++-------------
 6 files changed, 20 insertions(+), 88 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index f6aafeba59..73578ea539 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -84,6 +84,7 @@ class RedshiftToS3Operator(BaseOperator):
         "unload_options",
         "select_query",
         "redshift_conn_id",
+        "redshift_data_api_kwargs",
     )
     template_ext: Sequence[str] = (".sql",)
     template_fields_renderers = {"select_query": "sql"}
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 6418c111e2..161276b33c 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -77,6 +77,7 @@ class S3ToRedshiftOperator(BaseOperator):
         "copy_options",
         "redshift_conn_id",
         "method",
+        "redshift_data_api_kwargs",
         "aws_conn_id",
     )
     template_ext: Sequence[str] = ()
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index d025b4836f..d2af90a445 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -364,17 +364,6 @@ class TestRedshiftToS3Transfer:
         assert extra["role_arn"] in unload_query
         assert_equal_ignore_multiple_spaces(mock_run.call_args.args[0], unload_query)
 
-    def test_template_fields_overrides(self):
-        assert RedshiftToS3Operator.template_fields == (
-            "s3_bucket",
-            "s3_key",
-            "schema",
-            "table",
-            "unload_options",
-            "select_query",
-            "redshift_conn_id",
-        )
-
     @pytest.mark.parametrize("param", ["sql", "parameters"])
     def test_invalid_param_in_redshift_data_api_kwargs(self, param):
         """
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index 6e3cbb2a1c..cb5ef7fdb7 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -381,19 +381,6 @@ class TestS3ToRedshiftTransfer:
         assert mock_run.call_count == 1
         assert_equal_ignore_multiple_spaces(actual_copy_query, expected_copy_query)
 
-    def test_template_fields_overrides(self):
-        assert S3ToRedshiftOperator.template_fields == (
-            "s3_bucket",
-            "s3_key",
-            "schema",
-            "table",
-            "column_list",
-            "copy_options",
-            "redshift_conn_id",
-            "method",
-            "aws_conn_id",
-        )
-
     def test_execute_unavailable_method(self):
         """
         Test execute unavailable method
diff --git a/tests/system/providers/amazon/aws/example_redshift.py b/tests/system/providers/amazon/aws/example_redshift.py
index 84be4c702c..cc88811bef 100644
--- a/tests/system/providers/amazon/aws/example_redshift.py
+++ b/tests/system/providers/amazon/aws/example_redshift.py
@@ -20,12 +20,8 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow import settings
-from airflow.decorators import task
-from airflow.models import Connection
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftCreateClusterOperator,
     RedshiftCreateClusterSnapshotOperator,
@@ -36,7 +32,6 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import (
 )
 from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
 from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
-from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
@@ -56,24 +51,6 @@ DB_NAME = "dev"
 POLL_INTERVAL = 10
 
 
-@task
-def create_connection(conn_id_name: str, cluster_id: str):
-    redshift_hook = RedshiftHook()
-    cluster_endpoint = redshift_hook.get_conn().describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
-    conn = Connection(
-        conn_id=conn_id_name,
-        conn_type="redshift",
-        host=cluster_endpoint["Endpoint"]["Address"],
-        login=DB_LOGIN,
-        password=DB_PASS,
-        port=cluster_endpoint["Endpoint"]["Port"],
-        schema=cluster_endpoint["DBName"],
-    )
-    session = settings.Session()
-    session.add(conn)
-    session.commit()
-
-
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
@@ -87,7 +64,6 @@ with DAG(
     cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
     redshift_cluster_identifier = f"{env_id}-redshift-cluster"
     redshift_cluster_snapshot_identifier = f"{env_id}-snapshot"
-    conn_id_name = f"{env_id}-conn-id"
     sg_name = f"{env_id}-sg"
 
     # [START howto_operator_redshift_cluster]
@@ -164,8 +140,6 @@ with DAG(
         timeout=60 * 30,
     )
 
-    set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier)
-
     # [START howto_operator_redshift_data]
     create_table_redshift_data = RedshiftDataOperator(
         task_id="create_table_redshift_data",
@@ -201,13 +175,6 @@ with DAG(
         wait_for_completion=True,
     )
 
-    drop_table = SQLExecuteQueryOperator(
-        task_id="drop_table",
-        conn_id=conn_id_name,
-        sql="DROP TABLE IF EXISTS fruit",
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
     # [START howto_operator_redshift_delete_cluster]
     delete_cluster = RedshiftDeleteClusterOperator(
         task_id="delete_cluster",
@@ -236,10 +203,8 @@ with DAG(
         wait_cluster_paused,
         resume_cluster,
         wait_cluster_available_after_resume,
-        set_up_connection,
         create_table_redshift_data,
         insert_data,
-        drop_table,
         delete_cluster_snapshot,
         delete_cluster,
     )
diff --git a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
index 4fbf728fa8..0691046190 100644
--- a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
+++ b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
@@ -18,12 +18,8 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow import settings
-from airflow.decorators import task
-from airflow.models import Connection
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
@@ -75,24 +71,6 @@ SQL_DROP_TABLE = f"DROP TABLE IF EXISTS {REDSHIFT_TABLE};"
 DATA = "0, 'Airflow', 'testing'"
 
 
-@task
-def create_connection(conn_id_name: str, cluster_id: str):
-    redshift_hook = RedshiftHook()
-    cluster_endpoint = redshift_hook.get_conn().describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
-    conn = Connection(
-        conn_id=conn_id_name,
-        conn_type="redshift",
-        host=cluster_endpoint["Endpoint"]["Address"],
-        login=DB_LOGIN,
-        password=DB_PASS,
-        port=cluster_endpoint["Endpoint"]["Port"],
-        schema=cluster_endpoint["DBName"],
-    )
-    session = settings.Session()
-    session.add(conn)
-    session.commit()
-
-
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
@@ -105,7 +83,6 @@ with DAG(
     security_group_id = test_context[SECURITY_GROUP_KEY]
     cluster_subnet_group_name = test_context[CLUSTER_SUBNET_GROUP_KEY]
     redshift_cluster_identifier = f"{env_id}-redshift-cluster"
-    conn_id_name = f"{env_id}-conn-id"
     sg_name = f"{env_id}-sg"
     bucket_name = f"{env_id}-bucket"
 
@@ -134,8 +111,6 @@ with DAG(
         timeout=60 * 30,
     )
 
-    set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier)
-
     create_object = S3CreateObjectOperator(
         task_id="create_object",
         s3_bucket=bucket_name,
@@ -165,7 +140,12 @@ with DAG(
     # [START howto_transfer_redshift_to_s3]
     transfer_redshift_to_s3 = RedshiftToS3Operator(
         task_id="transfer_redshift_to_s3",
-        redshift_conn_id=conn_id_name,
+        redshift_data_api_kwargs={
+            "database": DB_NAME,
+            "cluster_identifier": redshift_cluster_identifier,
+            "db_user": DB_LOGIN,
+            "wait_for_completion": True,
+        },
         s3_bucket=bucket_name,
         s3_key=S3_KEY,
         schema="PUBLIC",
@@ -182,7 +162,12 @@ with DAG(
     # [START howto_transfer_s3_to_redshift]
     transfer_s3_to_redshift = S3ToRedshiftOperator(
         task_id="transfer_s3_to_redshift",
-        redshift_conn_id=conn_id_name,
+        redshift_data_api_kwargs={
+            "database": DB_NAME,
+            "cluster_identifier": redshift_cluster_identifier,
+            "db_user": DB_LOGIN,
+            "wait_for_completion": True,
+        },
         s3_bucket=bucket_name,
         s3_key=S3_KEY_2,
         schema="PUBLIC",
@@ -194,7 +179,12 @@ with DAG(
     # [START howto_transfer_s3_to_redshift_multiple_keys]
     transfer_s3_to_redshift_multiple = S3ToRedshiftOperator(
         task_id="transfer_s3_to_redshift_multiple",
-        redshift_conn_id=conn_id_name,
+        redshift_data_api_kwargs={
+            "database": DB_NAME,
+            "cluster_identifier": redshift_cluster_identifier,
+            "db_user": DB_LOGIN,
+            "wait_for_completion": True,
+        },
         s3_bucket=bucket_name,
         s3_key=S3_KEY_PREFIX,
         schema="PUBLIC",
@@ -231,7 +221,6 @@ with DAG(
         create_bucket,
         create_cluster,
         wait_cluster_available,
-        set_up_connection,
         create_object,
         create_table_redshift_data,
         insert_data,
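
For reference, the updated example DAGs configure the transfer operators through `redshift_data_api_kwargs` (served by `RedshiftDataHook`, per the commit title) instead of a `redshift_conn_id` pointing at an Airflow Connection. Below is a minimal sketch of the new pattern; only the parameter names and the `redshift_data_api_kwargs` keys come from the diff above, while the cluster identifier, database, user, bucket, key, and table values are placeholders:

    from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

    # Sketch only: identifiers below are illustrative placeholders, not required names.
    transfer_s3_to_redshift = S3ToRedshiftOperator(
        task_id="transfer_s3_to_redshift",
        # Routes the load through the Redshift Data API rather than a SQL connection,
        # so no Airflow Connection needs to be created for the cluster.
        redshift_data_api_kwargs={
            "database": "dev",                            # database name (DB_NAME in the example)
            "cluster_identifier": "my-redshift-cluster",  # placeholder cluster id
            "db_user": "my_db_user",                      # placeholder user (DB_LOGIN in the example)
            "wait_for_completion": True,
        },
        s3_bucket="my-bucket",      # placeholder bucket
        s3_key="my-key",            # placeholder key
        schema="PUBLIC",
        table="my_table",           # placeholder table
        copy_options=["csv"],       # assumption: CSV-formatted source data
    )

`RedshiftToS3Operator` accepts the same `redshift_data_api_kwargs` dictionary, as shown in the `transfer_redshift_to_s3` hunk above.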
