feluelle commented on a change in pull request #10655:
URL: https://github.com/apache/airflow/pull/10655#discussion_r485420985



##########
File path: airflow/providers/amazon/aws/hooks/secrets_manager.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import base64
+import json
+from typing import Union, Optional
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class SecretsManagerHook(AwsBaseHook):
+    """
+    Interact with Amazon SecretsManager Service.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. see also::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(client_type='secretsmanager', *args, **kwargs)
+
+    def get_secret(self, secret_name: str) -> Union[str, bytes]:
+        """
+        Retrieve secret value from AWS Secrets Manager as a str or bytes
+        reflecting format it stored in the AWS Secrets Manager
+        :param secret_name: name of the secrets.

Review comment:
       ```suggestion
           reflecting format it stored in the AWS Secrets Manager
   
           :param secret_name: the secret id in the AWS Secrets Manager.
   ```

##########
File path: tests/providers/amazon/aws/hooks/test_secrets_manager.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import base64
+import json
+
+from airflow.providers.amazon.aws.hooks.secrets_manager import 
SecretsManagerHook
+
+try:
+    from moto import mock_secretsmanager
+except ImportError:
+    mock_secretsmanager = None
+
+
+class TestSecretsManagerHook(unittest.TestCase):
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_string(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret(secret_name)
+        self.assertEqual(secrets, secret_value)

Review comment:
       ```suggestion
           secret = hook.get_secret(secret_name)
           self.assertEqual(secret, secret_value)
   ```

##########
File path: tests/providers/amazon/aws/hooks/test_secrets_manager.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import base64
+import json
+
+from airflow.providers.amazon.aws.hooks.secrets_manager import 
SecretsManagerHook
+
+try:
+    from moto import mock_secretsmanager
+except ImportError:
+    mock_secretsmanager = None
+
+
+class TestSecretsManagerHook(unittest.TestCase):
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_string(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret(secret_name)
+        self.assertEqual(secrets, secret_value)
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_dict(self):

Review comment:
       ```suggestion
       def test_get_secret_dict(self):
   ```

##########
File path: tests/providers/amazon/aws/hooks/test_secrets_manager.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import base64
+import json
+
+from airflow.providers.amazon.aws.hooks.secrets_manager import 
SecretsManagerHook
+
+try:
+    from moto import mock_secretsmanager
+except ImportError:
+    mock_secretsmanager = None
+
+
+class TestSecretsManagerHook(unittest.TestCase):
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_string(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret(secret_name)
+        self.assertEqual(secrets, secret_value)
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_dict(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret_as_dict(secret_name)
+        self.assertEqual(secrets, json.loads(secret_value))

Review comment:
       ```suggestion
           secret = hook.get_secret_as_dict(secret_name)
           self.assertEqual(secret, json.loads(secret_value))
   ```

##########
File path: airflow/providers/amazon/aws/hooks/secrets_manager.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import base64
+import json
+from typing import Union, Optional
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class SecretsManagerHook(AwsBaseHook):
+    """
+    Interact with Amazon SecretsManager Service.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. see also::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(client_type='secretsmanager', *args, **kwargs)
+
+    def get_secret(self, secret_name: str) -> Union[str, bytes]:
+        """
+        Retrieve secret value from AWS Secrets Manager as a str or bytes
+        reflecting format it stored in the AWS Secrets Manager
+        :param secret_name: name of the secrets.
+        :type secret_name: str
+        :return: Union[str, bytes] with the information about the secrets
+        :rtype: Union[str, bytes]
+        """
+        # Depending on whether the secret is a string or binary, one of
+        # these fields will be populated.
+        get_secret_value_response = 
self.get_conn().get_secret_value(SecretId=secret_name)
+        if 'SecretString' in get_secret_value_response:
+            secret = get_secret_value_response['SecretString']
+        else:
+            secret = 
base64.b64decode(get_secret_value_response['SecretBinary'])
+        return secret
+
+    def get_secret_as_dict(self, secret_name: str) -> Optional[dict]:
+        """
+        Retrieve secret value from AWS Secrets Manager in a dict representation
+        :param secret_name: name of the secrets.

Review comment:
       ```suggestion
           Retrieve secret value from AWS Secrets Manager in a dict 
representation
   
           :param secret_name: the secret id in the AWS Secrets Manager.
   ```

##########
File path: tests/providers/amazon/aws/hooks/test_secrets_manager.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import base64
+import json
+
+from airflow.providers.amazon.aws.hooks.secrets_manager import 
SecretsManagerHook
+
+try:
+    from moto import mock_secretsmanager
+except ImportError:
+    mock_secretsmanager = None
+
+
+class TestSecretsManagerHook(unittest.TestCase):
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_string(self):

Review comment:
       ```suggestion
       def test_get_secret_string(self):
   ```

##########
File path: airflow/providers/amazon/aws/hooks/secrets_manager.py
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import base64
+import json
+from typing import Union, Optional
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class SecretsManagerHook(AwsBaseHook):
+    """
+    Interact with Amazon SecretsManager Service.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. see also::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(client_type='secretsmanager', *args, **kwargs)
+
+    def get_secret(self, secret_name: str) -> Union[str, bytes]:
+        """
+        Retrieve secret value from AWS Secrets Manager as a str or bytes
+        reflecting format it stored in the AWS Secrets Manager
+        :param secret_name: name of the secrets.
+        :type secret_name: str
+        :return: Union[str, bytes] with the information about the secrets
+        :rtype: Union[str, bytes]
+        """
+        # Depending on whether the secret is a string or binary, one of
+        # these fields will be populated.
+        get_secret_value_response = 
self.get_conn().get_secret_value(SecretId=secret_name)
+        if 'SecretString' in get_secret_value_response:
+            secret = get_secret_value_response['SecretString']
+        else:
+            secret = 
base64.b64decode(get_secret_value_response['SecretBinary'])
+        return secret
+
+    def get_secret_as_dict(self, secret_name: str) -> Optional[dict]:
+        """
+        Retrieve secret value from AWS Secrets Manager in a dict representation
+        :param secret_name: name of the secrets.
+        :type secret_name: str
+        :return: dict with the information about the secrets
+        :rtype: dict
+        """
+        # Depending on whether the secret is a string or binary, one of
+        # these fields will be populated.

Review comment:
       ```suggestion
   ```

##########
File path: tests/providers/amazon/aws/hooks/test_secrets_manager.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import base64
+import json
+
+from airflow.providers.amazon.aws.hooks.secrets_manager import 
SecretsManagerHook
+
+try:
+    from moto import mock_secretsmanager
+except ImportError:
+    mock_secretsmanager = None
+
+
+class TestSecretsManagerHook(unittest.TestCase):
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_string(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret(secret_name)
+        self.assertEqual(secrets, secret_value)
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_dict(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret_as_dict(secret_name)
+        self.assertEqual(secrets, json.loads(secret_value))
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_binary(self):

Review comment:
       ```suggestion
       def test_get_secret_binary(self):
   ```

##########
File path: tests/providers/amazon/aws/hooks/test_secrets_manager.py
##########
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import base64
+import json
+
+from airflow.providers.amazon.aws.hooks.secrets_manager import 
SecretsManagerHook
+
+try:
+    from moto import mock_secretsmanager
+except ImportError:
+    mock_secretsmanager = None
+
+
+class TestSecretsManagerHook(unittest.TestCase):
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+        self.assertIsNotNone(hook.get_conn())
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_string(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret(secret_name)
+        self.assertEqual(secrets, secret_value)
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_dict(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value = '{"user": "test"}'
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretString': secret_value,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret_as_dict(secret_name)
+        self.assertEqual(secrets, json.loads(secret_value))
+
+    @unittest.skipIf(mock_secretsmanager is None, 'mock_secretsmanager package 
not present')
+    @mock_secretsmanager
+    def test_get_secrets_binary(self):
+        secret_name = 
"arn:aws:secretsmanager:us-east-2:999999999999:secret:db_cluster-YYYYYYY"
+        secret_value_binary = base64.b64encode(b'{"username": "test"}')
+        hook = SecretsManagerHook(aws_conn_id='aws_default')
+
+        param = {
+            'SecretId': secret_name,
+            'SecretBinary': secret_value_binary,
+        }
+
+        hook.get_conn().put_secret_value(**param)
+
+        secrets = hook.get_secret(secret_name)
+        self.assertEqual(secrets, base64.b64decode(secret_value_binary))

Review comment:
       ```suggestion
           secret = hook.get_secret(secret_name)
           self.assertEqual(secret, base64.b64decode(secret_value_binary))
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to