This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 230da3e399c EcsRunTaskOperator fails when no containers are provided 
in the response (#51692)
230da3e399c is described below

commit 230da3e399c26699daad9fd95a413a844f7cb13f
Author: Dominik <105610163+dominik...@users.noreply.github.com>
AuthorDate: Sat Sep 6 05:52:39 2025 +0200

    EcsRunTaskOperator fails when no containers are provided in the response 
(#51692)
    
    * Added logic to safely access the container name only if it's required and 
poll multiple times in case the task is not yet active
    
    * Adjusted the test case to better reflect actual ECS response
---
 .../airflow/providers/amazon/aws/operators/ecs.py  | 25 +++++++--
 .../tests/unit/amazon/aws/operators/test_ecs.py    | 62 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
index 30baf423986..0152c465455 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
@@ -21,6 +21,7 @@ import re
 from collections.abc import Sequence
 from datetime import timedelta
 from functools import cached_property
+from time import sleep
 from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
@@ -629,10 +630,22 @@ class EcsRunTaskOperator(EcsBaseOperator):
         self.log.info("ECS Task started: %s", response)
 
         self.arn = response["tasks"][0]["taskArn"]
-        if not self.container_name:
-            self.container_name = response["tasks"][0]["containers"][0]["name"]
         self.log.info("ECS task ID is: %s", self._get_ecs_task_id(self.arn))
 
+        if not self.container_name and (self.awslogs_group and 
self.awslogs_stream_prefix):
+            backoff_schedule = [10, 30]
+            for delay in backoff_schedule:
+                sleep(delay)
+                response = self.client.describe_tasks(cluster=self.cluster, 
tasks=[self.arn])
+                containers = response["tasks"][0].get("containers", [])
+                if containers:
+                    self.container_name = containers[0]["name"]
+                if self.container_name:
+                    break
+
+            if not self.container_name:
+                self.log.info("Could not find container name, required for the 
log stream after 2 tries")
+
     def _try_reattach_task(self, started_by: str):
         if not started_by:
             raise AirflowException("`started_by` should not be empty or None")
@@ -666,7 +679,13 @@ class EcsRunTaskOperator(EcsBaseOperator):
         return self.awslogs_group and self.awslogs_stream_prefix
 
     def _get_logs_stream_name(self) -> str:
-        if (
+        if not self.container_name and self.awslogs_stream_prefix and "/" not 
in self.awslogs_stream_prefix:
+            self.log.warning(
+                "Container name could not be inferred and 
awslogs_stream_prefix '%s' does not contain '/'. "
+                "This may cause issues when extracting logs from Cloudwatch.",
+                self.awslogs_stream_prefix,
+            )
+        elif (
             self.awslogs_stream_prefix
             and self.container_name
             and not 
self.awslogs_stream_prefix.endswith(f"/{self.container_name}")
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
index efc6d33630e..23d3fdc9282 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
@@ -81,6 +81,20 @@ RESPONSE_WITHOUT_FAILURES = {
         }
     ],
 }
+RESPONSE_WITHOUT_NAME = {
+    "failures": [],
+    "tasks": [
+        {
+            "containers": [],
+            "desiredStatus": "RUNNING",
+            "lastStatus": "PENDING",
+            "taskArn": f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}",
+            "taskDefinitionArn": 
"arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11",
+        }
+    ],
+}
+
+
 WAITERS_TEST_CASES = [
     pytest.param(None, None, id="default-values"),
     pytest.param(3.14, None, id="set-delay-only"),
@@ -788,6 +802,54 @@ class TestEcsRunTaskOperator(EcsBaseTestCase):
 
         assert 
self.ecs._get_logs_stream_name().startswith(f"{prefix}/{container_name}/")
 
+    @mock.patch.object(EcsBaseOperator, "client")
+    @mock.patch("airflow.providers.amazon.aws.operators.ecs.sleep", 
return_value=None)
+    def test_container_name_not_set(self, sleep_mock, client_mock):
+        self.set_up_operator(
+            awslogs_group="awslogs-group",
+            awslogs_stream_prefix="prefix",
+            container_name=None,
+        )
+        client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME
+        client_mock.describe_tasks.side_effect = [
+            {"tasks": [{"containers": []}]},
+            {"tasks": [{"containers": [{"name": "resolved-container"}]}]},
+        ]
+        self.ecs._start_task()
+        assert client_mock.describe_tasks.call_count == 2
+        assert self.ecs.container_name == "resolved-container"
+
+    @mock.patch.object(EcsBaseOperator, "client")
+    @mock.patch.object(EcsBaseOperator, "log")
+    @mock.patch("airflow.providers.amazon.aws.operators.ecs.sleep", 
return_value=None)
+    def test_container_name_resolution_fails_logs_message(self, sleep_mock, 
log_mock, client_mock):
+        self.set_up_operator(
+            awslogs_group="test-group",
+            awslogs_stream_prefix="prefix",
+            container_name=None,
+        )
+        client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME
+        client_mock.describe_tasks.return_value = {"tasks": [{"containers": 
[{"name": None}]}]}
+
+        self.ecs._start_task()
+
+        assert client_mock.describe_tasks.call_count == 2
+        assert self.ecs.container_name is None
+        log_mock.info.assert_called_with(
+            "Could not find container name, required for the log stream after 
2 tries"
+        )
+
+    @mock.patch.object(EcsBaseOperator, "client")
+    def test_container_name_not_polled(self, client_mock):
+        self.set_up_operator(
+            awslogs_group=None,
+            awslogs_stream_prefix=None,
+            container_name=None,
+        )
+        client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME
+        self.ecs._start_task()
+        assert client_mock.describe_tasks.call_count == 0
+
 
 class TestEcsCreateClusterOperator(EcsBaseTestCase):
     @pytest.mark.parametrize("waiter_delay, waiter_max_attempts", 
WAITERS_TEST_CASES)

Reply via email to