This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 956ea7c40e Redshift System Test (AIP-47) (#26187)
956ea7c40e is described below
commit 956ea7c40ebaa66963c2b5e7ed7eb290c5f392d8
Author: syedahsn <[email protected]>
AuthorDate: Wed Sep 21 09:49:37 2022 -0700
Redshift System Test (AIP-47) (#26187)
* Update example_redshift_sql.py to use SystemTestContextBuilder
Rebase to main
* Create Redshift Cluster to run SQL queries on, as well as a Security
Group to allow access
- Add template field to RedshiftHook
- Add template field to RedshiftCreateClusterOperator
- Add template field to RedshiftSQLOperator
- Make some changes requested by Dennis regarding formatting
* Combine Redshift SQL system test and Redshift Data Execute System test to
reduce resource creation overhead.
Change RedshiftDeleteClusterOperator to use boto3 API waiter
Change RedshiftPauseClusterOperator and RedshiftResumeClusterOperator to
raise Exception on failure
* Remove example_redshift_cluster.py - this is consolidated in
example_redshift.py
Remove documentation references to deleted files.
Use RedshiftCreateClusterSnapshotOperator and
RedshiftDeleteClusterSnapshotOperator instead of custom tasks.
* Add missing end-before pattern
* Add missing template fields, change function param names to pass checks
* fix redshift snapshot test_create_cluster_snapshot_with_wait test
* Make fixes to pass static checks
* Remove unused imports
---
.../providers/amazon/aws/hooks/redshift_cluster.py | 4 +-
.../amazon/aws/operators/redshift_cluster.py | 40 ++-
.../providers/amazon/aws/operators/redshift_sql.py | 5 +-
.../operators/redshift_cluster.rst | 14 +-
.../operators/redshift_data.rst | 2 +-
.../operators/redshift_sql.rst | 4 +-
.../amazon/aws/operators/test_redshift_cluster.py | 7 +-
.../providers/amazon/aws/example_redshift.py | 309 +++++++++++++++++++++
.../amazon/aws/example_redshift_cluster.py | 141 ----------
.../aws/example_redshift_data_execute_sql.py | 86 ------
.../providers/amazon/aws/example_redshift_sql.py | 98 -------
11 files changed, 349 insertions(+), 361 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/redshift_cluster.py
b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
index be5d608808..f01eceb177 100644
--- a/airflow/providers/amazon/aws/hooks/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from typing import Any
+from typing import Any, Sequence
from botocore.exceptions import ClientError
@@ -36,6 +36,8 @@ class RedshiftHook(AwsBaseHook):
:param aws_conn_id: The Airflow connection used for AWS credentials.
"""
+ template_fields: Sequence[str] = ('cluster_identifier',)
+
def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "redshift"
super().__init__(*args, **kwargs)
diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py
b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index ba49434cab..3844ba2da4 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -91,6 +91,7 @@ class RedshiftCreateClusterOperator(BaseOperator):
"cluster_type",
"node_type",
"number_of_nodes",
+ "vpc_security_group_ids",
)
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"
@@ -264,6 +265,11 @@ class RedshiftCreateClusterSnapshotOperator(BaseOperator):
The default connection id is ``aws_default``
"""
+ template_fields: Sequence[str] = (
+ "cluster_identifier",
+ "snapshot_identifier",
+ )
+
def __init__(
self,
*,
@@ -302,7 +308,6 @@ class RedshiftCreateClusterSnapshotOperator(BaseOperator):
if self.wait_for_completion:
self.redshift_hook.get_conn().get_waiter("snapshot_available").wait(
ClusterIdentifier=self.cluster_identifier,
- SnapshotIdentifier=self.snapshot_identifier,
WaiterConfig={
"Delay": self.poll_interval,
"MaxAttempts": self.max_attempt,
@@ -327,6 +332,11 @@ class RedshiftDeleteClusterSnapshotOperator(BaseOperator):
:param poll_interval: Time (in seconds) to wait between two consecutive
calls to check snapshot state
"""
+ template_fields: Sequence[str] = (
+ "cluster_identifier",
+ "snapshot_identifier",
+ )
+
def __init__(
self,
*,
@@ -395,9 +405,7 @@ class RedshiftResumeClusterOperator(BaseOperator):
self.log.info("Starting Redshift cluster %s",
self.cluster_identifier)
redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
else:
- self.log.warning(
- "Unable to resume cluster since cluster is currently in
status: %s", cluster_state
- )
+ raise Exception(f'Unable to resume cluster - cluster state is
{cluster_state}')
class RedshiftPauseClusterOperator(BaseOperator):
@@ -434,9 +442,7 @@ class RedshiftPauseClusterOperator(BaseOperator):
self.log.info("Pausing Redshift cluster %s",
self.cluster_identifier)
redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
else:
- self.log.warning(
- "Unable to pause cluster since cluster is currently in status:
%s", cluster_state
- )
+ raise Exception(f'Unable to pause cluster - cluster state is
{cluster_state}')
class RedshiftDeleteClusterOperator(BaseOperator):
@@ -480,23 +486,15 @@ class RedshiftDeleteClusterOperator(BaseOperator):
self.poll_interval = poll_interval
def execute(self, context: Context):
- self.delete_cluster()
-
- if self.wait_for_completion:
- cluster_status: str = self.check_status()
- while cluster_status != "cluster_not_found":
- self.log.info(
- "cluster status is %s. Sleeping for %s seconds.",
cluster_status, self.poll_interval
- )
- time.sleep(self.poll_interval)
- cluster_status = self.check_status()
-
- def delete_cluster(self) -> None:
self.redshift_hook.delete_cluster(
cluster_identifier=self.cluster_identifier,
skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
)
- def check_status(self) -> str:
- return self.redshift_hook.cluster_status(self.cluster_identifier)
+ if self.wait_for_completion:
+ waiter =
self.redshift_hook.get_conn().get_waiter('cluster_deleted')
+ waiter.wait(
+ ClusterIdentifier=self.cluster_identifier,
+ WaiterConfig={'Delay': self.poll_interval, 'MaxAttempts': 30},
+ )
diff --git a/airflow/providers/amazon/aws/operators/redshift_sql.py
b/airflow/providers/amazon/aws/operators/redshift_sql.py
index 446cff70a7..d5002e3748 100644
--- a/airflow/providers/amazon/aws/operators/redshift_sql.py
+++ b/airflow/providers/amazon/aws/operators/redshift_sql.py
@@ -44,7 +44,10 @@ class RedshiftSQLOperator(BaseOperator):
(default value: False)
"""
- template_fields: Sequence[str] = ('sql',)
+ template_fields: Sequence[str] = (
+ 'sql',
+ 'redshift_conn_id',
+ )
template_ext: Sequence[str] = ('.sql',)
# TODO: Remove renderer check when the provider has an Airflow 2.3+
requirement.
template_fields_renderers = {
diff --git
a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
index 6dfe6989d4..9902cdb4c9 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
@@ -40,7 +40,7 @@ Create an Amazon Redshift cluster
To create an Amazon Redshift Cluster with the specified parameters you can use
:class:`~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftCreateClusterOperator`.
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_cluster.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_cluster]
@@ -54,7 +54,7 @@ Resume an Amazon Redshift cluster
To resume a 'paused' Amazon Redshift cluster you can use
:class:`RedshiftResumeClusterOperator
<airflow.providers.amazon.aws.operators.redshift_cluster>`
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_cluster.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_resume_cluster]
@@ -68,7 +68,7 @@ Pause an Amazon Redshift cluster
To pause an 'available' Amazon Redshift cluster you can use
:class:`RedshiftPauseClusterOperator
<airflow.providers.amazon.aws.operators.redshift_cluster>`
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_cluster.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_pause_cluster]
@@ -82,7 +82,7 @@ Create an Amazon Redshift cluster snapshot
To create Amazon Redshift cluster snapshot you can use
:class:`RedshiftCreateClusterSnapshotOperator
<airflow.providers.amazon.aws.operators.redshift_cluster>`
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_cluster.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_create_cluster_snapshot]
@@ -96,7 +96,7 @@ Delete an Amazon Redshift cluster snapshot
To delete Amazon Redshift cluster snapshot you can use
:class:`RedshiftDeleteClusterSnapshotOperator
<airflow.providers.amazon.aws.operators.redshift_cluster>`
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_cluster.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_delete_cluster_snapshot]
@@ -110,7 +110,7 @@ Delete an Amazon Redshift cluster
To delete an Amazon Redshift cluster you can use
:class:`RedshiftDeleteClusterOperator
<airflow.providers.amazon.aws.operators.redshift_cluster>`
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_cluster.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_delete_cluster]
@@ -127,7 +127,7 @@ Wait on an Amazon Redshift cluster state
To check the state of an Amazon Redshift Cluster until it reaches the target
state or another terminal
state you can use
:class:`~airflow.providers.amazon.aws.sensors.redshift_cluster.RedshiftClusterSensor`.
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_cluster.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_redshift_cluster]
diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_data.rst
b/docs/apache-airflow-providers-amazon/operators/redshift_data.rst
index dea4472f00..9ec14dbaee 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift_data.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_data.rst
@@ -43,7 +43,7 @@ statements against an Amazon Redshift cluster.
This differs from ``RedshiftSQLOperator`` in that it allows users to query and
retrieve data via the AWS API and avoid
the necessity of a Postgres connection.
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_data]
diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
index fc9809ac46..17d43f4097 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
@@ -42,7 +42,7 @@ Execute a SQL query
To execute a SQL query against an Amazon Redshift cluster without using a
Postgres connection,
please check ``RedshiftDataOperator``.
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_sql.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_sql]
@@ -51,7 +51,7 @@ please check ``RedshiftDataOperator``.
``RedshiftSQLOperator`` supports the ``parameters`` attribute which allows us
to dynamically pass
parameters into SQL statements.
-.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift_sql.py
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_redshift.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_sql_with_params]
diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py
b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
index 61223341e2..ee8d646fac 100644
--- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
@@ -149,7 +149,6 @@ class TestRedshiftCreateClusterSnapshotOperator:
create_snapshot.execute(None)
mock_get_conn.return_value.get_waiter.return_value.wait.assert_called_once_with(
ClusterIdentifier="test_cluster",
- SnapshotIdentifier="test_snapshot",
WaiterConfig={"Delay": 15, "MaxAttempts": 20},
)
@@ -223,7 +222,8 @@ class TestResumeClusterOperator:
redshift_operator = RedshiftResumeClusterOperator(
task_id="task_test", cluster_identifier="test_cluster",
aws_conn_id="aws_conn_test"
)
- redshift_operator.execute(None)
+ with pytest.raises(Exception):
+ redshift_operator.execute(None)
mock_get_conn.return_value.resume_cluster.assert_not_called()
@@ -253,7 +253,8 @@ class TestPauseClusterOperator:
redshift_operator = RedshiftPauseClusterOperator(
task_id="task_test", cluster_identifier="test_cluster",
aws_conn_id="aws_conn_test"
)
- redshift_operator.execute(None)
+ with pytest.raises(Exception):
+ redshift_operator.execute(None)
mock_get_conn.return_value.pause_cluster.assert_not_called()
diff --git a/tests/system/providers/amazon/aws/example_redshift.py
b/tests/system/providers/amazon/aws/example_redshift.py
new file mode 100644
index 0000000000..a67427cf18
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_redshift.py
@@ -0,0 +1,309 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG, settings
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.models.baseoperator import chain
+from airflow.operators.python import get_current_context
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
+from airflow.providers.amazon.aws.operators.redshift_cluster import (
+ RedshiftCreateClusterOperator,
+ RedshiftCreateClusterSnapshotOperator,
+ RedshiftDeleteClusterOperator,
+ RedshiftDeleteClusterSnapshotOperator,
+ RedshiftPauseClusterOperator,
+ RedshiftResumeClusterOperator,
+)
+from airflow.providers.amazon.aws.operators.redshift_data import
RedshiftDataOperator
+from airflow.providers.amazon.aws.operators.redshift_sql import
RedshiftSQLOperator
+from airflow.providers.amazon.aws.sensors.redshift_cluster import
RedshiftClusterSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY,
SystemTestContextBuilder
+
+DAG_ID = 'example_redshift'
+DB_LOGIN = 'adminuser'
+DB_PASS = 'MyAmazonPassword1'
+DB_NAME = 'dev'
+POLL_INTERVAL = 10
+
+IP_PERMISSION = {
+ 'FromPort': -1,
+ 'IpProtocol': 'All',
+ 'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'Test description'}],
+}
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+@task
+def create_connection(conn_id_name: str, cluster_id: str):
+ redshift_hook = RedshiftHook()
+ cluster_endpoint =
redshift_hook.get_conn().describe_clusters(ClusterIdentifier=cluster_id)['Clusters'][0]
+ conn = Connection(
+ conn_id=conn_id_name,
+ conn_type='redshift',
+ host=cluster_endpoint['Endpoint']['Address'],
+ login=DB_LOGIN,
+ password=DB_PASS,
+ port=cluster_endpoint['Endpoint']['Port'],
+ schema=cluster_endpoint['DBName'],
+ )
+ session = settings.Session()
+ session.add(conn)
+ session.commit()
+
+
+@task
+def setup_security_group(sec_group_name: str, ip_permissions: list[dict]):
+ client = boto3.client('ec2')
+ vpc_id = client.describe_vpcs()['Vpcs'][0]['VpcId']
+ security_group = client.create_security_group(
+ Description='Redshift-system-test', GroupName=sec_group_name,
VpcId=vpc_id
+ )
+ client.get_waiter('security_group_exists').wait(
+ GroupIds=[security_group['GroupId']],
+ GroupNames=[sec_group_name],
+ WaiterConfig={'Delay': 15, 'MaxAttempts': 4},
+ )
+ client.authorize_security_group_ingress(
+ GroupId=security_group['GroupId'], GroupName=sec_group_name,
IpPermissions=ip_permissions
+ )
+ ti = get_current_context()['ti']
+ ti.xcom_push(key='security_group_id', value=security_group['GroupId'])
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_security_group(sec_group_id: str, sec_group_name: str):
+ boto3.client('ec2').delete_security_group(GroupId=sec_group_id,
GroupName=sec_group_name)
+
+
+@task
+def await_cluster_snapshot(cluster_identifier):
+ waiter = boto3.client('redshift').get_waiter('snapshot_available')
+ waiter.wait(
+ ClusterIdentifier=cluster_identifier,
+ WaiterConfig={
+ 'MaxAttempts': 100,
+ },
+ )
+
+
+with DAG(
+ dag_id=DAG_ID,
+ start_date=datetime(2021, 1, 1),
+ schedule='@once',
+ catchup=False,
+ tags=['example'],
+) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context[ENV_ID_KEY]
+ redshift_cluster_identifier = f'{env_id}-redshift-cluster'
+ redshift_cluster_snapshot_identifier = f'{env_id}-snapshot'
+ conn_id_name = f'{env_id}-conn-id'
+ sg_name = f'{env_id}-sg'
+
+ set_up_sg = setup_security_group(sec_group_name=sg_name,
ip_permissions=[IP_PERMISSION])
+
+ # [START howto_operator_redshift_cluster]
+ create_cluster = RedshiftCreateClusterOperator(
+ task_id="create_cluster",
+ cluster_identifier=redshift_cluster_identifier,
+ vpc_security_group_ids=[set_up_sg['security_group_id']],
+ publicly_accessible=True,
+ cluster_type="single-node",
+ node_type="dc2.large",
+ master_username=DB_LOGIN,
+ master_user_password=DB_PASS,
+ )
+ # [END howto_operator_redshift_cluster]
+
+ # [START howto_sensor_redshift_cluster]
+ wait_cluster_available = RedshiftClusterSensor(
+ task_id='wait_cluster_available',
+ cluster_identifier=redshift_cluster_identifier,
+ target_status='available',
+ poke_interval=15,
+ timeout=60 * 30,
+ )
+ # [END howto_sensor_redshift_cluster]
+
+ # [START howto_operator_redshift_create_cluster_snapshot]
+ create_cluster_snapshot = RedshiftCreateClusterSnapshotOperator(
+ task_id='create_cluster_snapshot',
+ cluster_identifier=redshift_cluster_identifier,
+ snapshot_identifier=redshift_cluster_snapshot_identifier,
+ poll_interval=30,
+ max_attempt=100,
+ retention_period=1,
+ wait_for_completion=True,
+ )
+ # [END howto_operator_redshift_create_cluster_snapshot]
+
+ # [START howto_operator_redshift_pause_cluster]
+ pause_cluster = RedshiftPauseClusterOperator(
+ task_id='pause_cluster',
+ cluster_identifier=redshift_cluster_identifier,
+ )
+ # [END howto_operator_redshift_pause_cluster]
+
+ wait_cluster_paused = RedshiftClusterSensor(
+ task_id='wait_cluster_paused',
+ cluster_identifier=redshift_cluster_identifier,
+ target_status='paused',
+ poke_interval=15,
+ timeout=60 * 30,
+ )
+
+ # [START howto_operator_redshift_resume_cluster]
+ resume_cluster = RedshiftResumeClusterOperator(
+ task_id='resume_cluster',
+ cluster_identifier=redshift_cluster_identifier,
+ )
+ # [END howto_operator_redshift_resume_cluster]
+
+ wait_cluster_available_after_resume = RedshiftClusterSensor(
+ task_id='wait_cluster_available_after_resume',
+ cluster_identifier=redshift_cluster_identifier,
+ target_status='available',
+ poke_interval=15,
+ timeout=60 * 30,
+ )
+
+ set_up_connection = create_connection(conn_id_name,
cluster_id=redshift_cluster_identifier)
+
+ # [START howto_operator_redshift_data]
+ create_table_redshift_data = RedshiftDataOperator(
+ task_id='create_table_redshift_data',
+ cluster_identifier=redshift_cluster_identifier,
+ database=DB_NAME,
+ db_user=DB_LOGIN,
+ sql="""
+ CREATE TABLE IF NOT EXISTS fruit (
+ fruit_id INTEGER,
+ name VARCHAR NOT NULL,
+ color VARCHAR NOT NULL
+ );
+ """,
+ poll_interval=POLL_INTERVAL,
+ await_result=True,
+ )
+ # [END howto_operator_redshift_data]
+
+ insert_data = RedshiftDataOperator(
+ task_id='insert_data',
+ cluster_identifier=redshift_cluster_identifier,
+ database=DB_NAME,
+ db_user=DB_LOGIN,
+ sql="""
+ INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');
+ INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');
+ INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');
+ INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');
+ INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');
+ INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');
+ """,
+ poll_interval=POLL_INTERVAL,
+ await_result=True,
+ )
+
+ # [START howto_operator_redshift_sql]
+ select_data = RedshiftSQLOperator(
+ task_id='select_data',
+ redshift_conn_id=conn_id_name,
+ sql="""CREATE TABLE more_fruit AS SELECT * FROM fruit;""",
+ )
+ # [END howto_operator_redshift_sql]
+
+ # [START howto_operator_redshift_sql_with_params]
+ select_filtered_data = RedshiftSQLOperator(
+ task_id='select_filtered_data',
+ redshift_conn_id=conn_id_name,
+ sql="""CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color
= '{{ params.color }}';""",
+ params={'color': 'Red'},
+ )
+ # [END howto_operator_redshift_sql_with_params]
+
+ drop_table = RedshiftSQLOperator(
+ task_id='drop_table',
+ redshift_conn_id=conn_id_name,
+ sql='DROP TABLE IF EXISTS fruit',
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ # [START howto_operator_redshift_delete_cluster]
+ delete_cluster = RedshiftDeleteClusterOperator(
+ task_id='delete_cluster',
+ cluster_identifier=redshift_cluster_identifier,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ # [END howto_operator_redshift_delete_cluster]
+
+ # [START howto_operator_redshift_delete_cluster_snapshot]
+ delete_cluster_snapshot = RedshiftDeleteClusterSnapshotOperator(
+ task_id='delete_cluster_snapshot',
+ cluster_identifier=redshift_cluster_identifier,
+ snapshot_identifier=redshift_cluster_snapshot_identifier,
+ )
+ # [END howto_operator_redshift_delete_cluster_snapshot]
+
+ delete_sg = delete_security_group(
+ sec_group_id=set_up_sg['security_group_id'],
+ sec_group_name=sg_name,
+ )
+ chain(
+ # TEST SETUP
+ test_context,
+ set_up_sg,
+ # TEST BODY
+ create_cluster,
+ wait_cluster_available,
+ create_cluster_snapshot,
+ await_cluster_snapshot(redshift_cluster_identifier),
+ pause_cluster,
+ wait_cluster_paused,
+ resume_cluster,
+ wait_cluster_available_after_resume,
+ set_up_connection,
+ create_table_redshift_data,
+ insert_data,
+ [select_data, select_filtered_data],
+ drop_table,
+ delete_cluster_snapshot,
+ delete_cluster,
+ # TEST TEARDOWN
+ delete_sg,
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/amazon/aws/example_redshift_cluster.py
b/tests/system/providers/amazon/aws/example_redshift_cluster.py
deleted file mode 100644
index 15c380fa70..0000000000
--- a/tests/system/providers/amazon/aws/example_redshift_cluster.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from datetime import datetime
-from os import getenv
-
-from airflow import DAG
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.operators.redshift_cluster import (
- RedshiftCreateClusterOperator,
- RedshiftCreateClusterSnapshotOperator,
- RedshiftDeleteClusterOperator,
- RedshiftDeleteClusterSnapshotOperator,
- RedshiftPauseClusterOperator,
- RedshiftResumeClusterOperator,
-)
-from airflow.providers.amazon.aws.sensors.redshift_cluster import
RedshiftClusterSensor
-from airflow.utils.trigger_rule import TriggerRule
-from tests.system.providers.amazon.aws.utils import set_env_id
-
-ENV_ID = set_env_id()
-DAG_ID = 'example_redshift_cluster'
-REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER",
"redshift-cluster-1")
-REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER = getenv(
- "REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER", "redshift-cluster-snapshot-1"
-)
-
-with DAG(
- dag_id=DAG_ID,
- start_date=datetime(2021, 1, 1),
- schedule=None,
- catchup=False,
- tags=['example'],
-) as dag:
- # [START howto_operator_redshift_cluster]
- task_create_cluster = RedshiftCreateClusterOperator(
- task_id="redshift_create_cluster",
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- cluster_type="single-node",
- node_type="dc2.large",
- master_username="adminuser",
- master_user_password="dummypass",
- )
- # [END howto_operator_redshift_cluster]
-
- # [START howto_sensor_redshift_cluster]
- task_wait_cluster_available = RedshiftClusterSensor(
- task_id='sensor_redshift_cluster_available',
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- target_status='available',
- poke_interval=5,
- timeout=60 * 15,
- )
- # [END howto_sensor_redshift_cluster]
-
- # [START howto_operator_redshift_pause_cluster]
- task_pause_cluster = RedshiftPauseClusterOperator(
- task_id='redshift_pause_cluster',
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- )
- # [END howto_operator_redshift_pause_cluster]
-
- task_wait_cluster_paused = RedshiftClusterSensor(
- task_id='sensor_redshift_cluster_paused',
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- target_status='paused',
- poke_interval=5,
- timeout=60 * 15,
- )
-
- # [START howto_operator_redshift_resume_cluster]
- task_resume_cluster = RedshiftResumeClusterOperator(
- task_id='redshift_resume_cluster',
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- )
- # [END howto_operator_redshift_resume_cluster]
-
- # [START howto_operator_redshift_create_cluster_snapshot]
- task_create_cluster_snapshot = RedshiftCreateClusterSnapshotOperator(
- task_id='create_cluster_snapshot',
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER,
- retention_period=1,
- wait_for_completion=True,
- )
- # [END howto_operator_redshift_create_cluster_snapshot]
-
- # [START howto_operator_redshift_delete_cluster_snapshot]
- task_delete_cluster_snapshot = RedshiftDeleteClusterSnapshotOperator(
- task_id='delete_cluster_snapshot',
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER,
- )
- # [END howto_operator_redshift_delete_cluster_snapshot]
-
- # [START howto_operator_redshift_delete_cluster]
- task_delete_cluster = RedshiftDeleteClusterOperator(
- task_id="delete_cluster",
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- trigger_rule=TriggerRule.ALL_DONE,
- )
- # [END howto_operator_redshift_delete_cluster]
-
- chain(
- task_create_cluster,
- task_wait_cluster_available,
- task_pause_cluster,
- task_wait_cluster_paused,
- task_resume_cluster,
- task_create_cluster_snapshot,
- task_delete_cluster_snapshot,
- task_delete_cluster,
- )
-
- from tests.system.utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "tearDown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-
-from tests.system.utils import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git
a/tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
b/tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
deleted file mode 100644
index fdad7a1496..0000000000
--- a/tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
+++ /dev/null
@@ -1,86 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from datetime import datetime
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
-from airflow.providers.amazon.aws.operators.redshift_data import
RedshiftDataOperator
-from tests.system.providers.amazon.aws.utils import fetch_variable, set_env_id
-
-ENV_ID = set_env_id()
-DAG_ID = 'example_redshift_data_execute_sql'
-REDSHIFT_CLUSTER_IDENTIFIER = fetch_variable("REDSHIFT_CLUSTER_IDENTIFIER",
"redshift_cluster_identifier")
-REDSHIFT_DATABASE = fetch_variable("REDSHIFT_DATABASE", "redshift_database")
-REDSHIFT_DATABASE_USER = fetch_variable("REDSHIFT_DATABASE_USER", "awsuser")
-
-REDSHIFT_QUERY = """
-SELECT table_schema,
- table_name
-FROM information_schema.tables
-WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
- AND table_type = 'BASE TABLE'
-ORDER BY table_schema,
- table_name;
- """
-POLL_INTERVAL = 10
-
-
-@task(task_id="output_results")
-def output_query_results(statement_id):
- hook = RedshiftDataHook()
- resp = hook.conn.get_statement_result(
- Id=statement_id,
- )
-
- print(resp)
- return resp
-
-
-with DAG(
- dag_id=DAG_ID,
- start_date=datetime(2021, 1, 1),
- schedule=None,
- catchup=False,
- tags=['example'],
-) as dag:
- # [START howto_operator_redshift_data]
- task_query = RedshiftDataOperator(
- task_id='redshift_query',
- cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
- database=REDSHIFT_DATABASE,
- db_user=REDSHIFT_DATABASE_USER,
- sql=REDSHIFT_QUERY,
- poll_interval=POLL_INTERVAL,
- await_result=True,
- )
- # [END howto_operator_redshift_data]
-
- task_output = output_query_results(task_query.output)
-
- from tests.system.utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "tearDown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-from tests.system.utils import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git a/tests/system/providers/amazon/aws/example_redshift_sql.py
b/tests/system/providers/amazon/aws/example_redshift_sql.py
deleted file mode 100644
index aee0f42643..0000000000
--- a/tests/system/providers/amazon/aws/example_redshift_sql.py
+++ /dev/null
@@ -1,98 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from datetime import datetime
-
-from airflow import DAG
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
-from airflow.utils.trigger_rule import TriggerRule
-from tests.system.providers.amazon.aws.utils import set_env_id
-
-ENV_ID = set_env_id()
-DAG_ID = 'example_redshift_sql'
-
-with DAG(
- dag_id=DAG_ID,
- start_date=datetime(2021, 1, 1),
- schedule=None,
- catchup=False,
- tags=['example'],
-) as dag:
- setup__task_create_table = RedshiftSQLOperator(
- task_id='setup__create_table',
- sql="""
- CREATE TABLE IF NOT EXISTS fruit (
- fruit_id INTEGER,
- name VARCHAR NOT NULL,
- color VARCHAR NOT NULL
- );
- """,
- )
-
- setup__task_insert_data = RedshiftSQLOperator(
- task_id='setup__task_insert_data',
- sql=[
- "INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');",
- "INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');",
- "INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');",
- "INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');",
- "INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');",
- "INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');",
- ],
- )
-
- # [START howto_operator_redshift_sql]
- task_select_data = RedshiftSQLOperator(
-        task_id='task_get_all_table_data', sql="""CREATE TABLE more_fruit AS SELECT * FROM fruit;"""
- )
- # [END howto_operator_redshift_sql]
-
- # [START howto_operator_redshift_sql_with_params]
- task_select_filtered_data = RedshiftSQLOperator(
- task_id='task_get_filtered_table_data',
-        sql="""CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color = '{{ params.color }}';""",
- params={'color': 'Red'},
- )
- # [END howto_operator_redshift_sql_with_params]
-
- teardown__task_drop_table = RedshiftSQLOperator(
- task_id='teardown__drop_table',
- sql='DROP TABLE IF EXISTS fruit',
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
- chain(
- setup__task_create_table,
- setup__task_insert_data,
- [task_select_data, task_select_filtered_data],
- teardown__task_drop_table,
- )
-
- from tests.system.utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "tearDown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-
-from tests.system.utils import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)