This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new 230da3e399c  EcsRunTaskOperator fails when no containers are provided in the response (#51692)
230da3e399c is described below

commit 230da3e399c26699daad9fd95a413a844f7cb13f
Author: Dominik <105610163+dominik...@users.noreply.github.com>
AuthorDate: Sat Sep 6 05:52:39 2025 +0200

    EcsRunTaskOperator fails when no containers are provided in the response (#51692)

    * Added logic to safely access the container name only if it's required, and to poll multiple times in case the task is not yet active

    * Adjusted the test case to better reflect the actual ECS response
---
 .../airflow/providers/amazon/aws/operators/ecs.py | 25 +++++++--
 .../tests/unit/amazon/aws/operators/test_ecs.py   | 62 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
index 30baf423986..0152c465455 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
@@ -21,6 +21,7 @@ import re
 from collections.abc import Sequence
 from datetime import timedelta
 from functools import cached_property
+from time import sleep
 from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
@@ -629,10 +630,22 @@ class EcsRunTaskOperator(EcsBaseOperator):
         self.log.info("ECS Task started: %s", response)
 
         self.arn = response["tasks"][0]["taskArn"]
-        if not self.container_name:
-            self.container_name = response["tasks"][0]["containers"][0]["name"]
         self.log.info("ECS task ID is: %s", self._get_ecs_task_id(self.arn))
 
+        if not self.container_name and (self.awslogs_group and self.awslogs_stream_prefix):
+            backoff_schedule = [10, 30]
+            for delay in backoff_schedule:
+                sleep(delay)
+                response = self.client.describe_tasks(cluster=self.cluster, tasks=[self.arn])
+                containers = response["tasks"][0].get("containers", [])
+                if containers:
+                    self.container_name = containers[0]["name"]
+                if self.container_name:
+                    break
+
+            if not self.container_name:
+                self.log.info("Could not find container name, required for the log stream after 2 tries")
+
     def _try_reattach_task(self, started_by: str):
         if not started_by:
             raise AirflowException("`started_by` should not be empty or None")
@@ -666,7 +679,13 @@ class EcsRunTaskOperator(EcsBaseOperator):
         return self.awslogs_group and self.awslogs_stream_prefix
 
     def _get_logs_stream_name(self) -> str:
-        if (
+        if not self.container_name and self.awslogs_stream_prefix and "/" not in self.awslogs_stream_prefix:
+            self.log.warning(
+                "Container name could not be inferred and awslogs_stream_prefix '%s' does not contain '/'. "
" + "This may cause issues when extracting logs from Cloudwatch.", + self.awslogs_stream_prefix, + ) + elif ( self.awslogs_stream_prefix and self.container_name and not self.awslogs_stream_prefix.endswith(f"/{self.container_name}") diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py index efc6d33630e..23d3fdc9282 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py @@ -81,6 +81,20 @@ RESPONSE_WITHOUT_FAILURES = { } ], } +RESPONSE_WITHOUT_NAME = { + "failures": [], + "tasks": [ + { + "containers": [], + "desiredStatus": "RUNNING", + "lastStatus": "PENDING", + "taskArn": f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}", + "taskDefinitionArn": "arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11", + } + ], +} + + WAITERS_TEST_CASES = [ pytest.param(None, None, id="default-values"), pytest.param(3.14, None, id="set-delay-only"), @@ -788,6 +802,54 @@ class TestEcsRunTaskOperator(EcsBaseTestCase): assert self.ecs._get_logs_stream_name().startswith(f"{prefix}/{container_name}/") + @mock.patch.object(EcsBaseOperator, "client") + @mock.patch("airflow.providers.amazon.aws.operators.ecs.sleep", return_value=None) + def test_container_name_not_set(self, sleep_mock, client_mock): + self.set_up_operator( + awslogs_group="awslogs-group", + awslogs_stream_prefix="prefix", + container_name=None, + ) + client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME + client_mock.describe_tasks.side_effect = [ + {"tasks": [{"containers": []}]}, + {"tasks": [{"containers": [{"name": "resolved-container"}]}]}, + ] + self.ecs._start_task() + assert client_mock.describe_tasks.call_count == 2 + assert self.ecs.container_name == "resolved-container" + + @mock.patch.object(EcsBaseOperator, "client") + @mock.patch.object(EcsBaseOperator, "log") + @mock.patch("airflow.providers.amazon.aws.operators.ecs.sleep", return_value=None) + def test_container_name_resolution_fails_logs_message(self, sleep_mock, log_mock, client_mock): + self.set_up_operator( + awslogs_group="test-group", + awslogs_stream_prefix="prefix", + container_name=None, + ) + client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME + client_mock.describe_tasks.return_value = {"tasks": [{"containers": [{"name": None}]}]} + + self.ecs._start_task() + + assert client_mock.describe_tasks.call_count == 2 + assert self.ecs.container_name is None + log_mock.info.assert_called_with( + "Could not find container name, required for the log stream after 2 tries" + ) + + @mock.patch.object(EcsBaseOperator, "client") + def test_container_name_not_polled(self, client_mock): + self.set_up_operator( + awslogs_group=None, + awslogs_stream_prefix=None, + container_name=None, + ) + client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME + self.ecs._start_task() + assert client_mock.describe_tasks.call_count == 0 + class TestEcsCreateClusterOperator(EcsBaseTestCase): @pytest.mark.parametrize("waiter_delay, waiter_max_attempts", WAITERS_TEST_CASES)