syedahsn commented on code in PR #32355:
URL: https://github.com/apache/airflow/pull/32355#discussion_r1254989940
##########
tests/providers/amazon/aws/triggers/test_eks.py:
##########
@@ -453,3 +455,403 @@ async def test_eks_nodegroup_trigger_run_attempts_failed(self, mock_async_conn,
assert "Error checking nodegroup" in str(exc.value)
assert a_mock.get_waiter().wait.call_count == 3
+
+
+class TestEksCreateClusterTrigger:
+ def test_eks_create_cluster_trigger_serialize(self):
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ class_path, args = eks_create_cluster_trigger.serialize()
+ assert class_path == "airflow.providers.amazon.aws.triggers.eks.EksCreateClusterTrigger"
+ assert args["waiter_name"] == "test_waiter_name"
+ assert args["cluster_name"] == TEST_CLUSTER_IDENTIFIER
+ assert args["aws_conn_id"] == TEST_AWS_CONN_ID
+ assert args["waiter_delay"] == str(TEST_WAITER_DELAY)
+ assert args["waiter_max_attempts"] == str(TEST_WAITER_MAX_ATTEMPTS)
+ assert args["region"] == TEST_REGION
+
+ @pytest.mark.asyncio
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run(self, mock_async_conn):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+
+ a_mock.get_waiter().wait = AsyncMock()
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ generator = eks_create_cluster_trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent(
+ {
+ "status": "success",
+ }
+ )
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run_multiple_attempts(self, mock_async_conn, mock_sleep):
+ mock_sleep.return_value = True
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"cluster": {"status": "CREATING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, error, True])
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ generator = eks_create_cluster_trigger.run()
+ response = await generator.asend(None)
+ assert a_mock.get_waiter().wait.call_count == 4
+ assert response == TriggerEvent(
+ {
+ "status": "success",
+ }
+ )
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run_attempts_exceeded(self, mock_async_conn, mock_sleep):
+ mock_sleep.return_value = True
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"cluster": {"status": "CREATING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, error, True])
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=2,
+ region=TEST_REGION,
+ )
+
+ with pytest.raises(AirflowException) as exc:
+ generator = eks_create_cluster_trigger.run()
+ await generator.asend(None)
+ assert "Waiter error: max attempts reached" in str(exc)
+ assert a_mock.get_waiter().wait.call_count == 2
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run_attempts_failed(self, mock_async_conn, mock_sleep):
+ mock_sleep.return_value = True
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error_creating = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"cluster": {"status": "CREATING"}},
+ )
+ error_failed = WaiterError(
+ name="test_name",
+ reason="Waiter encountered a terminal failure state:",
+ last_response={"cluster": {"status": "FAILED"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error_creating, error_creating, error_failed])
+
+ AirflowException("Error checking Eks cluster")
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ generator = eks_create_cluster_trigger.run()
+ await generator.asend(None)
+
+ # assert response == TriggerEvent({"status": "failed", "exception": exc})
+ # from pdb import set_trace; set_trace()
Review Comment:
oops :sweat_smile:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]