Brooke-white commented on a change in pull request #18447:
URL: https://github.com/apache/airflow/pull/18447#discussion_r720539871



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -126,3 +135,101 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
             ClusterIdentifier=cluster_identifier,
         )
         return response['Snapshot'] if response['Snapshot'] else None
+
+
+class RedshiftSQLHook(DbApiHook):
+    """
+    Execute statements against Amazon Redshift, using redshift_connector
+
+    This hook requires the redshift_conn_id connection. This connection must
+    be initialized with the host, port, login, password. Additional connection
+    options can be passed to extra as a JSON string.
+
+    :param redshift_conn_id: reference to
+        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
+    :type redshift_conn_id: str
+
+    .. note::
+        get_sqlalchemy_engine() and get_uri() depend on sqlalchemy-amazon-redshift
+    """
+
+    conn_name_attr = 'redshift_conn_id'
+    default_conn_name = 'redshift_default'
+    conn_type = 'redshift+redshift_connector'
+    hook_name = 'Amazon Redshift'
+    supports_autocommit = True
+
+    @staticmethod
+    def get_ui_field_behavior() -> Dict:
+        """Returns custom field behavior"""
+        return {
+            "hidden_fields": [],
+            "relabeling": {'login': 'User', 'schema': 'Database'},
+        }
+
+    @cached_property
+    def conn(self):
+        return self.get_connection(self.redshift_conn_id)  # type: ignore[attr-defined]
+
+    def _get_conn_params(self) -> Dict[str, Union[str, int]]:
+        """Helper method to retrieve connection args"""
+        conn = self.conn
+
+        conn_params: Dict[str, Union[str, int]] = {}
+
+        if conn.login:
+            conn_params['user'] = conn.login
+        if conn.password:
+            conn_params['password'] = conn.password
+        if conn.host:
+            conn_params['host'] = conn.host
+        if conn.port:
+            conn_params['port'] = conn.port
+        if conn.schema:
+            conn_params['database'] = conn.schema
+
+        return conn_params
+
+    def get_uri(self) -> str:
+        """
+        Override DbApiHook get_uri method for get_sqlalchemy_engine()
+
+        .. note::
+            Value passed to connection extra parameter will be excluded
+            from returned uri but passed to get_sqlalchemy_engine()
+            by default
+        """
+        from sqlalchemy.engine.url import URL
+
+        conn_params = self._get_conn_params()
+
+        conn = self.conn
+
+        conn_type = conn.conn_type or RedshiftSQLHook.conn_type
+
+        if 'user' in conn_params:
+            conn_params['username'] = conn_params.pop('user')

Review comment:
       > AFAIK, the only reason there is a get_uri method in hooks is for sqlalchemy
   
   In this case, the approach I took in 1b22d59 with `_get_sqlalchemy_uri` doesn't make sense.
   I thought I had seen `get_uri` mentioned for some other use somewhere in the docs, but after taking another look I see that I was mistaken.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to