This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 85aea74b64 Fix AWS Batch waiter failure state (#33656)
85aea74b64 is described below
commit 85aea74b647f978cd1e3c42e3a5f3bb068b56539
Author: Pavel Yermalovich <[email protected]>
AuthorDate: Wed Aug 23 13:21:21 2023 +0200
Fix AWS Batch waiter failure state (#33656)
* Fix AWS Batch waiter failure state
* Add tests for AWS Batch batch_job_complete waiter
---
airflow/providers/amazon/aws/waiters/batch.json | 6 +--
.../amazon/aws/waiters/test_custom_waiters.py | 56 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/amazon/aws/waiters/batch.json
b/airflow/providers/amazon/aws/waiters/batch.json
index 3fbdd43377..a8cd489ea3 100644
--- a/airflow/providers/amazon/aws/waiters/batch.json
+++ b/airflow/providers/amazon/aws/waiters/batch.json
@@ -17,7 +17,7 @@
"argument": "jobs[].status",
"expected": "FAILED",
"matcher": "pathAll",
- "state": "failed"
+ "state": "failure"
}
]
},
@@ -37,13 +37,13 @@
"argument": "computeEnvironments[].status",
"expected": "INVALID",
"matcher": "pathAny",
- "state": "failed"
+ "state": "failure"
},
{
"argument": "computeEnvironments[].status",
"expected": "DELETED",
"matcher": "pathAny",
- "state": "failed"
+ "state": "failure"
}
]
}
diff --git a/tests/providers/amazon/aws/waiters/test_custom_waiters.py
b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
index 21c051f3b4..09b6742f7a 100644
--- a/tests/providers/amazon/aws/waiters/test_custom_waiters.py
+++ b/tests/providers/amazon/aws/waiters/test_custom_waiters.py
@@ -27,6 +27,7 @@ from botocore.waiter import WaiterModel
from moto import mock_eks
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook,
EcsTaskDefinitionStates
from airflow.providers.amazon.aws.hooks.eks import EksHook
@@ -295,3 +296,58 @@ class TestCustomDynamoDBServiceWaiters:
ExportArn="LoremIpsumissimplydummytextoftheprintingandtypesettingindustry",
WaiterConfig={"Delay": 0.01, "MaxAttempts": 3},
)
+
+
+class TestCustomBatchServiceWaiters:
+ """Test waiters from ``amazon/aws/waiters/batch.json``."""
+
+ JOB_ID = "test_job_id"
+
+ @pytest.fixture(autouse=True)
+ def setup_test_cases(self, monkeypatch):
+ self.client = boto3.client("batch", region_name="eu-west-3")
+ monkeypatch.setattr(BatchClientHook, "conn", self.client)
+
+ @pytest.fixture
+ def mock_describe_jobs(self):
+ """Mock ``BatchClientHook.Client.describe_jobs`` method."""
+ with mock.patch.object(self.client, "describe_jobs") as m:
+ yield m
+
+ def test_service_waiters(self):
+ hook_waiters = BatchClientHook(aws_conn_id=None).list_waiters()
+ assert "batch_job_complete" in hook_waiters
+
+ @staticmethod
+ def describe_jobs(status: str):
+ """
+ Helper function for generate minimal DescribeJobs response for a
single job.
+
https://docs.aws.amazon.com/batch/latest/APIReference/API_DescribeJobs.html
+ """
+ return {
+ "jobs": [
+ {
+ "status": status,
+ },
+ ],
+ }
+
+ def test_job_succeeded(self, mock_describe_jobs):
+ """Test job succeeded"""
+ mock_describe_jobs.side_effect = [
+ self.describe_jobs(BatchClientHook.RUNNING_STATE),
+ self.describe_jobs(BatchClientHook.SUCCESS_STATE),
+ ]
+ waiter =
BatchClientHook(aws_conn_id=None).get_waiter("batch_job_complete")
+ waiter.wait(jobs=[self.JOB_ID], WaiterConfig={"Delay": 0.01,
"MaxAttempts": 2})
+
+ def test_job_failed(self, mock_describe_jobs):
+ """Test job failed"""
+ mock_describe_jobs.side_effect = [
+ self.describe_jobs(BatchClientHook.RUNNING_STATE),
+ self.describe_jobs(BatchClientHook.FAILURE_STATE),
+ ]
+ waiter =
BatchClientHook(aws_conn_id=None).get_waiter("batch_job_complete")
+
+ with pytest.raises(WaiterError, match="Waiter encountered a terminal
failure state"):
+ waiter.wait(jobs=[self.JOB_ID], WaiterConfig={"Delay": 0.01,
"MaxAttempts": 2})