feluelle commented on a change in pull request #8701:
URL: https://github.com/apache/airflow/pull/8701#discussion_r472055480



##########
File path: 
tests/providers/amazon/aws/hooks/test_elasticache_replication_group.py
##########
@@ -0,0 +1,291 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+from unittest.mock import Mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.elasticache_replication_group import 
ElastiCacheReplicationGroupHook
+
+
+class TestElastiCacheReplicationGroupHook(TestCase):
+    REPLICATION_GROUP_ID = "test-elasticache-replication-group-hook"
+
+    REPLICATION_GROUP_CONFIG = {
+        'ReplicationGroupId': REPLICATION_GROUP_ID,
+        'ReplicationGroupDescription': REPLICATION_GROUP_ID,
+        'AutomaticFailoverEnabled': False,
+        'NumCacheClusters': 1,
+        'CacheNodeType': 'cache.m5.large',
+        'Engine': 'redis',
+        'EngineVersion': '5.0.4',
+        'CacheParameterGroupName': 'default.redis5.0'
+    }
+
+    VALID_STATES = frozenset({
+        'creating', 'available', 'modifying', 'deleting', 'create - failed', 
'snapshotting'
+    })
+
+    # Track calls to describe when deleting replication group
+    # First call will return status as `available` and we will initiate delete
+    # Second call with return status as `deleting`
+    # Subsequent call will raise ReplicationGroupNotFoundFault exception
+    describe_call_count_for_delete = 0
+
+    def setUp(self):
+        self.hook = ElastiCacheReplicationGroupHook()
+        # noinspection PyPropertyAccess
+        self.hook.conn = Mock()
+
+        # We need this for every test
+        self.hook.conn.create_replication_group.return_value = {
+            "ReplicationGroup": {
+                "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                "Status": "creating"
+            }
+        }
+
+    def _create_replication_group(self):
+        return 
self.hook.create_replication_group(config=self.REPLICATION_GROUP_CONFIG)
+
+    def test_conn_not_none(self):
+        assert self.hook.conn is not None
+
+    def test_create_replication_group(self):
+        response = self._create_replication_group()
+        assert response["ReplicationGroup"]["ReplicationGroupId"] == 
self.REPLICATION_GROUP_ID
+        assert response["ReplicationGroup"]["Status"] == "creating"
+
+    def test_describe_replication_group(self):
+        self._create_replication_group()
+
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID
+                }
+            ]
+        }
+
+        response = 
self.hook.describe_replication_group(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response["ReplicationGroups"][0]["ReplicationGroupId"] == 
self.REPLICATION_GROUP_ID
+
+    def test_get_replication_group_status(self):
+        self._create_replication_group()
+
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = 
self.hook.get_replication_group_status(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response in self.VALID_STATES
+
+    def test_is_replication_group_available(self):
+        self._create_replication_group()
+
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = 
self.hook.is_replication_group_available(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response in (True, False)
+
+    def test_wait_for_availability(self):
+        self._create_replication_group()
+
+        # Test non availability
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "creating"
+                }
+            ]
+        }
+
+        response = self.hook.wait_for_availability(
+            replication_group_id=self.REPLICATION_GROUP_ID,
+            max_retries=1,
+            initial_sleep_time=1,  # seconds
+        )
+        assert response is False
+
+        # Test availability
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = self.hook.wait_for_availability(
+            replication_group_id=self.REPLICATION_GROUP_ID,
+            max_retries=1,
+            initial_sleep_time=1,  # seconds
+        )
+        assert response is True
+
+    def test_delete_replication_group(self):
+        self._create_replication_group()
+
+        self.hook.conn.delete_replication_group.return_value = {
+            "ReplicationGroup": {
+                "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                "Status": "deleting"
+            }
+        }
+
+        # Wait for availability, can only delete when replication group is 
available
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = self.hook.wait_for_availability(
+            replication_group_id=self.REPLICATION_GROUP_ID,
+            max_retries=1,
+            initial_sleep_time=1,  # seconds
+        )
+        assert response is True
+
+        response = 
self.hook.delete_replication_group(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response["ReplicationGroup"]["ReplicationGroupId"] == 
self.REPLICATION_GROUP_ID
+        assert response["ReplicationGroup"]["Status"] == "deleting"
+
+    # noinspection PyUnusedLocal
+    def _mock_describe_side_effect(self, *args, **kwargs):
+        """
+        Mock describe calls to replication group for testing delete calls
+        """
+        # On first call replication group is in available state, this will 
allow to initiate a delete
+        # A replication group can only be deleted when it is in `available` 
state
+        if self.describe_call_count_for_delete == 0:
+            self.describe_call_count_for_delete = 
self.describe_call_count_for_delete + 1
+
+            return {
+                "ReplicationGroups": [
+                    {
+                        "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                        "Status": "available"
+                    }
+                ]
+            }
+
+        # On second call replication group is in deleting state
+        if self.describe_call_count_for_delete == 1:
+            self.describe_call_count_for_delete = 
self.describe_call_count_for_delete + 1
+
+            return {
+                "ReplicationGroups": [
+                    {
+                        "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                        "Status": "deleting"
+                    }
+                ]
+            }
+
+        # On further calls we will assume the replication group is deleted
+        class MockReplicationGroupNotFoundFault(BaseException):
+            pass
+
+        self.hook.conn.exceptions.ReplicationGroupNotFoundFault = 
MockReplicationGroupNotFoundFault
+
+        raise self.hook.conn.exceptions.ReplicationGroupNotFoundFault

Review comment:
       You don't need to track `describe_call_count_for_delete` yourself. When 
`side_effect` is set to an iterable, each call to the mock returns the next item.
   
   Example from 
https://docs.python.org/3/library/unittest.mock.html#quick-guide:
   ```python
   >>> mock.side_effect = [5, 4, 3, 2, 1]
   >>> mock(), mock(), mock()
   (5, 4, 3)
   ```

##########
File path: airflow/providers/amazon/aws/hooks/elasticache_replication_group.py
##########
@@ -0,0 +1,277 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class ElastiCacheReplicationGroupHook(AwsBaseHook):
+    """
+    Interact with AWS ElastiCache
+    """
+
+    TERMINAL_STATES = frozenset({"available", "create-failed", "deleting"})
+
+    def __init__(
+        self, max_retries=10, exponential_back_off_factor=1, 
initial_poke_interval=60, *args, **kwargs
+    ):
+        """
+        :param max_retries: Max retries for checking availability of and 
deleting replication group
+        :type max_retries: int
+        :param exponential_back_off_factor: Factor for deciding next sleep time
+        :type exponential_back_off_factor: float
+        :param initial_poke_interval: Initial sleep time in seconds
+        :type initial_poke_interval: float
+        """

Review comment:
       Please move this to the `ElastiCacheReplicationGroupHook` class 
docstring.

##########
File path: 
tests/providers/amazon/aws/hooks/test_elasticache_replication_group.py
##########
@@ -0,0 +1,291 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+from unittest.mock import Mock
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.elasticache_replication_group import 
ElastiCacheReplicationGroupHook
+
+
+class TestElastiCacheReplicationGroupHook(TestCase):
+    REPLICATION_GROUP_ID = "test-elasticache-replication-group-hook"
+
+    REPLICATION_GROUP_CONFIG = {
+        'ReplicationGroupId': REPLICATION_GROUP_ID,
+        'ReplicationGroupDescription': REPLICATION_GROUP_ID,
+        'AutomaticFailoverEnabled': False,
+        'NumCacheClusters': 1,
+        'CacheNodeType': 'cache.m5.large',
+        'Engine': 'redis',
+        'EngineVersion': '5.0.4',
+        'CacheParameterGroupName': 'default.redis5.0'
+    }
+
+    VALID_STATES = frozenset({
+        'creating', 'available', 'modifying', 'deleting', 'create - failed', 
'snapshotting'
+    })
+
+    # Track calls to describe when deleting replication group
+    # First call will return status as `available` and we will initiate delete
+    # Second call with return status as `deleting`
+    # Subsequent call will raise ReplicationGroupNotFoundFault exception
+    describe_call_count_for_delete = 0
+
+    def setUp(self):
+        self.hook = ElastiCacheReplicationGroupHook()
+        # noinspection PyPropertyAccess
+        self.hook.conn = Mock()
+
+        # We need this for every test
+        self.hook.conn.create_replication_group.return_value = {
+            "ReplicationGroup": {
+                "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                "Status": "creating"
+            }
+        }
+
+    def _create_replication_group(self):
+        return 
self.hook.create_replication_group(config=self.REPLICATION_GROUP_CONFIG)
+
+    def test_conn_not_none(self):
+        assert self.hook.conn is not None
+
+    def test_create_replication_group(self):
+        response = self._create_replication_group()
+        assert response["ReplicationGroup"]["ReplicationGroupId"] == 
self.REPLICATION_GROUP_ID
+        assert response["ReplicationGroup"]["Status"] == "creating"
+
+    def test_describe_replication_group(self):
+        self._create_replication_group()
+
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID
+                }
+            ]
+        }
+
+        response = 
self.hook.describe_replication_group(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response["ReplicationGroups"][0]["ReplicationGroupId"] == 
self.REPLICATION_GROUP_ID
+
+    def test_get_replication_group_status(self):
+        self._create_replication_group()
+
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = 
self.hook.get_replication_group_status(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response in self.VALID_STATES
+
+    def test_is_replication_group_available(self):
+        self._create_replication_group()
+
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = 
self.hook.is_replication_group_available(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response in (True, False)
+
+    def test_wait_for_availability(self):
+        self._create_replication_group()
+
+        # Test non availability
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "creating"
+                }
+            ]
+        }
+
+        response = self.hook.wait_for_availability(
+            replication_group_id=self.REPLICATION_GROUP_ID,
+            max_retries=1,
+            initial_sleep_time=1,  # seconds
+        )
+        assert response is False
+
+        # Test availability
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = self.hook.wait_for_availability(
+            replication_group_id=self.REPLICATION_GROUP_ID,
+            max_retries=1,
+            initial_sleep_time=1,  # seconds
+        )
+        assert response is True
+
+    def test_delete_replication_group(self):
+        self._create_replication_group()
+
+        self.hook.conn.delete_replication_group.return_value = {
+            "ReplicationGroup": {
+                "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                "Status": "deleting"
+            }
+        }
+
+        # Wait for availability, can only delete when replication group is 
available
+        self.hook.conn.describe_replication_groups.return_value = {
+            "ReplicationGroups": [
+                {
+                    "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                    "Status": "available"
+                }
+            ]
+        }
+
+        response = self.hook.wait_for_availability(
+            replication_group_id=self.REPLICATION_GROUP_ID,
+            max_retries=1,
+            initial_sleep_time=1,  # seconds
+        )
+        assert response is True
+
+        response = 
self.hook.delete_replication_group(replication_group_id=self.REPLICATION_GROUP_ID)
+        assert response["ReplicationGroup"]["ReplicationGroupId"] == 
self.REPLICATION_GROUP_ID
+        assert response["ReplicationGroup"]["Status"] == "deleting"
+
+    # noinspection PyUnusedLocal
+    def _mock_describe_side_effect(self, *args, **kwargs):
+        """
+        Mock describe calls to replication group for testing delete calls
+        """
+        # On first call replication group is in available state, this will 
allow to initiate a delete
+        # A replication group can only be deleted when it is in `available` 
state
+        if self.describe_call_count_for_delete == 0:
+            self.describe_call_count_for_delete = 
self.describe_call_count_for_delete + 1
+
+            return {
+                "ReplicationGroups": [
+                    {
+                        "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                        "Status": "available"
+                    }
+                ]
+            }
+
+        # On second call replication group is in deleting state
+        if self.describe_call_count_for_delete == 1:
+            self.describe_call_count_for_delete = 
self.describe_call_count_for_delete + 1
+
+            return {
+                "ReplicationGroups": [
+                    {
+                        "ReplicationGroupId": self.REPLICATION_GROUP_ID,
+                        "Status": "deleting"
+                    }
+                ]
+            }
+
+        # On further calls we will assume the replication group is deleted
+        class MockReplicationGroupNotFoundFault(BaseException):
+            pass
+
+        self.hook.conn.exceptions.ReplicationGroupNotFoundFault = 
MockReplicationGroupNotFoundFault
+
+        raise self.hook.conn.exceptions.ReplicationGroupNotFoundFault

Review comment:
       So you could do something like this (exception entries in a 
`side_effect` iterable are raised instead of returned):
   ```python
   mock.side_effect = [{
       "ReplicationGroups": [{
           "ReplicationGroupId": self.REPLICATION_GROUP_ID,
           "Status": "available"
       }]
   }, {
       "ReplicationGroups": [{
           "ReplicationGroupId": self.REPLICATION_GROUP_ID,
           "Status": "deleting"
       }]
   }, self.hook.conn.exceptions.ReplicationGroupNotFoundFault()]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to