This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a998e28e68e Add skip_on_exit_code support to EcsRunTaskOperator 
(#63274)
a998e28e68e is described below

commit a998e28e68ec822af45d622094b6c60395644b77
Author: Yunho Jung <[email protected]>
AuthorDate: Wed Mar 11 02:09:18 2026 +0900

    Add skip_on_exit_code support to EcsRunTaskOperator (#63274)
    
    Allow users to specify exit codes that should raise an
    AirflowSkipException (marking the task as skipped) via the new
    `skip_on_exit_code` parameter. This is consistent with the existing
    behavior in DockerOperator and KubernetesPodOperator.
---
 .../airflow/providers/amazon/aws/operators/ecs.py  | 25 ++++++++++++---
 .../tests/unit/amazon/aws/operators/test_ecs.py    | 37 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
index 5e4a4b04f90..05f0102152d 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import re
-from collections.abc import Sequence
+from collections.abc import Container, Sequence
 from datetime import timedelta
 from functools import cached_property
 from time import sleep
@@ -39,7 +39,7 @@ from airflow.providers.amazon.aws.utils import 
validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.identifiers import generate_uuid
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 from airflow.providers.amazon.aws.utils.task_log_fetcher import 
AwsTaskLogFetcher
-from airflow.providers.common.compat.sdk import AirflowException, conf
+from airflow.providers.common.compat.sdk import AirflowException, 
AirflowSkipException, conf
 from airflow.utils.helpers import prune_dict
 
 if TYPE_CHECKING:
@@ -394,6 +394,9 @@ class EcsRunTaskOperator(EcsBaseOperator):
     :param deferrable: If True, the operator will wait asynchronously for the 
job to complete.
         This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
         (default: False)
+    :param skip_on_exit_code: If the task exits with one of these exit codes,
+        leave the task in ``skipped`` state (default: None). Accepts a single
+        int or a container of ints. If set to ``None``, any non-zero exit code
+        is treated as a failure.
     :param do_xcom_push: If True, the operator will push the ECS task ARN to 
XCom with key 'ecs_task_arn'.
         Additionally, if logs are fetched, the last log message will be pushed 
to XCom with the key 'return_value'. (default: False)
     :param stop_task_on_failure: If True, attempt to stop the ECS task if the 
Airflow task fails
@@ -461,6 +464,7 @@ class EcsRunTaskOperator(EcsBaseOperator):
         # Set the default waiter duration to 70 days (attempts*delay)
         # Airflow execution_timeout handles task timeout
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        skip_on_exit_code: int | Container[int] | None = None,
         stop_task_on_failure: bool = True,
         **kwargs,
     ):
@@ -500,6 +504,13 @@ class EcsRunTaskOperator(EcsBaseOperator):
         self.waiter_delay = waiter_delay
         self.waiter_max_attempts = waiter_max_attempts
         self.deferrable = deferrable
+        self.skip_on_exit_code = (
+            skip_on_exit_code
+            if isinstance(skip_on_exit_code, Container)
+            else [skip_on_exit_code]
+            if skip_on_exit_code is not None
+            else []
+        )
         self.stop_task_on_failure = stop_task_on_failure
 
         if self._aws_logs_enabled() and not self.wait_for_completion:
@@ -763,15 +774,21 @@ class EcsRunTaskOperator(EcsBaseOperator):
             containers = task["containers"]
             for container in containers:
                 if container.get("lastStatus") == "STOPPED" and 
container.get("exitCode", 1) != 0:
+                    exit_code = container.get("exitCode", 1)
+                    if exit_code in self.skip_on_exit_code:
+                        exception_cls: type[AirflowException] = 
AirflowSkipException
+                    else:
+                        exception_cls = AirflowException
+
                     if self.task_log_fetcher:
                         last_logs = "\n".join(
                             
self.task_log_fetcher.get_last_log_messages(self.number_logs_exception)
                         )
-                        raise AirflowException(
+                        raise exception_cls(
                             f"This task is not in success state - last 
{self.number_logs_exception} "
                             f"logs from Cloudwatch:\n{last_logs}"
                         )
-                    raise AirflowException(f"This task is not in success state 
{task}")
+                    raise exception_cls(f"This task is not in success state 
{task}")
                 if container.get("lastStatus") == "PENDING":
                     raise AirflowException(f"This task is still pending 
{task}")
                 if "error" in container.get("reason", "").lower():
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
index 5a866e36e11..946b086e0fe 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
@@ -38,7 +38,7 @@ from airflow.providers.amazon.aws.operators.ecs import (
 from airflow.providers.amazon.aws.triggers.ecs import TaskDoneTrigger
 from airflow.providers.amazon.aws.utils.task_log_fetcher import 
AwsTaskLogFetcher
 from airflow.providers.amazon.version_compat import NOTSET
-from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
+from airflow.providers.common.compat.sdk import AirflowException, 
AirflowSkipException, TaskDeferred
 
 from unit.amazon.aws.utils.test_template_fields import validate_template_fields
 
@@ -603,6 +603,41 @@ class TestEcsRunTaskOperator(EcsBaseTestCase):
         self.ecs._check_success_task()
         client_mock.describe_tasks.assert_called_once_with(cluster="c", 
tasks=["arn"])
 
+    @mock.patch.object(EcsBaseOperator, "client")
+    def test_check_success_task_raises_skip_exception(self, client_mock):
+        self.ecs.arn = "arn"
+        self.ecs.skip_on_exit_code = [2]
+        client_mock.describe_tasks.return_value = {
+            "tasks": [{"containers": [{"name": "container-name", "lastStatus": 
"STOPPED", "exitCode": 2}]}]
+        }
+        with pytest.raises(AirflowSkipException):
+            self.ecs._check_success_task()
+
+    @mock.patch.object(EcsBaseOperator, "client")
+    
@mock.patch("airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher")
+    def test_check_success_task_skip_exception_with_logs(self, 
log_fetcher_mock, client_mock):
+        self.ecs.arn = "arn"
+        self.ecs.skip_on_exit_code = [2]
+        self.ecs.task_log_fetcher = log_fetcher_mock
+        log_fetcher_mock.get_last_log_messages.return_value = ["log1", "log2"]
+        client_mock.describe_tasks.return_value = {
+            "tasks": [{"containers": [{"name": "container-name", "lastStatus": 
"STOPPED", "exitCode": 2}]}]
+        }
+        with pytest.raises(AirflowSkipException, match="This task is not in 
success state"):
+            self.ecs._check_success_task()
+
+    @mock.patch.object(EcsBaseOperator, "client")
+    def 
test_check_success_task_unmatched_exit_code_raises_airflow_exception(self, 
client_mock):
+        """Exit codes not in skip_on_exit_code raise AirflowException."""
+        self.ecs.arn = "arn"
+        self.ecs.skip_on_exit_code = [2]
+        client_mock.describe_tasks.return_value = {
+            "tasks": [{"containers": [{"name": "container-name", "lastStatus": 
"STOPPED", "exitCode": 1}]}]
+        }
+        with pytest.raises(AirflowException) as ctx:
+            self.ecs._check_success_task()
+        assert type(ctx.value) is AirflowException
+
     @pytest.mark.parametrize(
         ("launch_type", "tags"),
         [

Reply via email to