dstandish commented on a change in pull request #18447:
URL: https://github.com/apache/airflow/pull/18447#discussion_r722654639



##########
File path: airflow/providers/amazon/aws/operators/redshift.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import List, Optional, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
+
+
+class RedshiftSQLOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftSQLOperator`
+
+    :param sql: the sql code to be executed
+    :type sql: Can receive a str representing a sql statement,
+        a list of str (sql statements)

Review comment:
       ```suggestion
           or a list of str (sql statements)
   ```

##########
File path: docs/apache-airflow-providers-amazon/connections/redshift.rst
##########
@@ -0,0 +1,82 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:redshift:
+
+Amazon Redshift Connection
+==========================
+
+The Redshift connection type enables integrations with Redshift.
+
+Authenticating to Amazon Redshift
+---------------------------------
+
+Authentication may be performed using any of the authentication methods supported by `redshift_connector <https://github.com/aws/amazon-redshift-python-driver>`_ such as via direct credentials, IAM authentication, or using an Identity Provider (IdP) plugin.
+
+Default Connection IDs
+-----------------------
+
+The default connection ID is ``redshift_default``.
+
+Configuring the Connection
+--------------------------
+
+
+User
+  Specify the username to use for authentication with Amazon Redshift.
+
+Password
+  Specify the password to use for authentication with Amazon Redshift.
+
+Host
+  Specify the Amazon Redshift hostname.
+
+Database
+  Specify the Amazon Redshift database name.
+
+Extra
+    Specify the extra parameters (as json dictionary) that can be used in
+    Amazon Redshift connection. For a complete list of supported parameters
+    please see the `documentation <https://github.com/aws/amazon-redshift-python-driver#connection-parameters>`_
+    for redshift_connector.
+
+
+When specifying the connection in environment variable you should specify
+it using URI syntax.
+
+Note that all components of the URI should be URL-encoded.
+
+Examples
+--------
+
+Database Authentication
+
+.. code-block:: bash
+
+  AIRFLOW_CONN_REDSHIFT_DEFAULT=redshift://awsuser:passw...@redshift-cluster-1.123456789.us-west-1.redshift.amazonaws.com:5439/?database=dev&ssl=True
+
+IAM Authentication using AWS Profile
+
+.. code-block:: bash
+
+  AIRFLOW_CONN_REDSHIFT_DEFAULT=redshift://:@:/?database=dev&iam=True&db_user=awsuser&cluster_identifier=redshift-cluster-1&profile=default

Review comment:
       Ok so dealing with airflow URIs is a bit tricky.
   
   And in this case I think there's a small problem.
   
   Look at the handling of the `iam` parameter:
   
   ```python
   >>> from airflow.models.connection import Connection
   >>> c = Connection(uri='redshift://:@:/?database=dev&iam=True&db_user=awsuser&cluster_identifier=redshift-cluster-1&profile=default')
   >>> c.extra_dejson
   {'database': 'dev', 'iam': 'True', 'db_user': 'awsuser', 'cluster_identifier': 'redshift-cluster-1', 'profile': 'default'}
   ```
   
   And if redshift connector gets `iam='True'` instead of `iam=True` it won't work.
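
The type loss is easy to reproduce with the standard library alone. The sketch below is not Airflow's own parser; it only assumes that query-string values are decoded as plain strings, which matches the `extra_dejson` output shown above.

```python
from urllib.parse import parse_qsl

# Query part of the example URI (shortened for illustration).
query = "database=dev&iam=True&db_user=awsuser"

# Every value comes back as a str; there is no type information in a query string.
extra = dict(parse_qsl(query))

iam_is_string = isinstance(extra["iam"], str)   # the bool became the text 'True'
iam_is_bool = extra["iam"] is True              # so an identity check against True fails
```

Any consumer that expects a real `bool` therefore receives the four-character string `'True'` instead.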
   
   While we _could_ implement logic to handle this in the hook, we don't need to because there's a way to produce the URI such that we avoid this issue.
   
   We have a `get_uri` method on `Connection` that produces the URI from a connection object.  When the standard URI encoding would lose fidelity (e.g. a bool converted to a string on reparsing), it uses the alternative representation of extra:
   ```python
   import json
   from airflow.models.connection import Connection

   c = Connection(conn_type='redshift', extra=json.dumps({"database": "dev", "iam": True, "db_user": "awsuser", "cluster_identifier": "redshift-cluster-1", "profile": "default"}))
   c.get_uri()
   # 'redshift://?__extra__=%7B%22database%22%3A+%22dev%22%2C+%22iam%22%3A+true%2C+%22db_user%22%3A+%22awsuser%22%2C+%22cluster_identifier%22%3A+%22redshift-cluster-1%22%2C+%22profile%22%3A+%22default%22%7D'
   ```
   
   It's ugly but it works.
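
To see why the JSON-in-a-single-parameter form keeps the types, here is a minimal standard-library sketch of the same idea. It is an illustration only, not Airflow's implementation: it assumes nothing beyond serializing the whole extra dict to JSON, URL-encoding it under one `__extra__` key, and reversing both steps on the way back.

```python
import json
from urllib.parse import parse_qsl, urlencode

extra = {"database": "dev", "iam": True, "db_user": "awsuser"}

# Encode: the whole dict travels as one JSON string, so `true` stays a JSON boolean.
query = urlencode({"__extra__": json.dumps(extra)})

# Decode: pull the single parameter back out and parse the JSON.
decoded = json.loads(dict(parse_qsl(query))["__extra__"])
```

After the round trip `decoded["iam"]` is the Python value `True` again, rather than a string.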
   
   So, bottom line: I think the best option here is to just show examples of how to define the connections using `Connection` instead of URI format.  It's the easiest way to produce the correct URI.  You could also show the generated URI and how you produced it, or just point to the "generating an airflow URI" section in the core howto / "managing connections" doc, where this is covered in detail.
   
   ---
   
   Side note: I hope to implement support for JSON serialization broadly (i.e. as an alternative to airflow URI) along the lines done [here with SSM](https://github.com/apache/airflow/pull/18692), which will make this a little less painful.
   
   

##########
File path: airflow/providers/amazon/aws/operators/redshift.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import List, Optional, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
+
+
+class RedshiftSQLOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftSQLOperator`
+
+    :param sql: the sql code to be executed
+    :type sql: Can receive a str representing a sql statement,
+        a list of str (sql statements)
+    :param redshift_conn_id: reference to
+        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
+    :type redshift_conn_id: str
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: dict or iterable
+    :param autocommit: if True, each command is automatically committed.
+        (default value: False)
+    :type autocommit: bool
+    """
+
+    template_fields = ('sql',)
+    template_ext = ('.sql',)
+
+    def __init__(
+        self,
+        *,
+        sql: Union[str, List[str]],
+        redshift_conn_id: str = 'redshift_default',
+        parameters: Optional[dict] = None,

Review comment:
       I don't think this type annotation is quite correct.  It might be `Optional[Union[Dict, Iterable]]`.
   
   The point is that it could be a tuple or list, or a dict, as shown in the readme examples of the library.
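
A rough sketch of what the wider annotation would accept. The alias `SqlParameters` and the helper `describe_parameters` are hypothetical names used only for illustration; they are not part of the operator or of the driver.

```python
from typing import Dict, Iterable, Optional, Union

# Hypothetical alias for the suggested annotation.
SqlParameters = Optional[Union[Dict, Iterable]]


def describe_parameters(parameters: SqlParameters = None) -> str:
    """Classify the parameter container (illustrative helper only)."""
    if parameters is None:
        return "none"
    # A dict is also iterable, so it has to be checked first.
    if isinstance(parameters, dict):
        return "named"
    return "positional"
```

With `Optional[dict]` alone, passing a tuple or list would be flagged by a type checker even though it is a valid way to supply positional parameters.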

##########
File path: tests/providers/amazon/aws/hooks/test_redshift.py
##########
@@ -103,3 +107,67 @@ def test_cluster_status_returns_available_cluster(self):
         hook = RedshiftHook(aws_conn_id='aws_default')
         status = hook.cluster_status('test_cluster')
         assert status == 'available'
+
+
+class TestRedshiftSQLHookConn(unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+
+        self.connection = Connection(
+            conn_type='redshift', login='login', password='password', host='host', port=5439, schema="dev"
+        )
+
+        self.db_hook = RedshiftSQLHook()
+        self.db_hook.get_connection = mock.Mock()
+        self.db_hook.get_connection.return_value = self.connection
+
+    def test_get_uri(self):
+        expected = 'redshift+redshift_connector://login:password@host:5439/dev'
+        x = self.db_hook.get_uri()
+        assert x == expected
+
+    @mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
+    def test_get_conn(self, mock_connect):
+        self.db_hook.get_conn()
+        mock_connect.assert_called_once_with(
+            user='login', password='password', host='host', port=5439, database='dev'
+        )
+
+    @mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
+    def test_get_conn_extra(self, mock_connect):
+        self.connection.extra = json.dumps(
+            {
+                "iam": True,
+                "cluster_identifier": "my-test-cluster",
+                "profile": "default",
+            }
+        )
+        self.db_hook.get_conn()
+        mock_connect.assert_called_once_with(
+            user='login',
+            password='password',
+            host='host',
+            port=5439,
+            cluster_identifier="my-test-cluster",
+            profile="default",
+            database='dev',
+            iam=True,
+        )
+
+    @parameterized.expand(
+        [
+            ({}, {}, {}),
+            ({"login": "test"}, {}, {"user": "test"}),
+            ({}, {"user": "test"}, {"user": "test"}),
+            ({"login": "original"}, {"user": "overridden"}, {"user": "overridden"}),
+            ({"login": "test1"}, {"password": "test2"}, {"user": "test1", "password": "test2"}),
+        ],
+    )
+    @mock.patch('airflow.providers.amazon.aws.hooks.redshift.redshift_connector.connect')
+    def test_get_conn_overrides_correctly(self, test_args, test_kwargs, expected_call_args, mock_connect):

Review comment:
       ```suggestion
       def test_get_conn_overrides_correctly(self, conn_params, conn_extra, expected_call_args, mock_connect):
   ```
   It's hard to name this perfectly, but `test_args` and `test_kwargs`... they're all "keyword args"; the main difference here is `extra` vs. other conn params.  This just seems a tad clearer.
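
The distinction the new names make can be sketched as a small merge function. `build_connect_kwargs` is a hypothetical helper written for this note, not the hook's actual code; it assumes only what the parameterized cases above imply: `login` is passed on as `user`, and values from `extra` win over the regular connection fields.

```python
from typing import Any, Dict


def build_connect_kwargs(conn_params: Dict[str, Any], conn_extra: Dict[str, Any]) -> Dict[str, Any]:
    """Merge regular connection fields with extra; extra takes precedence (illustrative only)."""
    renamed = {"login": "user"}  # connection field name -> driver keyword
    kwargs = {renamed.get(key, key): value for key, value in conn_params.items()}
    kwargs.update(conn_extra)  # extra overrides anything set from the connection fields
    return kwargs
```

Read this way, `conn_params` is what lives on the connection's own fields and `conn_extra` is what lives in its extra JSON, which is the split the suggested argument names describe.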




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


