This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bfcaf19 Split redshift sql and cluster objects (#20276)
bfcaf19 is described below
commit bfcaf195a56242df1d78439323a013605cb100ee
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Dec 16 15:02:29 2021 -0800
Split redshift sql and cluster objects (#20276)
The first redshift hook was for managing the cluster itself. Later a hook
for _using_ the cluster (e.g. running sql statements) was added to the same
module. Better to separate these into distinct modules redshift_sql and
redshift_cluster. Here we split the redshift modules for operators, hooks, and
sensors.
---
airflow/contrib/hooks/redshift_hook.py | 8 +-
.../contrib/sensors/aws_redshift_cluster_sensor.py | 8 +-
.../amazon/aws/example_dags/example_redshift.py | 2 +-
.../aws/example_dags/example_s3_to_redshift.py | 2 +-
airflow/providers/amazon/aws/hooks/redshift.py | 235 +--------------------
.../aws/hooks/{redshift.py => redshift_cluster.py} | 122 +----------
.../aws/hooks/{redshift.py => redshift_sql.py} | 118 +----------
airflow/providers/amazon/aws/operators/redshift.py | 154 ++------------
.../operators/{redshift.py => redshift_cluster.py} | 56 +----
.../providers/amazon/aws/operators/redshift_sql.py | 73 +++++++
airflow/providers/amazon/aws/sensors/redshift.py | 48 +----
.../sensors/{redshift.py => redshift_cluster.py} | 4 +-
.../amazon/aws/transfers/redshift_to_s3.py | 2 +-
.../amazon/aws/transfers/s3_to_redshift.py | 2 +-
airflow/providers/amazon/provider.yaml | 12 +-
dev/provider_packages/prepare_provider_packages.py | 5 +
.../operators/redshift_cluster.rst | 44 ++++
.../operators/{redshift.rst => redshift_sql.rst} | 28 +--
.../{test_redshift.py => test_redshift_cluster.py} | 72 +------
.../amazon/aws/hooks/test_redshift_sql.py | 89 ++++++++
.../{test_redshift.py => test_redshift_cluster.py} | 27 +--
.../amazon/aws/operators/test_redshift_sql.py | 43 ++++
.../{test_redshift.py => test_redshift_cluster.py} | 4 +-
.../amazon/aws/transfers/test_redshift_to_s3.py | 8 +-
24 files changed, 330 insertions(+), 836 deletions(-)
diff --git a/airflow/contrib/hooks/redshift_hook.py
b/airflow/contrib/hooks/redshift_hook.py
index ebaac1b..f33515f 100644
--- a/airflow/contrib/hooks/redshift_hook.py
+++ b/airflow/contrib/hooks/redshift_hook.py
@@ -16,14 +16,16 @@
# specific language governing permissions and limitations
# under the License.
-"""This module is deprecated. Please use
:mod:`airflow.providers.amazon.aws.hooks.redshift`."""
+"""This module is deprecated. Please use
:mod:`airflow.providers.amazon.aws.hooks.redshift_cluster`."""
import warnings
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook # noqa
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
warnings.warn(
- "This module is deprecated. Please use
`airflow.providers.amazon.aws.hooks.redshift`.",
+ "This module is deprecated. Please use
`airflow.providers.amazon.aws.hooks.redshift_cluster`.",
DeprecationWarning,
stacklevel=2,
)
+
+__all__ = ["RedshiftHook"]
diff --git a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
index 5c1341d..c6f3c92 100644
--- a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
+++ b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
@@ -16,14 +16,16 @@
# specific language governing permissions and limitations
# under the License.
-"""This module is deprecated. Please use
:mod:`airflow.providers.amazon.aws.sensors.redshift`."""
+"""This module is deprecated. Please use
:mod:`airflow.providers.amazon.aws.sensors.redshift_cluster`."""
import warnings
-from airflow.providers.amazon.aws.sensors.redshift import
AwsRedshiftClusterSensor # noqa
+from airflow.providers.amazon.aws.sensors.redshift_cluster import
AwsRedshiftClusterSensor
warnings.warn(
- "This module is deprecated. Please use
`airflow.providers.amazon.aws.sensors.redshift`.",
+ "This module is deprecated. Please use
`airflow.providers.amazon.aws.sensors.redshift_cluster`.",
DeprecationWarning,
stacklevel=2,
)
+
+__all__ = ["AwsRedshiftClusterSensor"]
diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift.py
b/airflow/providers/amazon/aws/example_dags/example_redshift.py
index dbb7d8f..51f9f14 100644
--- a/airflow/providers/amazon/aws/example_dags/example_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_redshift.py
@@ -23,7 +23,7 @@ from datetime import datetime
# [START redshift_operator_howto_guide]
from airflow import DAG
-from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
+from airflow.providers.amazon.aws.operators.redshift_sql import
RedshiftSQLOperator
with DAG(
dag_id="redshift",
diff --git
a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
index 8114e90..65ea682 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -26,7 +26,7 @@ from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
+from airflow.providers.amazon.aws.operators.redshift_sql import
RedshiftSQLOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import
S3ToRedshiftOperator
# [START howto_operator_s3_to_redshift_env_variables]
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py
b/airflow/providers/amazon/aws/hooks/redshift.py
index e734285..4f52b5b 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift.py
@@ -16,231 +16,16 @@
# specific language governing permissions and limitations
# under the License.
"""Interact with AWS Redshift clusters."""
-import sys
-from typing import Dict, List, Optional, Union
+import warnings
-if sys.version_info >= (3, 8):
- from functools import cached_property
-else:
- from cached_property import cached_property
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
-import redshift_connector
-from redshift_connector import Connection as RedshiftConnection
-from sqlalchemy import create_engine
-from sqlalchemy.engine.url import URL
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.providers.amazon.aws.hooks.redshift_sql` "
+ "or `airflow.providers.amazon.aws.hooks.redshift_cluster` as
appropriate.",
+ DeprecationWarning,
+ stacklevel=2,
+)
-from airflow.hooks.dbapi import DbApiHook
-from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-
-
-class RedshiftHook(AwsBaseHook):
- """
- Interact with AWS Redshift, using the boto3 library
-
- Additional arguments (such as ``aws_conn_id``) may be specified and
- are passed down to the underlying AwsBaseHook.
-
- .. seealso::
- :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
-
- :param aws_conn_id: The Airflow connection used for AWS credentials.
- :type aws_conn_id: str
- """
-
- def __init__(self, *args, **kwargs) -> None:
- kwargs["client_type"] = "redshift"
- super().__init__(*args, **kwargs)
-
- # TODO: Wrap create_cluster_snapshot
- def cluster_status(self, cluster_identifier: str) -> str:
- """
- Return status of a cluster
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- :param skip_final_cluster_snapshot: determines cluster snapshot
creation
- :type skip_final_cluster_snapshot: bool
- :param final_cluster_snapshot_identifier: Optional[str]
- :type final_cluster_snapshot_identifier: Optional[str]
- """
- try:
- response =
self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
- return response[0]['ClusterStatus'] if response else None
- except self.get_conn().exceptions.ClusterNotFoundFault:
- return 'cluster_not_found'
-
- def delete_cluster(
- self,
- cluster_identifier: str,
- skip_final_cluster_snapshot: bool = True,
- final_cluster_snapshot_identifier: Optional[str] = None,
- ):
- """
- Delete a cluster and optionally create a snapshot
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- :param skip_final_cluster_snapshot: determines cluster snapshot
creation
- :type skip_final_cluster_snapshot: bool
- :param final_cluster_snapshot_identifier: name of final cluster
snapshot
- :type final_cluster_snapshot_identifier: str
- """
- final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
or ''
-
- response = self.get_conn().delete_cluster(
- ClusterIdentifier=cluster_identifier,
- SkipFinalClusterSnapshot=skip_final_cluster_snapshot,
- FinalClusterSnapshotIdentifier=final_cluster_snapshot_identifier,
- )
- return response['Cluster'] if response['Cluster'] else None
-
- def describe_cluster_snapshots(self, cluster_identifier: str) ->
Optional[List[str]]:
- """
- Gets a list of snapshots for a cluster
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- """
- response =
self.get_conn().describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
- if 'Snapshots' not in response:
- return None
- snapshots = response['Snapshots']
- snapshots = [snapshot for snapshot in snapshots if snapshot["Status"]]
- snapshots.sort(key=lambda x: x['SnapshotCreateTime'], reverse=True)
- return snapshots
-
- def restore_from_cluster_snapshot(self, cluster_identifier: str,
snapshot_identifier: str) -> str:
- """
- Restores a cluster from its snapshot
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- :param snapshot_identifier: unique identifier for a snapshot of a
cluster
- :type snapshot_identifier: str
- """
- response = self.get_conn().restore_from_cluster_snapshot(
- ClusterIdentifier=cluster_identifier,
SnapshotIdentifier=snapshot_identifier
- )
- return response['Cluster'] if response['Cluster'] else None
-
- def create_cluster_snapshot(self, snapshot_identifier: str,
cluster_identifier: str) -> str:
- """
- Creates a snapshot of a cluster
-
- :param snapshot_identifier: unique identifier for a snapshot of a
cluster
- :type snapshot_identifier: str
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- """
- response = self.get_conn().create_cluster_snapshot(
- SnapshotIdentifier=snapshot_identifier,
- ClusterIdentifier=cluster_identifier,
- )
- return response['Snapshot'] if response['Snapshot'] else None
-
-
-class RedshiftSQLHook(DbApiHook):
- """
- Execute statements against Amazon Redshift, using redshift_connector
-
- This hook requires the redshift_conn_id connection.
-
- :param redshift_conn_id: reference to
- :ref:`Amazon Redshift connection id<howto/connection:redshift>`
- :type redshift_conn_id: str
-
- .. note::
- get_sqlalchemy_engine() and get_uri() depend on
sqlalchemy-amazon-redshift
- """
-
- conn_name_attr = 'redshift_conn_id'
- default_conn_name = 'redshift_default'
- conn_type = 'redshift'
- hook_name = 'Amazon Redshift'
- supports_autocommit = True
-
- @staticmethod
- def get_ui_field_behavior() -> Dict:
- """Returns custom field behavior"""
- return {
- "hidden_fields": [],
- "relabeling": {'login': 'User', 'schema': 'Database'},
- }
-
- @cached_property
- def conn(self):
- return self.get_connection(self.redshift_conn_id) # type:
ignore[attr-defined]
-
- def _get_conn_params(self) -> Dict[str, Union[str, int]]:
- """Helper method to retrieve connection args"""
- conn = self.conn
-
- conn_params: Dict[str, Union[str, int]] = {}
-
- if conn.login:
- conn_params['user'] = conn.login
- if conn.password:
- conn_params['password'] = conn.password
- if conn.host:
- conn_params['host'] = conn.host
- if conn.port:
- conn_params['port'] = conn.port
- if conn.schema:
- conn_params['database'] = conn.schema
-
- return conn_params
-
- def get_uri(self) -> str:
- """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy
dialect as driver name"""
- conn_params = self._get_conn_params()
-
- if 'user' in conn_params:
- conn_params['username'] = conn_params.pop('user')
-
- return str(URL(drivername='redshift+redshift_connector',
**conn_params))
-
- def get_sqlalchemy_engine(self, engine_kwargs=None):
- """Overrides DbApiHook get_sqlalchemy_engine to pass
redshift_connector specific kwargs"""
- conn_kwargs = self.conn.extra_dejson
- if engine_kwargs is None:
- engine_kwargs = {}
-
- if "connect_args" in engine_kwargs:
- engine_kwargs["connect_args"] = {**conn_kwargs,
**engine_kwargs["connect_args"]}
- else:
- engine_kwargs["connect_args"] = conn_kwargs
-
- return create_engine(self.get_uri(), **engine_kwargs)
-
- def get_table_primary_key(self, table: str, schema: Optional[str] =
"public") -> Optional[List[str]]:
- """
- Helper method that returns the table primary key
- :param table: Name of the target table
- :type table: str
- :param table: Name of the target schema, public by default
- :type table: str
- :return: Primary key columns list
- :rtype: List[str]
- """
- sql = """
- select kcu.column_name
- from information_schema.table_constraints tco
- join information_schema.key_column_usage kcu
- on kcu.constraint_name = tco.constraint_name
- and kcu.constraint_schema = tco.constraint_schema
- and kcu.constraint_name = tco.constraint_name
- where tco.constraint_type = 'PRIMARY KEY'
- and kcu.table_schema = %s
- and kcu.table_name = %s
- """
- pk_columns = [row[0] for row in self.get_records(sql, (schema, table))]
- return pk_columns or None
-
- def get_conn(self) -> RedshiftConnection:
- """Returns a redshift_connector.Connection object"""
- conn_params = self._get_conn_params()
- conn_kwargs_dejson = self.conn.extra_dejson
- conn_kwargs: Dict = {**conn_params, **conn_kwargs_dejson}
- conn: RedshiftConnection = redshift_connector.connect(**conn_kwargs)
-
- return conn
+__all__ = ["RedshiftHook", "RedshiftSQLHook"]
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py
b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
similarity index 54%
copy from airflow/providers/amazon/aws/hooks/redshift.py
copy to airflow/providers/amazon/aws/hooks/redshift_cluster.py
index e734285..aa45d77 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,21 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Interact with AWS Redshift clusters."""
-import sys
-from typing import Dict, List, Optional, Union
-
-if sys.version_info >= (3, 8):
- from functools import cached_property
-else:
- from cached_property import cached_property
-import redshift_connector
-from redshift_connector import Connection as RedshiftConnection
-from sqlalchemy import create_engine
-from sqlalchemy.engine.url import URL
+from typing import List, Optional
-from airflow.hooks.dbapi import DbApiHook
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -137,110 +124,3 @@ class RedshiftHook(AwsBaseHook):
ClusterIdentifier=cluster_identifier,
)
return response['Snapshot'] if response['Snapshot'] else None
-
-
-class RedshiftSQLHook(DbApiHook):
- """
- Execute statements against Amazon Redshift, using redshift_connector
-
- This hook requires the redshift_conn_id connection.
-
- :param redshift_conn_id: reference to
- :ref:`Amazon Redshift connection id<howto/connection:redshift>`
- :type redshift_conn_id: str
-
- .. note::
- get_sqlalchemy_engine() and get_uri() depend on
sqlalchemy-amazon-redshift
- """
-
- conn_name_attr = 'redshift_conn_id'
- default_conn_name = 'redshift_default'
- conn_type = 'redshift'
- hook_name = 'Amazon Redshift'
- supports_autocommit = True
-
- @staticmethod
- def get_ui_field_behavior() -> Dict:
- """Returns custom field behavior"""
- return {
- "hidden_fields": [],
- "relabeling": {'login': 'User', 'schema': 'Database'},
- }
-
- @cached_property
- def conn(self):
- return self.get_connection(self.redshift_conn_id) # type:
ignore[attr-defined]
-
- def _get_conn_params(self) -> Dict[str, Union[str, int]]:
- """Helper method to retrieve connection args"""
- conn = self.conn
-
- conn_params: Dict[str, Union[str, int]] = {}
-
- if conn.login:
- conn_params['user'] = conn.login
- if conn.password:
- conn_params['password'] = conn.password
- if conn.host:
- conn_params['host'] = conn.host
- if conn.port:
- conn_params['port'] = conn.port
- if conn.schema:
- conn_params['database'] = conn.schema
-
- return conn_params
-
- def get_uri(self) -> str:
- """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy
dialect as driver name"""
- conn_params = self._get_conn_params()
-
- if 'user' in conn_params:
- conn_params['username'] = conn_params.pop('user')
-
- return str(URL(drivername='redshift+redshift_connector',
**conn_params))
-
- def get_sqlalchemy_engine(self, engine_kwargs=None):
- """Overrides DbApiHook get_sqlalchemy_engine to pass
redshift_connector specific kwargs"""
- conn_kwargs = self.conn.extra_dejson
- if engine_kwargs is None:
- engine_kwargs = {}
-
- if "connect_args" in engine_kwargs:
- engine_kwargs["connect_args"] = {**conn_kwargs,
**engine_kwargs["connect_args"]}
- else:
- engine_kwargs["connect_args"] = conn_kwargs
-
- return create_engine(self.get_uri(), **engine_kwargs)
-
- def get_table_primary_key(self, table: str, schema: Optional[str] =
"public") -> Optional[List[str]]:
- """
- Helper method that returns the table primary key
- :param table: Name of the target table
- :type table: str
- :param table: Name of the target schema, public by default
- :type table: str
- :return: Primary key columns list
- :rtype: List[str]
- """
- sql = """
- select kcu.column_name
- from information_schema.table_constraints tco
- join information_schema.key_column_usage kcu
- on kcu.constraint_name = tco.constraint_name
- and kcu.constraint_schema = tco.constraint_schema
- and kcu.constraint_name = tco.constraint_name
- where tco.constraint_type = 'PRIMARY KEY'
- and kcu.table_schema = %s
- and kcu.table_name = %s
- """
- pk_columns = [row[0] for row in self.get_records(sql, (schema, table))]
- return pk_columns or None
-
- def get_conn(self) -> RedshiftConnection:
- """Returns a redshift_connector.Connection object"""
- conn_params = self._get_conn_params()
- conn_kwargs_dejson = self.conn.extra_dejson
- conn_kwargs: Dict = {**conn_params, **conn_kwargs_dejson}
- conn: RedshiftConnection = redshift_connector.connect(**conn_kwargs)
-
- return conn
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py
b/airflow/providers/amazon/aws/hooks/redshift_sql.py
similarity index 53%
copy from airflow/providers/amazon/aws/hooks/redshift.py
copy to airflow/providers/amazon/aws/hooks/redshift_sql.py
index e734285..739b7b5 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_sql.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,128 +14,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Interact with AWS Redshift clusters."""
+
import sys
from typing import Dict, List, Optional, Union
-if sys.version_info >= (3, 8):
- from functools import cached_property
-else:
- from cached_property import cached_property
-
import redshift_connector
from redshift_connector import Connection as RedshiftConnection
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from airflow.hooks.dbapi import DbApiHook
-from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-
-
-class RedshiftHook(AwsBaseHook):
- """
- Interact with AWS Redshift, using the boto3 library
-
- Additional arguments (such as ``aws_conn_id``) may be specified and
- are passed down to the underlying AwsBaseHook.
-
- .. seealso::
- :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
-
- :param aws_conn_id: The Airflow connection used for AWS credentials.
- :type aws_conn_id: str
- """
- def __init__(self, *args, **kwargs) -> None:
- kwargs["client_type"] = "redshift"
- super().__init__(*args, **kwargs)
-
- # TODO: Wrap create_cluster_snapshot
- def cluster_status(self, cluster_identifier: str) -> str:
- """
- Return status of a cluster
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- :param skip_final_cluster_snapshot: determines cluster snapshot
creation
- :type skip_final_cluster_snapshot: bool
- :param final_cluster_snapshot_identifier: Optional[str]
- :type final_cluster_snapshot_identifier: Optional[str]
- """
- try:
- response =
self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
- return response[0]['ClusterStatus'] if response else None
- except self.get_conn().exceptions.ClusterNotFoundFault:
- return 'cluster_not_found'
-
- def delete_cluster(
- self,
- cluster_identifier: str,
- skip_final_cluster_snapshot: bool = True,
- final_cluster_snapshot_identifier: Optional[str] = None,
- ):
- """
- Delete a cluster and optionally create a snapshot
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- :param skip_final_cluster_snapshot: determines cluster snapshot
creation
- :type skip_final_cluster_snapshot: bool
- :param final_cluster_snapshot_identifier: name of final cluster
snapshot
- :type final_cluster_snapshot_identifier: str
- """
- final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
or ''
-
- response = self.get_conn().delete_cluster(
- ClusterIdentifier=cluster_identifier,
- SkipFinalClusterSnapshot=skip_final_cluster_snapshot,
- FinalClusterSnapshotIdentifier=final_cluster_snapshot_identifier,
- )
- return response['Cluster'] if response['Cluster'] else None
-
- def describe_cluster_snapshots(self, cluster_identifier: str) ->
Optional[List[str]]:
- """
- Gets a list of snapshots for a cluster
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- """
- response =
self.get_conn().describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
- if 'Snapshots' not in response:
- return None
- snapshots = response['Snapshots']
- snapshots = [snapshot for snapshot in snapshots if snapshot["Status"]]
- snapshots.sort(key=lambda x: x['SnapshotCreateTime'], reverse=True)
- return snapshots
-
- def restore_from_cluster_snapshot(self, cluster_identifier: str,
snapshot_identifier: str) -> str:
- """
- Restores a cluster from its snapshot
-
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- :param snapshot_identifier: unique identifier for a snapshot of a
cluster
- :type snapshot_identifier: str
- """
- response = self.get_conn().restore_from_cluster_snapshot(
- ClusterIdentifier=cluster_identifier,
SnapshotIdentifier=snapshot_identifier
- )
- return response['Cluster'] if response['Cluster'] else None
-
- def create_cluster_snapshot(self, snapshot_identifier: str,
cluster_identifier: str) -> str:
- """
- Creates a snapshot of a cluster
-
- :param snapshot_identifier: unique identifier for a snapshot of a
cluster
- :type snapshot_identifier: str
- :param cluster_identifier: unique identifier of a cluster
- :type cluster_identifier: str
- """
- response = self.get_conn().create_cluster_snapshot(
- SnapshotIdentifier=snapshot_identifier,
- ClusterIdentifier=cluster_identifier,
- )
- return response['Snapshot'] if response['Snapshot'] else None
+if sys.version_info >= (3, 8):
+ from functools import cached_property
+else:
+ from cached_property import cached_property
class RedshiftSQLHook(DbApiHook):
diff --git a/airflow/providers/amazon/aws/operators/redshift.py
b/airflow/providers/amazon/aws/operators/redshift.py
index 52d82b4..71bd3c4 100644
--- a/airflow/providers/amazon/aws/operators/redshift.py
+++ b/airflow/providers/amazon/aws/operators/redshift.py
@@ -15,141 +15,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import Dict, Iterable, Optional, Union
-
-from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook,
RedshiftSQLHook
-
-
-class RedshiftSQLOperator(BaseOperator):
- """
- Executes SQL Statements against an Amazon Redshift cluster
-
- .. seealso::
- For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:RedshiftSQLOperator`
-
- :param sql: the sql code to be executed
- :type sql: Can receive a str representing a sql statement,
- or an iterable of str (sql statements)
- :param redshift_conn_id: reference to
- :ref:`Amazon Redshift connection id<howto/connection:redshift>`
- :type redshift_conn_id: str
- :param parameters: (optional) the parameters to render the SQL query with.
- :type parameters: dict or iterable
- :param autocommit: if True, each command is automatically committed.
- (default value: False)
- :type autocommit: bool
- """
-
- template_fields = ('sql',)
- template_ext = ('.sql',)
-
- def __init__(
- self,
- *,
- sql: Optional[Union[Dict, Iterable]],
- redshift_conn_id: str = 'redshift_default',
- parameters: Optional[dict] = None,
- autocommit: bool = True,
- **kwargs,
- ) -> None:
- super().__init__(**kwargs)
- self.redshift_conn_id = redshift_conn_id
- self.sql = sql
- self.autocommit = autocommit
- self.parameters = parameters
-
- def get_hook(self) -> RedshiftSQLHook:
- """Create and return RedshiftSQLHook.
- :return RedshiftSQLHook: A RedshiftSQLHook instance.
- """
- return RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
-
- def execute(self, context: dict) -> None:
- """Execute a statement against Amazon Redshift"""
- self.log.info(f"Executing statement: {self.sql}")
- hook = self.get_hook()
- hook.run(self.sql, autocommit=self.autocommit,
parameters=self.parameters)
-
-
-class RedshiftResumeClusterOperator(BaseOperator):
- """
- Resume a paused AWS Redshift Cluster
-
- .. seealso::
- For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:RedshiftResumeClusterOperator`
-
- :param cluster_identifier: id of the AWS Redshift Cluster
- :type cluster_identifier: str
- :param aws_conn_id: aws connection to use
- :type aws_conn_id: str
- """
-
- template_fields = ("cluster_identifier",)
- ui_color = "#eeaa11"
- ui_fgcolor = "#ffffff"
-
- def __init__(
- self,
- *,
- cluster_identifier: str,
- aws_conn_id: str = "aws_default",
- **kwargs,
- ):
- super().__init__(**kwargs)
- self.cluster_identifier = cluster_identifier
- self.aws_conn_id = aws_conn_id
-
- def execute(self, context):
- redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
- cluster_state =
redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
- if cluster_state == 'paused':
- self.log.info("Starting Redshift cluster %s",
self.cluster_identifier)
-
redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
- else:
- self.log.warning(
- "Unable to resume cluster since cluster is currently in
status: %s", cluster_state
- )
-
-
-class RedshiftPauseClusterOperator(BaseOperator):
- """
- Pause an AWS Redshift Cluster if it has status `available`.
-
- .. seealso::
- For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:RedshiftPauseClusterOperator`
-
- :param cluster_identifier: id of the AWS Redshift Cluster
- :type cluster_identifier: str
- :param aws_conn_id: aws connection to use
- :type aws_conn_id: str
- """
-
- template_fields = ("cluster_identifier",)
- ui_color = "#eeaa11"
- ui_fgcolor = "#ffffff"
-
- def __init__(
- self,
- *,
- cluster_identifier: str,
- aws_conn_id: str = "aws_default",
- **kwargs,
- ):
- super().__init__(**kwargs)
- self.cluster_identifier = cluster_identifier
- self.aws_conn_id = aws_conn_id
-
- def execute(self, context):
- redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
- cluster_state =
redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
- if cluster_state == 'available':
- self.log.info("Pausing Redshift cluster %s",
self.cluster_identifier)
-
redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
- else:
- self.log.warning(
- "Unable to pause cluster since cluster is currently in status:
%s", cluster_state
- )
+import warnings
+
+from airflow.providers.amazon.aws.operators.redshift_cluster import (
+ RedshiftPauseClusterOperator,
+ RedshiftResumeClusterOperator,
+)
+from airflow.providers.amazon.aws.operators.redshift_sql import
RedshiftSQLOperator
+
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.providers.amazon.aws.operators.redshift_sql` "
+ "or `airflow.providers.amazon.aws.operators.redshift_cluster` as
appropriate.",
+ DeprecationWarning,
+ stacklevel=2,
+)
+
+__all__ = ["RedshiftSQLOperator", "RedshiftPauseClusterOperator",
"RedshiftResumeClusterOperator"]
diff --git a/airflow/providers/amazon/aws/operators/redshift.py
b/airflow/providers/amazon/aws/operators/redshift_cluster.py
similarity index 64%
copy from airflow/providers/amazon/aws/operators/redshift.py
copy to airflow/providers/amazon/aws/operators/redshift_cluster.py
index 52d82b4..5ec9c01 100644
--- a/airflow/providers/amazon/aws/operators/redshift.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,62 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import Dict, Iterable, Optional, Union
from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook,
RedshiftSQLHook
-
-
-class RedshiftSQLOperator(BaseOperator):
- """
- Executes SQL Statements against an Amazon Redshift cluster
-
- .. seealso::
- For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:RedshiftSQLOperator`
-
- :param sql: the sql code to be executed
- :type sql: Can receive a str representing a sql statement,
- or an iterable of str (sql statements)
- :param redshift_conn_id: reference to
- :ref:`Amazon Redshift connection id<howto/connection:redshift>`
- :type redshift_conn_id: str
- :param parameters: (optional) the parameters to render the SQL query with.
- :type parameters: dict or iterable
- :param autocommit: if True, each command is automatically committed.
- (default value: False)
- :type autocommit: bool
- """
-
- template_fields = ('sql',)
- template_ext = ('.sql',)
-
- def __init__(
- self,
- *,
- sql: Optional[Union[Dict, Iterable]],
- redshift_conn_id: str = 'redshift_default',
- parameters: Optional[dict] = None,
- autocommit: bool = True,
- **kwargs,
- ) -> None:
- super().__init__(**kwargs)
- self.redshift_conn_id = redshift_conn_id
- self.sql = sql
- self.autocommit = autocommit
- self.parameters = parameters
-
- def get_hook(self) -> RedshiftSQLHook:
- """Create and return RedshiftSQLHook.
- :return RedshiftSQLHook: A RedshiftSQLHook instance.
- """
- return RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
-
- def execute(self, context: dict) -> None:
- """Execute a statement against Amazon Redshift"""
- self.log.info(f"Executing statement: {self.sql}")
- hook = self.get_hook()
- hook.run(self.sql, autocommit=self.autocommit,
parameters=self.parameters)
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
class RedshiftResumeClusterOperator(BaseOperator):
diff --git a/airflow/providers/amazon/aws/operators/redshift_sql.py
b/airflow/providers/amazon/aws/operators/redshift_sql.py
new file mode 100644
index 0000000..4e40baa
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/redshift_sql.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Optional, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
+
+
+class RedshiftSQLOperator(BaseOperator):
+ """
+ Executes SQL Statements against an Amazon Redshift cluster
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:RedshiftSQLOperator`
+
+ :param sql: the sql code to be executed
+ :type sql: Can receive a str representing a sql statement,
+ or an iterable of str (sql statements)
+ :param redshift_conn_id: reference to
+ :ref:`Amazon Redshift connection id<howto/connection:redshift>`
+ :type redshift_conn_id: str
+ :param parameters: (optional) the parameters to render the SQL query with.
+ :type parameters: dict or iterable
+ :param autocommit: if True, each command is automatically committed.
+ (default value: False)
+ :type autocommit: bool
+ """
+
+ template_fields = ('sql',)
+ template_ext = ('.sql',)
+
+ def __init__(
+ self,
+ *,
+ sql: Optional[Union[Dict, Iterable]],
+ redshift_conn_id: str = 'redshift_default',
+ parameters: Optional[dict] = None,
+ autocommit: bool = True,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.redshift_conn_id = redshift_conn_id
+ self.sql = sql
+ self.autocommit = autocommit
+ self.parameters = parameters
+
+ def get_hook(self) -> RedshiftSQLHook:
+ """Create and return RedshiftSQLHook.
+ :return RedshiftSQLHook: A RedshiftSQLHook instance.
+ """
+ return RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
+
+ def execute(self, context: dict) -> None:
+ """Execute a statement against Amazon Redshift"""
+ self.log.info(f"Executing statement: {self.sql}")
+ hook = self.get_hook()
+ hook.run(self.sql, autocommit=self.autocommit,
parameters=self.parameters)
diff --git a/airflow/providers/amazon/aws/sensors/redshift.py
b/airflow/providers/amazon/aws/sensors/redshift.py
index 669e6c0..0f41508 100644
--- a/airflow/providers/amazon/aws/sensors/redshift.py
+++ b/airflow/providers/amazon/aws/sensors/redshift.py
@@ -15,46 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+import warnings
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook
-from airflow.sensors.base import BaseSensorOperator
+from airflow.providers.amazon.aws.sensors.redshift_cluster import
AwsRedshiftClusterSensor
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.sensors.redshift_cluster`.",
+ DeprecationWarning,
+ stacklevel=2,
+)
-class AwsRedshiftClusterSensor(BaseSensorOperator):
- """
- Waits for a Redshift cluster to reach a specific status.
-
- :param cluster_identifier: The identifier for the cluster being pinged.
- :type cluster_identifier: str
- :param target_status: The cluster status desired.
- :type target_status: str
- """
-
- template_fields = ('cluster_identifier', 'target_status')
-
- def __init__(
- self,
- *,
- cluster_identifier: str,
- target_status: str = 'available',
- aws_conn_id: str = 'aws_default',
- **kwargs,
- ):
- super().__init__(**kwargs)
- self.cluster_identifier = cluster_identifier
- self.target_status = target_status
- self.aws_conn_id = aws_conn_id
- self.hook: Optional[RedshiftHook] = None
-
- def poke(self, context):
- self.log.info('Poking for status : %s\nfor cluster %s',
self.target_status, self.cluster_identifier)
- return self.get_hook().cluster_status(self.cluster_identifier) ==
self.target_status
-
- def get_hook(self) -> RedshiftHook:
- """Create and return a RedshiftHook"""
- if self.hook:
- return self.hook
-
- self.hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
- return self.hook
+__all__ = ["AwsRedshiftClusterSensor"]
diff --git a/airflow/providers/amazon/aws/sensors/redshift.py
b/airflow/providers/amazon/aws/sensors/redshift_cluster.py
similarity index 96%
copy from airflow/providers/amazon/aws/sensors/redshift.py
copy to airflow/providers/amazon/aws/sensors/redshift_cluster.py
index 669e6c0..91b0703 100644
--- a/airflow/providers/amazon/aws/sensors/redshift.py
+++ b/airflow/providers/amazon/aws/sensors/redshift_cluster.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,9 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from typing import Optional
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
from airflow.sensors.base import BaseSensorOperator
diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index 42ce2b8..f233e94 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -19,7 +19,7 @@
from typing import Iterable, List, Mapping, Optional, Union
from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 5f5eb38..0c3b347 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -20,7 +20,7 @@ from typing import List, Optional, Union
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index d602132..5664ba4 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -105,7 +105,8 @@ integrations:
external-doc-url: https://aws.amazon.com/redshift/
logo: /integration-logos/aws/[email protected]
how-to-guide:
- - /docs/apache-airflow-providers-amazon/operators/redshift.rst
+ - /docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
+ - /docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
tags: [aws]
- integration-name: Amazon SageMaker
external-doc-url: https://aws.amazon.com/sagemaker/
@@ -252,6 +253,8 @@ operators:
- integration-name: Amazon Redshift
python-modules:
- airflow.providers.amazon.aws.operators.redshift
+ - airflow.providers.amazon.aws.operators.redshift_sql
+ - airflow.providers.amazon.aws.operators.redshift_cluster
sensors:
- integration-name: Amazon Athena
@@ -295,6 +298,7 @@ sensors:
- integration-name: Amazon Redshift
python-modules:
- airflow.providers.amazon.aws.sensors.redshift
+ - airflow.providers.amazon.aws.sensors.redshift_cluster
- integration-name: Amazon Simple Storage Service (S3)
python-modules:
- airflow.providers.amazon.aws.sensors.s3_key
@@ -375,6 +379,8 @@ hooks:
- integration-name: Amazon Redshift
python-modules:
- airflow.providers.amazon.aws.hooks.redshift
+ - airflow.providers.amazon.aws.hooks.redshift_sql
+ - airflow.providers.amazon.aws.hooks.redshift_cluster
- integration-name: Amazon Simple Storage Service (S3)
python-modules:
- airflow.providers.amazon.aws.hooks.s3
@@ -461,7 +467,7 @@ hook-class-names: # deprecated - to be removed after
providers add dependency o
- airflow.providers.amazon.aws.hooks.s3.S3Hook
- airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
- airflow.providers.amazon.aws.hooks.emr.EmrHook
- - airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook
+ - airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook
extra-links:
- airflow.providers.amazon.aws.operators.emr.EmrClusterLink
@@ -474,7 +480,7 @@ connection-types:
connection-type: aws
- hook-class-name: airflow.providers.amazon.aws.hooks.emr.EmrHook
connection-type: emr
- - hook-class-name:
airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook
+ - hook-class-name:
airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook
connection-type: redshift
secrets-backends:
diff --git a/dev/provider_packages/prepare_provider_packages.py
b/dev/provider_packages/prepare_provider_packages.py
index 6c57392..2e77ea9 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -2154,6 +2154,11 @@ KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = {
"This module is deprecated. Please use
`airflow.providers.amazon.aws.sensors.dms`.",
'This module is deprecated. Please use
`airflow.providers.amazon.aws.operators.emr`.',
'This module is deprecated. Please use
`airflow.providers.amazon.aws.sensors.emr`.',
+ 'This module is deprecated. Please use `airflow.hooks.redshift_sql` '
+ 'or `airflow.hooks.redshift_cluster` as appropriate.',
+ 'This module is deprecated. Please use `airflow.operators.redshift_sql` or
'
+ '`airflow.operators.redshift_cluster` as appropriate.',
+ 'This module is deprecated. Please use
`airflow.sensors.redshift_cluster`.',
}
diff --git
a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
new file mode 100644
index 0000000..352478b
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
@@ -0,0 +1,44 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Redshift cluster management operators
+=====================================
+
+.. contents::
+ :depth: 1
+ :local:
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""
+
+To resume a 'paused' AWS Redshift Cluster you can use
+:class:`RedshiftResumeClusterOperator
<airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftResumeClusterOperator>`
+
+This Operator leverages the AWS CLI
+`resume-cluster
<https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__
API
+
+.. _howto/operator:RedshiftPauseClusterOperator:
+
+Pause a Redshift Cluster
+""""""""""""""""""""""""
+
+To pause an 'available' AWS Redshift Cluster you can use
+:class:`RedshiftPauseClusterOperator
<airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftPauseClusterOperator>`
+This Operator leverages the AWS CLI
+`pause-cluster
<https://docs.aws.amazon.com/cli/latest/reference/redshift/pause-cluster.html>`__
API
diff --git a/docs/apache-airflow-providers-amazon/operators/redshift.rst
b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
similarity index 77%
rename from docs/apache-airflow-providers-amazon/operators/redshift.rst
rename to docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
index 5a5d1d7..c53a4b9 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
@@ -27,10 +27,10 @@ RedshiftSQLOperator
Overview
--------
-Use the :class:`RedshiftSQLOperator
<airflow.providers.amazon.aws.operators.redshift>` to execute
+Use the :class:`RedshiftSQLOperator
<airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator>` to execute
statements against an Amazon Redshift cluster.
-:class:`RedshiftSQLOperator
<airflow.providers.amazon.aws.operators.redshift.RedshiftSQLOperator>` works
together with
+:class:`RedshiftSQLOperator
<airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator>`
works together with
:class:`RedshiftSQLHook
<airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook>` to establish
connections with Amazon Redshift.
@@ -41,7 +41,7 @@ example_redshift.py
Purpose
"""""""
-This is a basic example dag for using :class:`RedshiftSQLOperator
<airflow.providers.amazon.aws.operators.redshift>`
+This is a basic example dag for using :class:`RedshiftSQLOperator
<airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator>`
to execute statements against an Amazon Redshift cluster.
Create a table
@@ -94,25 +94,3 @@ All together, here is our DAG:
:language: python
:start-after: [START redshift_operator_howto_guide]
:end-before: [END redshift_operator_howto_guide]
-
-
-.. _howto/operator:RedshiftResumeClusterOperator:
-
-Resume a Redshift Cluster
-"""""""""""""""""""""""""""""""""""""""""""
-
-To resume a 'paused' AWS Redshift Cluster you can use
-:class:`RedshiftResumeClusterOperator
<airflow.providers.amazon.aws.operators.redshift>`
-
-This Operator leverages the AWS CLI
-`resume-cluster
<https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__
API
-
-.. _howto/operator:RedshiftPauseClusterOperator:
-
-Pause a Redshift Cluster
-"""""""""""""""""""""""""""""""""""""""""""
-
-To pause an 'available' AWS Redshift Cluster you can use
-:class:`RedshiftPauseClusterOperator
<airflow.providers.amazon.aws.operators.redshift>`
-This Operator leverages the AWS CLI
-`pause-cluster
<https://docs.aws.amazon.com/cli/latest/reference/redshift/pause-cluster.html>`__
API
diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py
b/tests/providers/amazon/aws/hooks/test_redshift_cluster.py
similarity index 60%
rename from tests/providers/amazon/aws/hooks/test_redshift.py
rename to tests/providers/amazon/aws/hooks/test_redshift_cluster.py
index 35a1b5e..12d0bf4 100644
--- a/tests/providers/amazon/aws/hooks/test_redshift.py
+++ b/tests/providers/amazon/aws/hooks/test_redshift_cluster.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,18 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-import json
import unittest
-from unittest import mock
import boto3
-from parameterized import parameterized
-from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook,
RedshiftSQLHook
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
try:
from moto import mock_redshift
@@ -107,67 +101,3 @@ class TestRedshiftHook(unittest.TestCase):
hook = RedshiftHook(aws_conn_id='aws_default')
status = hook.cluster_status('test_cluster')
assert status == 'available'
-
-
-class TestRedshiftSQLHookConn(unittest.TestCase):
- def setUp(self):
- super().setUp()
-
- self.connection = Connection(
- conn_type='redshift', login='login', password='password',
host='host', port=5439, schema="dev"
- )
-
- self.db_hook = RedshiftSQLHook()
- self.db_hook.get_connection = mock.Mock()
- self.db_hook.get_connection.return_value = self.connection
-
- def test_get_uri(self):
- expected = 'redshift+redshift_connector://login:password@host:5439/dev'
- x = self.db_hook.get_uri()
- assert x == expected
-
-
@mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
- def test_get_conn(self, mock_connect):
- self.db_hook.get_conn()
- mock_connect.assert_called_once_with(
- user='login', password='password', host='host', port=5439,
database='dev'
- )
-
-
@mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
- def test_get_conn_extra(self, mock_connect):
- self.connection.extra = json.dumps(
- {
- "iam": True,
- "cluster_identifier": "my-test-cluster",
- "profile": "default",
- }
- )
- self.db_hook.get_conn()
- mock_connect.assert_called_once_with(
- user='login',
- password='password',
- host='host',
- port=5439,
- cluster_identifier="my-test-cluster",
- profile="default",
- database='dev',
- iam=True,
- )
-
- @parameterized.expand(
- [
- ({}, {}, {}),
- ({"login": "test"}, {}, {"user": "test"}),
- ({}, {"user": "test"}, {"user": "test"}),
- ({"login": "original"}, {"user": "overridden"}, {"user":
"overridden"}),
- ({"login": "test1"}, {"password": "test2"}, {"user": "test1",
"password": "test2"}),
- ],
- )
-
@mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
- def test_get_conn_overrides_correctly(self, conn_params, conn_extra,
expected_call_args, mock_connect):
- with mock.patch(
- 'airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.conn',
- Connection(conn_type='redshift', extra=conn_extra, **conn_params),
- ):
- self.db_hook.get_conn()
- mock_connect.assert_called_once_with(**expected_call_args)
diff --git a/tests/providers/amazon/aws/hooks/test_redshift_sql.py
b/tests/providers/amazon/aws/hooks/test_redshift_sql.py
new file mode 100644
index 0000000..e4754e7
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_redshift_sql.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from unittest import mock
+
+from parameterized import parameterized
+
+from airflow.models import Connection
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
+
+
+class TestRedshiftSQLHookConn(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+
+ self.connection = Connection(
+ conn_type='redshift', login='login', password='password',
host='host', port=5439, schema="dev"
+ )
+
+ self.db_hook = RedshiftSQLHook()
+ self.db_hook.get_connection = mock.Mock()
+ self.db_hook.get_connection.return_value = self.connection
+
+ def test_get_uri(self):
+ expected = 'redshift+redshift_connector://login:password@host:5439/dev'
+ x = self.db_hook.get_uri()
+ assert x == expected
+
+
@mock.patch('airflow.providers.amazon.aws.hooks.redshift_sql.redshift_connector.connect')
+ def test_get_conn(self, mock_connect):
+ self.db_hook.get_conn()
+ mock_connect.assert_called_once_with(
+ user='login', password='password', host='host', port=5439,
database='dev'
+ )
+
+
@mock.patch('airflow.providers.amazon.aws.hooks.redshift_sql.redshift_connector.connect')
+ def test_get_conn_extra(self, mock_connect):
+ self.connection.extra = json.dumps(
+ {
+ "iam": True,
+ "cluster_identifier": "my-test-cluster",
+ "profile": "default",
+ }
+ )
+ self.db_hook.get_conn()
+ mock_connect.assert_called_once_with(
+ user='login',
+ password='password',
+ host='host',
+ port=5439,
+ cluster_identifier="my-test-cluster",
+ profile="default",
+ database='dev',
+ iam=True,
+ )
+
+ @parameterized.expand(
+ [
+ ({}, {}, {}),
+ ({"login": "test"}, {}, {"user": "test"}),
+ ({}, {"user": "test"}, {"user": "test"}),
+ ({"login": "original"}, {"user": "overridden"}, {"user":
"overridden"}),
+ ({"login": "test1"}, {"password": "test2"}, {"user": "test1",
"password": "test2"}),
+ ],
+ )
+
@mock.patch('airflow.providers.amazon.aws.hooks.redshift_sql.redshift_connector.connect')
+ def test_get_conn_overrides_correctly(self, conn_params, conn_extra,
expected_call_args, mock_connect):
+ with mock.patch(
+            'airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.conn',
+ Connection(conn_type='redshift', extra=conn_extra, **conn_params),
+ ):
+ self.db_hook.get_conn()
+ mock_connect.assert_called_once_with(**expected_call_args)
diff --git a/tests/providers/amazon/aws/operators/test_redshift.py
b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
similarity index 81%
rename from tests/providers/amazon/aws/operators/test_redshift.py
rename to tests/providers/amazon/aws/operators/test_redshift_cluster.py
index d43cf4a..a1944a8 100644
--- a/tests/providers/amazon/aws/operators/test_redshift.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,38 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-import unittest
from unittest import mock
-from unittest.mock import MagicMock
-
-from parameterized import parameterized
-from airflow.providers.amazon.aws.operators.redshift import (
+from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
- RedshiftSQLOperator,
)
-class TestRedshiftSQLOperator(unittest.TestCase):
- @parameterized.expand([(True, ('a', 'b')), (False, ('c', 'd'))])
-
@mock.patch("airflow.providers.amazon.aws.operators.redshift.RedshiftSQLOperator.get_hook")
- def test_redshift_operator(self, test_autocommit, test_parameters,
mock_get_hook):
- hook = MagicMock()
- mock_run = hook.run
- mock_get_hook.return_value = hook
- sql = MagicMock()
- operator = RedshiftSQLOperator(
- task_id='test', sql=sql, autocommit=test_autocommit,
parameters=test_parameters
- )
- operator.execute(None)
- mock_run.assert_called_once_with(
- sql,
- autocommit=test_autocommit,
- parameters=test_parameters,
- )
-
-
class TestResumeClusterOperator:
def test_init(self):
redshift_operator = RedshiftResumeClusterOperator(
diff --git a/tests/providers/amazon/aws/operators/test_redshift_sql.py
b/tests/providers/amazon/aws/operators/test_redshift_sql.py
new file mode 100644
index 0000000..064e86a
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_redshift_sql.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+from unittest.mock import MagicMock
+
+from parameterized import parameterized
+
+from airflow.providers.amazon.aws.operators.redshift_sql import
RedshiftSQLOperator
+
+
+class TestRedshiftSQLOperator(unittest.TestCase):
+ @parameterized.expand([(True, ('a', 'b')), (False, ('c', 'd'))])
+
@mock.patch("airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator.get_hook")
+ def test_redshift_operator(self, test_autocommit, test_parameters,
mock_get_hook):
+ hook = MagicMock()
+ mock_run = hook.run
+ mock_get_hook.return_value = hook
+ sql = MagicMock()
+ operator = RedshiftSQLOperator(
+ task_id='test', sql=sql, autocommit=test_autocommit,
parameters=test_parameters
+ )
+ operator.execute(None)
+ mock_run.assert_called_once_with(
+ sql,
+ autocommit=test_autocommit,
+ parameters=test_parameters,
+ )
diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py
b/tests/providers/amazon/aws/sensors/test_redshift_cluster.py
similarity index 96%
rename from tests/providers/amazon/aws/sensors/test_redshift.py
rename to tests/providers/amazon/aws/sensors/test_redshift_cluster.py
index ec7ae66..43e6e9f 100644
--- a/tests/providers/amazon/aws/sensors/test_redshift.py
+++ b/tests/providers/amazon/aws/sensors/test_redshift_cluster.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,13 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
import unittest
import boto3
-from airflow.providers.amazon.aws.sensors.redshift import
AwsRedshiftClusterSensor
+from airflow.providers.amazon.aws.sensors.redshift_cluster import
AwsRedshiftClusterSensor
try:
from moto import mock_redshift
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index da7acf2..2e7bda3 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -39,7 +39,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
-
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
def test_table_unloading(
self,
table_as_file_name,
@@ -103,7 +103,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
-
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
def test_execute_sts_token(
self,
table_as_file_name,
@@ -171,7 +171,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
-
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
def test_custom_select_query_unloading(
self,
table,
@@ -234,7 +234,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
-
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
def test_table_unloading_role_arn(
self,
table_as_file_name,