Brooke-white commented on a change in pull request #18447:
URL: https://github.com/apache/airflow/pull/18447#discussion_r720403036
##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -126,3 +135,101 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
ClusterIdentifier=cluster_identifier,
)
return response['Snapshot'] if response['Snapshot'] else None
+
+
+class RedshiftSQLHook(DbApiHook):
+ """
+ Execute statements against Amazon Redshift, using redshift_connector
+
+ This hook requires the redshift_conn_id connection. This connection must
+ be initialized with the host, port, login, password. Additional connection
+ options can be passed to extra as a JSON string.
+
+ :param redshift_conn_id: reference to
+ :ref:`Amazon Redshift connection id<howto/connection:redshift>`
+ :type redshift_conn_id: str
+
+ .. note::
+ get_sqlalchemy_engine() and get_uri() depend on sqlalchemy-amazon-redshift
+ """
+
+ conn_name_attr = 'redshift_conn_id'
+ default_conn_name = 'redshift_default'
+ conn_type = 'redshift+redshift_connector'
+ hook_name = 'Amazon Redshift'
+ supports_autocommit = True
+
+ @staticmethod
+ def get_ui_field_behavior() -> Dict:
+ """Returns custom field behavior"""
+ return {
+ "hidden_fields": [],
+ "relabeling": {'login': 'User', 'schema': 'Database'},
+ }
+
+ @cached_property
+ def conn(self):
+ return self.get_connection(self.redshift_conn_id) # type: ignore[attr-defined]
+
+ def _get_conn_params(self) -> Dict[str, Union[str, int]]:
+ """Helper method to retrieve connection args"""
+ conn = self.conn
+
+ conn_params: Dict[str, Union[str, int]] = {}
+
+ if conn.login:
+ conn_params['user'] = conn.login
+ if conn.password:
+ conn_params['password'] = conn.password
+ if conn.host:
+ conn_params['host'] = conn.host
+ if conn.port:
+ conn_params['port'] = conn.port
+ if conn.schema:
+ conn_params['database'] = conn.schema
+
+ return conn_params
+
+ def get_uri(self) -> str:
+ """
+ Override DbApiHook get_uri method for get_sqlalchemy_engine()
+
+ .. note::
+ Value passed to connection extra parameter will be excluded
+ from returned uri but passed to get_sqlalchemy_engine()
+ by default
+ """
+ from sqlalchemy.engine.url import URL
+
+ conn_params = self._get_conn_params()
+
+ conn = self.conn
+
+ conn_type = conn.conn_type or RedshiftSQLHook.conn_type
+
+ if 'user' in conn_params:
+ conn_params['username'] = conn_params.pop('user')
Review comment:
The reason for this approach is that the method that builds the URL
expects `username` while the DB-API driver expects `user`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]