This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bfcaf19  Split redshift sql and cluster objects (#20276)
bfcaf19 is described below

commit bfcaf195a56242df1d78439323a013605cb100ee
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Dec 16 15:02:29 2021 -0800

    Split redshift sql and cluster objects (#20276)
    
    The first redshift hook was for managing the cluster itself.  Later a hook 
for _using_ the cluster (e.g. running sql statements) was added to the same 
module.  Better to separate these into distinct modules redshift_sql and 
redshift_cluster.  Here we split the redshift modules for operators, hooks, and 
sensors.
---
 airflow/contrib/hooks/redshift_hook.py             |   8 +-
 .../contrib/sensors/aws_redshift_cluster_sensor.py |   8 +-
 .../amazon/aws/example_dags/example_redshift.py    |   2 +-
 .../aws/example_dags/example_s3_to_redshift.py     |   2 +-
 airflow/providers/amazon/aws/hooks/redshift.py     | 235 +--------------------
 .../aws/hooks/{redshift.py => redshift_cluster.py} | 122 +----------
 .../aws/hooks/{redshift.py => redshift_sql.py}     | 118 +----------
 airflow/providers/amazon/aws/operators/redshift.py | 154 ++------------
 .../operators/{redshift.py => redshift_cluster.py} |  56 +----
 .../providers/amazon/aws/operators/redshift_sql.py |  73 +++++++
 airflow/providers/amazon/aws/sensors/redshift.py   |  48 +----
 .../sensors/{redshift.py => redshift_cluster.py}   |   4 +-
 .../amazon/aws/transfers/redshift_to_s3.py         |   2 +-
 .../amazon/aws/transfers/s3_to_redshift.py         |   2 +-
 airflow/providers/amazon/provider.yaml             |  12 +-
 dev/provider_packages/prepare_provider_packages.py |   5 +
 .../operators/redshift_cluster.rst                 |  44 ++++
 .../operators/{redshift.rst => redshift_sql.rst}   |  28 +--
 .../{test_redshift.py => test_redshift_cluster.py} |  72 +------
 .../amazon/aws/hooks/test_redshift_sql.py          |  89 ++++++++
 .../{test_redshift.py => test_redshift_cluster.py} |  27 +--
 .../amazon/aws/operators/test_redshift_sql.py      |  43 ++++
 .../{test_redshift.py => test_redshift_cluster.py} |   4 +-
 .../amazon/aws/transfers/test_redshift_to_s3.py    |   8 +-
 24 files changed, 330 insertions(+), 836 deletions(-)

diff --git a/airflow/contrib/hooks/redshift_hook.py 
b/airflow/contrib/hooks/redshift_hook.py
index ebaac1b..f33515f 100644
--- a/airflow/contrib/hooks/redshift_hook.py
+++ b/airflow/contrib/hooks/redshift_hook.py
@@ -16,14 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""This module is deprecated. Please use 
:mod:`airflow.providers.amazon.aws.hooks.redshift`."""
+"""This module is deprecated. Please use 
:mod:`airflow.providers.amazon.aws.hooks.redshift_cluster`."""
 
 import warnings
 
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook  # noqa
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 
 warnings.warn(
-    "This module is deprecated. Please use 
`airflow.providers.amazon.aws.hooks.redshift`.",
+    "This module is deprecated. Please use 
`airflow.providers.amazon.aws.hooks.redshift_cluster`.",
     DeprecationWarning,
     stacklevel=2,
 )
+
+__all__ = ["RedshiftHook"]
diff --git a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py 
b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
index 5c1341d..c6f3c92 100644
--- a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
+++ b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
@@ -16,14 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""This module is deprecated. Please use 
:mod:`airflow.providers.amazon.aws.sensors.redshift`."""
+"""This module is deprecated. Please use 
:mod:`airflow.providers.amazon.aws.sensors.redshift_cluster`."""
 
 import warnings
 
-from airflow.providers.amazon.aws.sensors.redshift import 
AwsRedshiftClusterSensor  # noqa
+from airflow.providers.amazon.aws.sensors.redshift_cluster import 
AwsRedshiftClusterSensor
 
 warnings.warn(
-    "This module is deprecated. Please use 
`airflow.providers.amazon.aws.sensors.redshift`.",
+    "This module is deprecated. Please use 
`airflow.providers.amazon.aws.sensors.redshift_cluster`.",
     DeprecationWarning,
     stacklevel=2,
 )
+
+__all__ = ["AwsRedshiftClusterSensor"]
diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift.py 
b/airflow/providers/amazon/aws/example_dags/example_redshift.py
index dbb7d8f..51f9f14 100644
--- a/airflow/providers/amazon/aws/example_dags/example_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_redshift.py
@@ -23,7 +23,7 @@ from datetime import datetime
 
 # [START redshift_operator_howto_guide]
 from airflow import DAG
-from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
+from airflow.providers.amazon.aws.operators.redshift_sql import 
RedshiftSQLOperator
 
 with DAG(
     dag_id="redshift",
diff --git 
a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py 
b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
index 8114e90..65ea682 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -26,7 +26,7 @@ from airflow import DAG
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
+from airflow.providers.amazon.aws.operators.redshift_sql import 
RedshiftSQLOperator
 from airflow.providers.amazon.aws.transfers.s3_to_redshift import 
S3ToRedshiftOperator
 
 # [START howto_operator_s3_to_redshift_env_variables]
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py 
b/airflow/providers/amazon/aws/hooks/redshift.py
index e734285..4f52b5b 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift.py
@@ -16,231 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 """Interact with AWS Redshift clusters."""
-import sys
-from typing import Dict, List, Optional, Union
+import warnings
 
-if sys.version_info >= (3, 8):
-    from functools import cached_property
-else:
-    from cached_property import cached_property
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
 
-import redshift_connector
-from redshift_connector import Connection as RedshiftConnection
-from sqlalchemy import create_engine
-from sqlalchemy.engine.url import URL
+warnings.warn(
+    "This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.redshift_sql` "
+    "or `airflow.providers.amazon.aws.hooks.redshift_cluster` as appropriate.",
+    DeprecationWarning,
+    stacklevel=2,
+)
 
-from airflow.hooks.dbapi import DbApiHook
-from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-
-
-class RedshiftHook(AwsBaseHook):
-    """
-    Interact with AWS Redshift, using the boto3 library
-
-    Additional arguments (such as ``aws_conn_id``) may be specified and
-    are passed down to the underlying AwsBaseHook.
-
-    .. seealso::
-        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
-
-    :param aws_conn_id: The Airflow connection used for AWS credentials.
-    :type aws_conn_id: str
-    """
-
-    def __init__(self, *args, **kwargs) -> None:
-        kwargs["client_type"] = "redshift"
-        super().__init__(*args, **kwargs)
-
-    # TODO: Wrap create_cluster_snapshot
-    def cluster_status(self, cluster_identifier: str) -> str:
-        """
-        Return status of a cluster
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        :param skip_final_cluster_snapshot: determines cluster snapshot 
creation
-        :type skip_final_cluster_snapshot: bool
-        :param final_cluster_snapshot_identifier: Optional[str]
-        :type final_cluster_snapshot_identifier: Optional[str]
-        """
-        try:
-            response = 
self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
-        except self.get_conn().exceptions.ClusterNotFoundFault:
-            return 'cluster_not_found'
-
-    def delete_cluster(
-        self,
-        cluster_identifier: str,
-        skip_final_cluster_snapshot: bool = True,
-        final_cluster_snapshot_identifier: Optional[str] = None,
-    ):
-        """
-        Delete a cluster and optionally create a snapshot
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        :param skip_final_cluster_snapshot: determines cluster snapshot 
creation
-        :type skip_final_cluster_snapshot: bool
-        :param final_cluster_snapshot_identifier: name of final cluster 
snapshot
-        :type final_cluster_snapshot_identifier: str
-        """
-        final_cluster_snapshot_identifier = final_cluster_snapshot_identifier 
or ''
-
-        response = self.get_conn().delete_cluster(
-            ClusterIdentifier=cluster_identifier,
-            SkipFinalClusterSnapshot=skip_final_cluster_snapshot,
-            FinalClusterSnapshotIdentifier=final_cluster_snapshot_identifier,
-        )
-        return response['Cluster'] if response['Cluster'] else None
-
-    def describe_cluster_snapshots(self, cluster_identifier: str) -> 
Optional[List[str]]:
-        """
-        Gets a list of snapshots for a cluster
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        """
-        response = 
self.get_conn().describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
-        if 'Snapshots' not in response:
-            return None
-        snapshots = response['Snapshots']
-        snapshots = [snapshot for snapshot in snapshots if snapshot["Status"]]
-        snapshots.sort(key=lambda x: x['SnapshotCreateTime'], reverse=True)
-        return snapshots
-
-    def restore_from_cluster_snapshot(self, cluster_identifier: str, 
snapshot_identifier: str) -> str:
-        """
-        Restores a cluster from its snapshot
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        :param snapshot_identifier: unique identifier for a snapshot of a 
cluster
-        :type snapshot_identifier: str
-        """
-        response = self.get_conn().restore_from_cluster_snapshot(
-            ClusterIdentifier=cluster_identifier, 
SnapshotIdentifier=snapshot_identifier
-        )
-        return response['Cluster'] if response['Cluster'] else None
-
-    def create_cluster_snapshot(self, snapshot_identifier: str, 
cluster_identifier: str) -> str:
-        """
-        Creates a snapshot of a cluster
-
-        :param snapshot_identifier: unique identifier for a snapshot of a 
cluster
-        :type snapshot_identifier: str
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        """
-        response = self.get_conn().create_cluster_snapshot(
-            SnapshotIdentifier=snapshot_identifier,
-            ClusterIdentifier=cluster_identifier,
-        )
-        return response['Snapshot'] if response['Snapshot'] else None
-
-
-class RedshiftSQLHook(DbApiHook):
-    """
-    Execute statements against Amazon Redshift, using redshift_connector
-
-    This hook requires the redshift_conn_id connection.
-
-    :param redshift_conn_id: reference to
-        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
-    :type redshift_conn_id: str
-
-    .. note::
-        get_sqlalchemy_engine() and get_uri() depend on 
sqlalchemy-amazon-redshift
-    """
-
-    conn_name_attr = 'redshift_conn_id'
-    default_conn_name = 'redshift_default'
-    conn_type = 'redshift'
-    hook_name = 'Amazon Redshift'
-    supports_autocommit = True
-
-    @staticmethod
-    def get_ui_field_behavior() -> Dict:
-        """Returns custom field behavior"""
-        return {
-            "hidden_fields": [],
-            "relabeling": {'login': 'User', 'schema': 'Database'},
-        }
-
-    @cached_property
-    def conn(self):
-        return self.get_connection(self.redshift_conn_id)  # type: 
ignore[attr-defined]
-
-    def _get_conn_params(self) -> Dict[str, Union[str, int]]:
-        """Helper method to retrieve connection args"""
-        conn = self.conn
-
-        conn_params: Dict[str, Union[str, int]] = {}
-
-        if conn.login:
-            conn_params['user'] = conn.login
-        if conn.password:
-            conn_params['password'] = conn.password
-        if conn.host:
-            conn_params['host'] = conn.host
-        if conn.port:
-            conn_params['port'] = conn.port
-        if conn.schema:
-            conn_params['database'] = conn.schema
-
-        return conn_params
-
-    def get_uri(self) -> str:
-        """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy 
dialect as driver name"""
-        conn_params = self._get_conn_params()
-
-        if 'user' in conn_params:
-            conn_params['username'] = conn_params.pop('user')
-
-        return str(URL(drivername='redshift+redshift_connector', 
**conn_params))
-
-    def get_sqlalchemy_engine(self, engine_kwargs=None):
-        """Overrides DbApiHook get_sqlalchemy_engine to pass 
redshift_connector specific kwargs"""
-        conn_kwargs = self.conn.extra_dejson
-        if engine_kwargs is None:
-            engine_kwargs = {}
-
-        if "connect_args" in engine_kwargs:
-            engine_kwargs["connect_args"] = {**conn_kwargs, 
**engine_kwargs["connect_args"]}
-        else:
-            engine_kwargs["connect_args"] = conn_kwargs
-
-        return create_engine(self.get_uri(), **engine_kwargs)
-
-    def get_table_primary_key(self, table: str, schema: Optional[str] = 
"public") -> Optional[List[str]]:
-        """
-        Helper method that returns the table primary key
-        :param table: Name of the target table
-        :type table: str
-        :param table: Name of the target schema, public by default
-        :type table: str
-        :return: Primary key columns list
-        :rtype: List[str]
-        """
-        sql = """
-            select kcu.column_name
-            from information_schema.table_constraints tco
-                    join information_schema.key_column_usage kcu
-                        on kcu.constraint_name = tco.constraint_name
-                            and kcu.constraint_schema = tco.constraint_schema
-                            and kcu.constraint_name = tco.constraint_name
-            where tco.constraint_type = 'PRIMARY KEY'
-            and kcu.table_schema = %s
-            and kcu.table_name = %s
-        """
-        pk_columns = [row[0] for row in self.get_records(sql, (schema, table))]
-        return pk_columns or None
-
-    def get_conn(self) -> RedshiftConnection:
-        """Returns a redshift_connector.Connection object"""
-        conn_params = self._get_conn_params()
-        conn_kwargs_dejson = self.conn.extra_dejson
-        conn_kwargs: Dict = {**conn_params, **conn_kwargs_dejson}
-        conn: RedshiftConnection = redshift_connector.connect(**conn_kwargs)
-
-        return conn
+__all__ = ["RedshiftHook", "RedshiftSQLHook"]
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py 
b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
similarity index 54%
copy from airflow/providers/amazon/aws/hooks/redshift.py
copy to airflow/providers/amazon/aws/hooks/redshift_cluster.py
index e734285..aa45d77 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,21 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Interact with AWS Redshift clusters."""
-import sys
-from typing import Dict, List, Optional, Union
-
-if sys.version_info >= (3, 8):
-    from functools import cached_property
-else:
-    from cached_property import cached_property
 
-import redshift_connector
-from redshift_connector import Connection as RedshiftConnection
-from sqlalchemy import create_engine
-from sqlalchemy.engine.url import URL
+from typing import List, Optional
 
-from airflow.hooks.dbapi import DbApiHook
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
 
@@ -137,110 +124,3 @@ class RedshiftHook(AwsBaseHook):
             ClusterIdentifier=cluster_identifier,
         )
         return response['Snapshot'] if response['Snapshot'] else None
-
-
-class RedshiftSQLHook(DbApiHook):
-    """
-    Execute statements against Amazon Redshift, using redshift_connector
-
-    This hook requires the redshift_conn_id connection.
-
-    :param redshift_conn_id: reference to
-        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
-    :type redshift_conn_id: str
-
-    .. note::
-        get_sqlalchemy_engine() and get_uri() depend on 
sqlalchemy-amazon-redshift
-    """
-
-    conn_name_attr = 'redshift_conn_id'
-    default_conn_name = 'redshift_default'
-    conn_type = 'redshift'
-    hook_name = 'Amazon Redshift'
-    supports_autocommit = True
-
-    @staticmethod
-    def get_ui_field_behavior() -> Dict:
-        """Returns custom field behavior"""
-        return {
-            "hidden_fields": [],
-            "relabeling": {'login': 'User', 'schema': 'Database'},
-        }
-
-    @cached_property
-    def conn(self):
-        return self.get_connection(self.redshift_conn_id)  # type: 
ignore[attr-defined]
-
-    def _get_conn_params(self) -> Dict[str, Union[str, int]]:
-        """Helper method to retrieve connection args"""
-        conn = self.conn
-
-        conn_params: Dict[str, Union[str, int]] = {}
-
-        if conn.login:
-            conn_params['user'] = conn.login
-        if conn.password:
-            conn_params['password'] = conn.password
-        if conn.host:
-            conn_params['host'] = conn.host
-        if conn.port:
-            conn_params['port'] = conn.port
-        if conn.schema:
-            conn_params['database'] = conn.schema
-
-        return conn_params
-
-    def get_uri(self) -> str:
-        """Overrides DbApiHook get_uri to use redshift_connector sqlalchemy 
dialect as driver name"""
-        conn_params = self._get_conn_params()
-
-        if 'user' in conn_params:
-            conn_params['username'] = conn_params.pop('user')
-
-        return str(URL(drivername='redshift+redshift_connector', 
**conn_params))
-
-    def get_sqlalchemy_engine(self, engine_kwargs=None):
-        """Overrides DbApiHook get_sqlalchemy_engine to pass 
redshift_connector specific kwargs"""
-        conn_kwargs = self.conn.extra_dejson
-        if engine_kwargs is None:
-            engine_kwargs = {}
-
-        if "connect_args" in engine_kwargs:
-            engine_kwargs["connect_args"] = {**conn_kwargs, 
**engine_kwargs["connect_args"]}
-        else:
-            engine_kwargs["connect_args"] = conn_kwargs
-
-        return create_engine(self.get_uri(), **engine_kwargs)
-
-    def get_table_primary_key(self, table: str, schema: Optional[str] = 
"public") -> Optional[List[str]]:
-        """
-        Helper method that returns the table primary key
-        :param table: Name of the target table
-        :type table: str
-        :param table: Name of the target schema, public by default
-        :type table: str
-        :return: Primary key columns list
-        :rtype: List[str]
-        """
-        sql = """
-            select kcu.column_name
-            from information_schema.table_constraints tco
-                    join information_schema.key_column_usage kcu
-                        on kcu.constraint_name = tco.constraint_name
-                            and kcu.constraint_schema = tco.constraint_schema
-                            and kcu.constraint_name = tco.constraint_name
-            where tco.constraint_type = 'PRIMARY KEY'
-            and kcu.table_schema = %s
-            and kcu.table_name = %s
-        """
-        pk_columns = [row[0] for row in self.get_records(sql, (schema, table))]
-        return pk_columns or None
-
-    def get_conn(self) -> RedshiftConnection:
-        """Returns a redshift_connector.Connection object"""
-        conn_params = self._get_conn_params()
-        conn_kwargs_dejson = self.conn.extra_dejson
-        conn_kwargs: Dict = {**conn_params, **conn_kwargs_dejson}
-        conn: RedshiftConnection = redshift_connector.connect(**conn_kwargs)
-
-        return conn
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py 
b/airflow/providers/amazon/aws/hooks/redshift_sql.py
similarity index 53%
copy from airflow/providers/amazon/aws/hooks/redshift.py
copy to airflow/providers/amazon/aws/hooks/redshift_sql.py
index e734285..739b7b5 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_sql.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,128 +14,21 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Interact with AWS Redshift clusters."""
+
 import sys
 from typing import Dict, List, Optional, Union
 
-if sys.version_info >= (3, 8):
-    from functools import cached_property
-else:
-    from cached_property import cached_property
-
 import redshift_connector
 from redshift_connector import Connection as RedshiftConnection
 from sqlalchemy import create_engine
 from sqlalchemy.engine.url import URL
 
 from airflow.hooks.dbapi import DbApiHook
-from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-
-
-class RedshiftHook(AwsBaseHook):
-    """
-    Interact with AWS Redshift, using the boto3 library
-
-    Additional arguments (such as ``aws_conn_id``) may be specified and
-    are passed down to the underlying AwsBaseHook.
-
-    .. seealso::
-        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
-
-    :param aws_conn_id: The Airflow connection used for AWS credentials.
-    :type aws_conn_id: str
-    """
 
-    def __init__(self, *args, **kwargs) -> None:
-        kwargs["client_type"] = "redshift"
-        super().__init__(*args, **kwargs)
-
-    # TODO: Wrap create_cluster_snapshot
-    def cluster_status(self, cluster_identifier: str) -> str:
-        """
-        Return status of a cluster
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        :param skip_final_cluster_snapshot: determines cluster snapshot 
creation
-        :type skip_final_cluster_snapshot: bool
-        :param final_cluster_snapshot_identifier: Optional[str]
-        :type final_cluster_snapshot_identifier: Optional[str]
-        """
-        try:
-            response = 
self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
-        except self.get_conn().exceptions.ClusterNotFoundFault:
-            return 'cluster_not_found'
-
-    def delete_cluster(
-        self,
-        cluster_identifier: str,
-        skip_final_cluster_snapshot: bool = True,
-        final_cluster_snapshot_identifier: Optional[str] = None,
-    ):
-        """
-        Delete a cluster and optionally create a snapshot
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        :param skip_final_cluster_snapshot: determines cluster snapshot 
creation
-        :type skip_final_cluster_snapshot: bool
-        :param final_cluster_snapshot_identifier: name of final cluster 
snapshot
-        :type final_cluster_snapshot_identifier: str
-        """
-        final_cluster_snapshot_identifier = final_cluster_snapshot_identifier 
or ''
-
-        response = self.get_conn().delete_cluster(
-            ClusterIdentifier=cluster_identifier,
-            SkipFinalClusterSnapshot=skip_final_cluster_snapshot,
-            FinalClusterSnapshotIdentifier=final_cluster_snapshot_identifier,
-        )
-        return response['Cluster'] if response['Cluster'] else None
-
-    def describe_cluster_snapshots(self, cluster_identifier: str) -> 
Optional[List[str]]:
-        """
-        Gets a list of snapshots for a cluster
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        """
-        response = 
self.get_conn().describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
-        if 'Snapshots' not in response:
-            return None
-        snapshots = response['Snapshots']
-        snapshots = [snapshot for snapshot in snapshots if snapshot["Status"]]
-        snapshots.sort(key=lambda x: x['SnapshotCreateTime'], reverse=True)
-        return snapshots
-
-    def restore_from_cluster_snapshot(self, cluster_identifier: str, 
snapshot_identifier: str) -> str:
-        """
-        Restores a cluster from its snapshot
-
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        :param snapshot_identifier: unique identifier for a snapshot of a 
cluster
-        :type snapshot_identifier: str
-        """
-        response = self.get_conn().restore_from_cluster_snapshot(
-            ClusterIdentifier=cluster_identifier, 
SnapshotIdentifier=snapshot_identifier
-        )
-        return response['Cluster'] if response['Cluster'] else None
-
-    def create_cluster_snapshot(self, snapshot_identifier: str, 
cluster_identifier: str) -> str:
-        """
-        Creates a snapshot of a cluster
-
-        :param snapshot_identifier: unique identifier for a snapshot of a 
cluster
-        :type snapshot_identifier: str
-        :param cluster_identifier: unique identifier of a cluster
-        :type cluster_identifier: str
-        """
-        response = self.get_conn().create_cluster_snapshot(
-            SnapshotIdentifier=snapshot_identifier,
-            ClusterIdentifier=cluster_identifier,
-        )
-        return response['Snapshot'] if response['Snapshot'] else None
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
 
 
 class RedshiftSQLHook(DbApiHook):
diff --git a/airflow/providers/amazon/aws/operators/redshift.py 
b/airflow/providers/amazon/aws/operators/redshift.py
index 52d82b4..71bd3c4 100644
--- a/airflow/providers/amazon/aws/operators/redshift.py
+++ b/airflow/providers/amazon/aws/operators/redshift.py
@@ -15,141 +15,19 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Dict, Iterable, Optional, Union
-
-from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook, 
RedshiftSQLHook
-
-
-class RedshiftSQLOperator(BaseOperator):
-    """
-    Executes SQL Statements against an Amazon Redshift cluster
-
-    .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:RedshiftSQLOperator`
-
-    :param sql: the sql code to be executed
-    :type sql: Can receive a str representing a sql statement,
-        or an iterable of str (sql statements)
-    :param redshift_conn_id: reference to
-        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
-    :type redshift_conn_id: str
-    :param parameters: (optional) the parameters to render the SQL query with.
-    :type parameters: dict or iterable
-    :param autocommit: if True, each command is automatically committed.
-        (default value: False)
-    :type autocommit: bool
-    """
-
-    template_fields = ('sql',)
-    template_ext = ('.sql',)
-
-    def __init__(
-        self,
-        *,
-        sql: Optional[Union[Dict, Iterable]],
-        redshift_conn_id: str = 'redshift_default',
-        parameters: Optional[dict] = None,
-        autocommit: bool = True,
-        **kwargs,
-    ) -> None:
-        super().__init__(**kwargs)
-        self.redshift_conn_id = redshift_conn_id
-        self.sql = sql
-        self.autocommit = autocommit
-        self.parameters = parameters
-
-    def get_hook(self) -> RedshiftSQLHook:
-        """Create and return RedshiftSQLHook.
-        :return RedshiftSQLHook: A RedshiftSQLHook instance.
-        """
-        return RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
-
-    def execute(self, context: dict) -> None:
-        """Execute a statement against Amazon Redshift"""
-        self.log.info(f"Executing statement: {self.sql}")
-        hook = self.get_hook()
-        hook.run(self.sql, autocommit=self.autocommit, 
parameters=self.parameters)
-
-
-class RedshiftResumeClusterOperator(BaseOperator):
-    """
-    Resume a paused AWS Redshift Cluster
-
-    .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:RedshiftResumeClusterOperator`
-
-    :param cluster_identifier: id of the AWS Redshift Cluster
-    :type cluster_identifier: str
-    :param aws_conn_id: aws connection to use
-    :type aws_conn_id: str
-    """
-
-    template_fields = ("cluster_identifier",)
-    ui_color = "#eeaa11"
-    ui_fgcolor = "#ffffff"
-
-    def __init__(
-        self,
-        *,
-        cluster_identifier: str,
-        aws_conn_id: str = "aws_default",
-        **kwargs,
-    ):
-        super().__init__(**kwargs)
-        self.cluster_identifier = cluster_identifier
-        self.aws_conn_id = aws_conn_id
-
-    def execute(self, context):
-        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
-        cluster_state = 
redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
-        if cluster_state == 'paused':
-            self.log.info("Starting Redshift cluster %s", 
self.cluster_identifier)
-            
redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
-        else:
-            self.log.warning(
-                "Unable to resume cluster since cluster is currently in 
status: %s", cluster_state
-            )
-
-
-class RedshiftPauseClusterOperator(BaseOperator):
-    """
-    Pause an AWS Redshift Cluster if it has status `available`.
-
-    .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:RedshiftPauseClusterOperator`
-
-    :param cluster_identifier: id of the AWS Redshift Cluster
-    :type cluster_identifier: str
-    :param aws_conn_id: aws connection to use
-    :type aws_conn_id: str
-    """
-
-    template_fields = ("cluster_identifier",)
-    ui_color = "#eeaa11"
-    ui_fgcolor = "#ffffff"
-
-    def __init__(
-        self,
-        *,
-        cluster_identifier: str,
-        aws_conn_id: str = "aws_default",
-        **kwargs,
-    ):
-        super().__init__(**kwargs)
-        self.cluster_identifier = cluster_identifier
-        self.aws_conn_id = aws_conn_id
-
-    def execute(self, context):
-        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
-        cluster_state = 
redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
-        if cluster_state == 'available':
-            self.log.info("Pausing Redshift cluster %s", 
self.cluster_identifier)
-            
redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
-        else:
-            self.log.warning(
-                "Unable to pause cluster since cluster is currently in status: 
%s", cluster_state
-            )
+import warnings
+
+from airflow.providers.amazon.aws.operators.redshift_cluster import (
+    RedshiftPauseClusterOperator,
+    RedshiftResumeClusterOperator,
+)
+from airflow.providers.amazon.aws.operators.redshift_sql import 
RedshiftSQLOperator
+
+warnings.warn(
+    "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.redshift_sql` "
+    "or `airflow.providers.amazon.aws.operators.redshift_cluster` as appropriate.",
+    DeprecationWarning,
+    stacklevel=2,
+)
+
+__all__ = ["RedshiftSQLOperator", "RedshiftPauseClusterOperator", 
"RedshiftResumeClusterOperator"]
diff --git a/airflow/providers/amazon/aws/operators/redshift.py 
b/airflow/providers/amazon/aws/operators/redshift_cluster.py
similarity index 64%
copy from airflow/providers/amazon/aws/operators/redshift.py
copy to airflow/providers/amazon/aws/operators/redshift_cluster.py
index 52d82b4..5ec9c01 100644
--- a/airflow/providers/amazon/aws/operators/redshift.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,62 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Dict, Iterable, Optional, Union
 
 from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook, 
RedshiftSQLHook
-
-
-class RedshiftSQLOperator(BaseOperator):
-    """
-    Executes SQL Statements against an Amazon Redshift cluster
-
-    .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:RedshiftSQLOperator`
-
-    :param sql: the sql code to be executed
-    :type sql: Can receive a str representing a sql statement,
-        or an iterable of str (sql statements)
-    :param redshift_conn_id: reference to
-        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
-    :type redshift_conn_id: str
-    :param parameters: (optional) the parameters to render the SQL query with.
-    :type parameters: dict or iterable
-    :param autocommit: if True, each command is automatically committed.
-        (default value: False)
-    :type autocommit: bool
-    """
-
-    template_fields = ('sql',)
-    template_ext = ('.sql',)
-
-    def __init__(
-        self,
-        *,
-        sql: Optional[Union[Dict, Iterable]],
-        redshift_conn_id: str = 'redshift_default',
-        parameters: Optional[dict] = None,
-        autocommit: bool = True,
-        **kwargs,
-    ) -> None:
-        super().__init__(**kwargs)
-        self.redshift_conn_id = redshift_conn_id
-        self.sql = sql
-        self.autocommit = autocommit
-        self.parameters = parameters
-
-    def get_hook(self) -> RedshiftSQLHook:
-        """Create and return RedshiftSQLHook.
-        :return RedshiftSQLHook: A RedshiftSQLHook instance.
-        """
-        return RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
-
-    def execute(self, context: dict) -> None:
-        """Execute a statement against Amazon Redshift"""
-        self.log.info(f"Executing statement: {self.sql}")
-        hook = self.get_hook()
-        hook.run(self.sql, autocommit=self.autocommit, 
parameters=self.parameters)
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 
 
 class RedshiftResumeClusterOperator(BaseOperator):
diff --git a/airflow/providers/amazon/aws/operators/redshift_sql.py 
b/airflow/providers/amazon/aws/operators/redshift_sql.py
new file mode 100644
index 0000000..4e40baa
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/redshift_sql.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Optional, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
+
+
+class RedshiftSQLOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:RedshiftSQLOperator`
+
+    :param sql: the sql code to be executed
+    :type sql: Can receive a str representing a sql statement,
+        or an iterable of str (sql statements)
+    :param redshift_conn_id: reference to
+        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
+    :type redshift_conn_id: str
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: dict or iterable
+    :param autocommit: if True, each command is automatically committed.
+        (default value: True)
+    :type autocommit: bool
+    """
+
+    template_fields = ('sql',)
+    template_ext = ('.sql',)
+
+    def __init__(
+        self,
+        *,
+        sql: Optional[Union[Dict, Iterable]],
+        redshift_conn_id: str = 'redshift_default',
+        parameters: Optional[dict] = None,
+        autocommit: bool = True,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.redshift_conn_id = redshift_conn_id
+        self.sql = sql
+        self.autocommit = autocommit
+        self.parameters = parameters
+
+    def get_hook(self) -> RedshiftSQLHook:
+        """Create and return RedshiftSQLHook.
+        :return RedshiftSQLHook: A RedshiftSQLHook instance.
+        """
+        return RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
+
+    def execute(self, context: dict) -> None:
+        """Execute a statement against Amazon Redshift"""
+        self.log.info(f"Executing statement: {self.sql}")
+        hook = self.get_hook()
+        hook.run(self.sql, autocommit=self.autocommit, 
parameters=self.parameters)
diff --git a/airflow/providers/amazon/aws/sensors/redshift.py 
b/airflow/providers/amazon/aws/sensors/redshift.py
index 669e6c0..0f41508 100644
--- a/airflow/providers/amazon/aws/sensors/redshift.py
+++ b/airflow/providers/amazon/aws/sensors/redshift.py
@@ -15,46 +15,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Optional
+import warnings
 
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook
-from airflow.sensors.base import BaseSensorOperator
+from airflow.providers.amazon.aws.sensors.redshift_cluster import 
AwsRedshiftClusterSensor
 
+warnings.warn(
+    "This module is deprecated. Please use 
`airflow.sensors.redshift_cluster`.",
+    DeprecationWarning,
+    stacklevel=2,
+)
 
-class AwsRedshiftClusterSensor(BaseSensorOperator):
-    """
-    Waits for a Redshift cluster to reach a specific status.
-
-    :param cluster_identifier: The identifier for the cluster being pinged.
-    :type cluster_identifier: str
-    :param target_status: The cluster status desired.
-    :type target_status: str
-    """
-
-    template_fields = ('cluster_identifier', 'target_status')
-
-    def __init__(
-        self,
-        *,
-        cluster_identifier: str,
-        target_status: str = 'available',
-        aws_conn_id: str = 'aws_default',
-        **kwargs,
-    ):
-        super().__init__(**kwargs)
-        self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
-        self.aws_conn_id = aws_conn_id
-        self.hook: Optional[RedshiftHook] = None
-
-    def poke(self, context):
-        self.log.info('Poking for status : %s\nfor cluster %s', 
self.target_status, self.cluster_identifier)
-        return self.get_hook().cluster_status(self.cluster_identifier) == 
self.target_status
-
-    def get_hook(self) -> RedshiftHook:
-        """Create and return a RedshiftHook"""
-        if self.hook:
-            return self.hook
-
-        self.hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
-        return self.hook
+__all__ = ["AwsRedshiftClusterSensor"]
diff --git a/airflow/providers/amazon/aws/sensors/redshift.py 
b/airflow/providers/amazon/aws/sensors/redshift_cluster.py
similarity index 96%
copy from airflow/providers/amazon/aws/sensors/redshift.py
copy to airflow/providers/amazon/aws/sensors/redshift_cluster.py
index 669e6c0..91b0703 100644
--- a/airflow/providers/amazon/aws/sensors/redshift.py
+++ b/airflow/providers/amazon/aws/sensors/redshift_cluster.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,9 +14,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from typing import Optional
 
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 from airflow.sensors.base import BaseSensorOperator
 
 
diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py 
b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index 42ce2b8..f233e94 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -19,7 +19,7 @@
 from typing import Iterable, List, Mapping, Optional, Union
 
 from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
 
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py 
b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 5f5eb38..0c3b347 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -20,7 +20,7 @@ from typing import List, Optional, Union
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
 
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index d602132..5664ba4 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -105,7 +105,8 @@ integrations:
     external-doc-url: https://aws.amazon.com/redshift/
     logo: /integration-logos/aws/[email protected]
     how-to-guide:
-      - /docs/apache-airflow-providers-amazon/operators/redshift.rst
+      - /docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
+      - /docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
     tags: [aws]
   - integration-name: Amazon SageMaker
     external-doc-url: https://aws.amazon.com/sagemaker/
@@ -252,6 +253,8 @@ operators:
   - integration-name: Amazon Redshift
     python-modules:
       - airflow.providers.amazon.aws.operators.redshift
+      - airflow.providers.amazon.aws.operators.redshift_sql
+      - airflow.providers.amazon.aws.operators.redshift_cluster
 
 sensors:
   - integration-name: Amazon Athena
@@ -295,6 +298,7 @@ sensors:
   - integration-name: Amazon Redshift
     python-modules:
       - airflow.providers.amazon.aws.sensors.redshift
+      - airflow.providers.amazon.aws.sensors.redshift_cluster
   - integration-name: Amazon Simple Storage Service (S3)
     python-modules:
       - airflow.providers.amazon.aws.sensors.s3_key
@@ -375,6 +379,8 @@ hooks:
   - integration-name: Amazon Redshift
     python-modules:
       - airflow.providers.amazon.aws.hooks.redshift
+      - airflow.providers.amazon.aws.hooks.redshift_sql
+      - airflow.providers.amazon.aws.hooks.redshift_cluster
   - integration-name: Amazon Simple Storage Service (S3)
     python-modules:
       - airflow.providers.amazon.aws.hooks.s3
@@ -461,7 +467,7 @@ hook-class-names:  # deprecated - to be removed after 
providers add dependency o
   - airflow.providers.amazon.aws.hooks.s3.S3Hook
   - airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
   - airflow.providers.amazon.aws.hooks.emr.EmrHook
-  - airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook
+  - airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook
 
 extra-links:
   - airflow.providers.amazon.aws.operators.emr.EmrClusterLink
@@ -474,7 +480,7 @@ connection-types:
     connection-type: aws
   - hook-class-name: airflow.providers.amazon.aws.hooks.emr.EmrHook
     connection-type: emr
-  - hook-class-name: 
airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook
+  - hook-class-name: 
airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook
     connection-type: redshift
 
 secrets-backends:
diff --git a/dev/provider_packages/prepare_provider_packages.py 
b/dev/provider_packages/prepare_provider_packages.py
index 6c57392..2e77ea9 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -2154,6 +2154,11 @@ KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = {
     "This module is deprecated. Please use 
`airflow.providers.amazon.aws.sensors.dms`.",
     'This module is deprecated. Please use 
`airflow.providers.amazon.aws.operators.emr`.',
     'This module is deprecated. Please use 
`airflow.providers.amazon.aws.sensors.emr`.',
+    'This module is deprecated. Please use `airflow.hooks.redshift_sql` '
+    'or `airflow.hooks.redshift_cluster` as appropriate.',
+    'This module is deprecated. Please use `airflow.operators.redshift_sql` or 
'
+    '`airflow.operators.redshift_cluster` as appropriate.',
+    'This module is deprecated. Please use 
`airflow.sensors.redshift_cluster`.',
 }
 
 
diff --git 
a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst 
b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
new file mode 100644
index 0000000..352478b
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
@@ -0,0 +1,44 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Redshift cluster management operators
+=====================================
+
+.. contents::
+  :depth: 1
+  :local:
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""
+
+To resume a 'paused' AWS Redshift Cluster you can use
+:class:`RedshiftResumeClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftResumeClusterOperator>`
+
+This Operator leverages the AWS CLI
+`resume-cluster 
<https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__
 API
+
+.. _howto/operator:RedshiftPauseClusterOperator:
+
+Pause a Redshift Cluster
+""""""""""""""""""""""""
+
+To pause an 'available' AWS Redshift Cluster you can use
+:class:`RedshiftPauseClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftPauseClusterOperator>`
+This Operator leverages the AWS CLI
+`pause-cluster 
<https://docs.aws.amazon.com/cli/latest/reference/redshift/pause-cluster.html>`__
 API
diff --git a/docs/apache-airflow-providers-amazon/operators/redshift.rst 
b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
similarity index 77%
rename from docs/apache-airflow-providers-amazon/operators/redshift.rst
rename to docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
index 5a5d1d7..c53a4b9 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst
@@ -27,10 +27,10 @@ RedshiftSQLOperator
 Overview
 --------
 
-Use the :class:`RedshiftSQLOperator 
<airflow.providers.amazon.aws.operators.redshift>` to execute
+Use the :class:`RedshiftSQLOperator <airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator>` to execute
 statements against an Amazon Redshift cluster.
 
-:class:`RedshiftSQLOperator 
<airflow.providers.amazon.aws.operators.redshift.RedshiftSQLOperator>` works 
together with
+:class:`RedshiftSQLOperator 
<airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator>` 
works together with
 :class:`RedshiftSQLHook 
<airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook>` to establish
 connections with Amazon Redshift.
 
@@ -41,7 +41,7 @@ example_redshift.py
 Purpose
 """""""
 
-This is a basic example dag for using :class:`RedshiftSQLOperator 
<airflow.providers.amazon.aws.operators.redshift>`
+This is a basic example dag for using :class:`RedshiftSQLOperator <airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator>`
 to execute statements against an Amazon Redshift cluster.
 
 Create a table
@@ -94,25 +94,3 @@ All together, here is our DAG:
     :language: python
     :start-after: [START redshift_operator_howto_guide]
     :end-before: [END redshift_operator_howto_guide]
-
-
-.. _howto/operator:RedshiftResumeClusterOperator:
-
-Resume a Redshift Cluster
-"""""""""""""""""""""""""""""""""""""""""""
-
-To resume a 'paused' AWS Redshift Cluster you can use
-:class:`RedshiftResumeClusterOperator 
<airflow.providers.amazon.aws.operators.redshift>`
-
-This Operator leverages the AWS CLI
-`resume-cluster 
<https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__
 API
-
-.. _howto/operator:RedshiftPauseClusterOperator:
-
-Pause a Redshift Cluster
-"""""""""""""""""""""""""""""""""""""""""""
-
-To pause an 'available' AWS Redshift Cluster you can use
-:class:`RedshiftPauseClusterOperator 
<airflow.providers.amazon.aws.operators.redshift>`
-This Operator leverages the AWS CLI
-`pause-cluster 
<https://docs.aws.amazon.com/cli/latest/reference/redshift/pause-cluster.html>`__
 API
diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py 
b/tests/providers/amazon/aws/hooks/test_redshift_cluster.py
similarity index 60%
rename from tests/providers/amazon/aws/hooks/test_redshift.py
rename to tests/providers/amazon/aws/hooks/test_redshift_cluster.py
index 35a1b5e..12d0bf4 100644
--- a/tests/providers/amazon/aws/hooks/test_redshift.py
+++ b/tests/providers/amazon/aws/hooks/test_redshift_cluster.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,18 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
 
-import json
 import unittest
-from unittest import mock
 
 import boto3
-from parameterized import parameterized
 
-from airflow.models import Connection
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook, 
RedshiftSQLHook
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 
 try:
     from moto import mock_redshift
@@ -107,67 +101,3 @@ class TestRedshiftHook(unittest.TestCase):
         hook = RedshiftHook(aws_conn_id='aws_default')
         status = hook.cluster_status('test_cluster')
         assert status == 'available'
-
-
-class TestRedshiftSQLHookConn(unittest.TestCase):
-    def setUp(self):
-        super().setUp()
-
-        self.connection = Connection(
-            conn_type='redshift', login='login', password='password', 
host='host', port=5439, schema="dev"
-        )
-
-        self.db_hook = RedshiftSQLHook()
-        self.db_hook.get_connection = mock.Mock()
-        self.db_hook.get_connection.return_value = self.connection
-
-    def test_get_uri(self):
-        expected = 'redshift+redshift_connector://login:password@host:5439/dev'
-        x = self.db_hook.get_uri()
-        assert x == expected
-
-    
@mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
-    def test_get_conn(self, mock_connect):
-        self.db_hook.get_conn()
-        mock_connect.assert_called_once_with(
-            user='login', password='password', host='host', port=5439, 
database='dev'
-        )
-
-    
@mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
-    def test_get_conn_extra(self, mock_connect):
-        self.connection.extra = json.dumps(
-            {
-                "iam": True,
-                "cluster_identifier": "my-test-cluster",
-                "profile": "default",
-            }
-        )
-        self.db_hook.get_conn()
-        mock_connect.assert_called_once_with(
-            user='login',
-            password='password',
-            host='host',
-            port=5439,
-            cluster_identifier="my-test-cluster",
-            profile="default",
-            database='dev',
-            iam=True,
-        )
-
-    @parameterized.expand(
-        [
-            ({}, {}, {}),
-            ({"login": "test"}, {}, {"user": "test"}),
-            ({}, {"user": "test"}, {"user": "test"}),
-            ({"login": "original"}, {"user": "overridden"}, {"user": 
"overridden"}),
-            ({"login": "test1"}, {"password": "test2"}, {"user": "test1", 
"password": "test2"}),
-        ],
-    )
-    
@mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
-    def test_get_conn_overrides_correctly(self, conn_params, conn_extra, 
expected_call_args, mock_connect):
-        with mock.patch(
-            'airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.conn',
-            Connection(conn_type='redshift', extra=conn_extra, **conn_params),
-        ):
-            self.db_hook.get_conn()
-            mock_connect.assert_called_once_with(**expected_call_args)
diff --git a/tests/providers/amazon/aws/hooks/test_redshift_sql.py 
b/tests/providers/amazon/aws/hooks/test_redshift_sql.py
new file mode 100644
index 0000000..e4754e7
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_redshift_sql.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from unittest import mock
+
+from parameterized import parameterized
+
+from airflow.models import Connection
+from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
+
+
+class TestRedshiftSQLHookConn(unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+
+        self.connection = Connection(
+            conn_type='redshift', login='login', password='password', 
host='host', port=5439, schema="dev"
+        )
+
+        self.db_hook = RedshiftSQLHook()
+        self.db_hook.get_connection = mock.Mock()
+        self.db_hook.get_connection.return_value = self.connection
+
+    def test_get_uri(self):
+        expected = 'redshift+redshift_connector://login:password@host:5439/dev'
+        x = self.db_hook.get_uri()
+        assert x == expected
+
+    
@mock.patch('airflow.providers.amazon.aws.hooks.redshift_sql.redshift_connector.connect')
+    def test_get_conn(self, mock_connect):
+        self.db_hook.get_conn()
+        mock_connect.assert_called_once_with(
+            user='login', password='password', host='host', port=5439, 
database='dev'
+        )
+
+    
@mock.patch('airflow.providers.amazon.aws.hooks.redshift_sql.redshift_connector.connect')
+    def test_get_conn_extra(self, mock_connect):
+        self.connection.extra = json.dumps(
+            {
+                "iam": True,
+                "cluster_identifier": "my-test-cluster",
+                "profile": "default",
+            }
+        )
+        self.db_hook.get_conn()
+        mock_connect.assert_called_once_with(
+            user='login',
+            password='password',
+            host='host',
+            port=5439,
+            cluster_identifier="my-test-cluster",
+            profile="default",
+            database='dev',
+            iam=True,
+        )
+
+    @parameterized.expand(
+        [
+            ({}, {}, {}),
+            ({"login": "test"}, {}, {"user": "test"}),
+            ({}, {"user": "test"}, {"user": "test"}),
+            ({"login": "original"}, {"user": "overridden"}, {"user": 
"overridden"}),
+            ({"login": "test1"}, {"password": "test2"}, {"user": "test1", 
"password": "test2"}),
+        ],
+    )
+    
@mock.patch('airflow.providers.amazon.aws.hooks.redshift_sql.redshift_connector.connect')
+    def test_get_conn_overrides_correctly(self, conn_params, conn_extra, 
expected_call_args, mock_connect):
+        with mock.patch(
+            'airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.conn',
+            Connection(conn_type='redshift', extra=conn_extra, **conn_params),
+        ):
+            self.db_hook.get_conn()
+            mock_connect.assert_called_once_with(**expected_call_args)
diff --git a/tests/providers/amazon/aws/operators/test_redshift.py 
b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
similarity index 81%
rename from tests/providers/amazon/aws/operators/test_redshift.py
rename to tests/providers/amazon/aws/operators/test_redshift_cluster.py
index d43cf4a..a1944a8 100644
--- a/tests/providers/amazon/aws/operators/test_redshift.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,38 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import unittest
 from unittest import mock
-from unittest.mock import MagicMock
-
-from parameterized import parameterized
 
-from airflow.providers.amazon.aws.operators.redshift import (
+from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftPauseClusterOperator,
     RedshiftResumeClusterOperator,
-    RedshiftSQLOperator,
 )
 
 
-class TestRedshiftSQLOperator(unittest.TestCase):
-    @parameterized.expand([(True, ('a', 'b')), (False, ('c', 'd'))])
-    
@mock.patch("airflow.providers.amazon.aws.operators.redshift.RedshiftSQLOperator.get_hook")
-    def test_redshift_operator(self, test_autocommit, test_parameters, 
mock_get_hook):
-        hook = MagicMock()
-        mock_run = hook.run
-        mock_get_hook.return_value = hook
-        sql = MagicMock()
-        operator = RedshiftSQLOperator(
-            task_id='test', sql=sql, autocommit=test_autocommit, 
parameters=test_parameters
-        )
-        operator.execute(None)
-        mock_run.assert_called_once_with(
-            sql,
-            autocommit=test_autocommit,
-            parameters=test_parameters,
-        )
-
-
 class TestResumeClusterOperator:
     def test_init(self):
         redshift_operator = RedshiftResumeClusterOperator(
diff --git a/tests/providers/amazon/aws/operators/test_redshift_sql.py 
b/tests/providers/amazon/aws/operators/test_redshift_sql.py
new file mode 100644
index 0000000..064e86a
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_redshift_sql.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+from unittest.mock import MagicMock
+
+from parameterized import parameterized
+
+from airflow.providers.amazon.aws.operators.redshift_sql import 
RedshiftSQLOperator
+
+
+class TestRedshiftSQLOperator(unittest.TestCase):
+    @parameterized.expand([(True, ('a', 'b')), (False, ('c', 'd'))])
+    
@mock.patch("airflow.providers.amazon.aws.operators.redshift_sql.RedshiftSQLOperator.get_hook")
+    def test_redshift_operator(self, test_autocommit, test_parameters, 
mock_get_hook):
+        hook = MagicMock()
+        mock_run = hook.run
+        mock_get_hook.return_value = hook
+        sql = MagicMock()
+        operator = RedshiftSQLOperator(
+            task_id='test', sql=sql, autocommit=test_autocommit, 
parameters=test_parameters
+        )
+        operator.execute(None)
+        mock_run.assert_called_once_with(
+            sql,
+            autocommit=test_autocommit,
+            parameters=test_parameters,
+        )
diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py 
b/tests/providers/amazon/aws/sensors/test_redshift_cluster.py
similarity index 96%
rename from tests/providers/amazon/aws/sensors/test_redshift.py
rename to tests/providers/amazon/aws/sensors/test_redshift_cluster.py
index ec7ae66..43e6e9f 100644
--- a/tests/providers/amazon/aws/sensors/test_redshift.py
+++ b/tests/providers/amazon/aws/sensors/test_redshift_cluster.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,13 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
 
 import unittest
 
 import boto3
 
-from airflow.providers.amazon.aws.sensors.redshift import 
AwsRedshiftClusterSensor
+from airflow.providers.amazon.aws.sensors.redshift_cluster import 
AwsRedshiftClusterSensor
 
 try:
     from moto import mock_redshift
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py 
b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index da7acf2..2e7bda3 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -39,7 +39,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
     def test_table_unloading(
         self,
         table_as_file_name,
@@ -103,7 +103,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
     def test_execute_sts_token(
         self,
         table_as_file_name,
@@ -171,7 +171,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
     def test_custom_select_query_unloading(
         self,
         table,
@@ -234,7 +234,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
     @mock.patch("airflow.models.connection.Connection")
     @mock.patch("boto3.session.Session")
-    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
     def test_table_unloading_role_arn(
         self,
         table_as_file_name,

Reply via email to