This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 75ca1714f96 fix: Support log level parsing from container output in
`EcsRunTaskOperator` (#67180)
75ca1714f96 is described below
commit 75ca1714f963d75aa2f9af6f2ddb2605decf6670
Author: Zach Liu <[email protected]>
AuthorDate: Thu May 21 13:12:14 2026 -0400
fix: Support log level parsing from container output in
`EcsRunTaskOperator` (#67180)
* Add log level detection for CloudWatch ECS task logs
* Add log level detection to AwsTaskLogFetcher
---
.../airflow/providers/amazon/aws/triggers/ecs.py | 5 +-
.../providers/amazon/aws/utils/task_log_fetcher.py | 48 +++++++-
.../unit/amazon/aws/utils/test_task_log_fetcher.py | 124 ++++++++++++++++++++-
3 files changed, 168 insertions(+), 9 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py
index ae38079ff53..3768af58a12 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/ecs.py
@@ -26,7 +26,7 @@ from botocore.exceptions import ClientError, WaiterError
from airflow.providers.amazon.aws.hooks.ecs import EcsHook
from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
-from airflow.providers.amazon.aws.utils.task_log_fetcher import
AwsTaskLogFetcher
+from airflow.providers.amazon.aws.utils.task_log_fetcher import
AwsTaskLogFetcher, _parse_log_level
from airflow.providers.common.compat.sdk import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -228,7 +228,8 @@ class TaskDoneTrigger(BaseTrigger):
events = response["events"]
for log_event in events:
- self.log.info(AwsTaskLogFetcher.event_to_str(log_event))
+ level = _parse_log_level(log_event["message"])
+ self.log.log(level, AwsTaskLogFetcher.event_to_str(log_event))
if len(events) == 0 or next_token == response["nextForwardToken"]:
return response["nextForwardToken"]
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py
b/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py
index c509c2a7fa7..dc0c35aedb2 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/utils/task_log_fetcher.py
@@ -17,6 +17,9 @@
from __future__ import annotations
+import json
+import logging
+import re
import time
from collections.abc import Generator
from datetime import datetime, timedelta, timezone
@@ -30,9 +33,49 @@ from airflow.providers.amazon.aws.hooks.logs import
AwsLogsHook
if TYPE_CHECKING:
from airflow.sdk.types import Logger
+# Level names recognised by the plain-text prefix matcher (``WARNING`` is listed
+# before ``WARN`` so the longer spelling is tried first).
+_LOG_LEVEL_NAMES = "DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL"
+
+# A level is only taken from plain text when it is clearly a prefix:
+#   * bracketed, any case:               ``[ERROR] ...``, ``[warning] ...``
+#   * followed by ``:`` / ``-``, any case: ``ERROR: ...``, ``info - ...``
+#   * an upper-case word + whitespace:    ``ERROR something``
+# Ordinary sentences such as "Errors: none" or "Debug mode enabled" therefore stay at
+# INFO instead of being re-levelled; a line re-levelled to DEBUG would be filtered out
+# by ``logger.log`` whenever the logger's effective level is INFO or higher.
+_LOG_LEVEL_PATTERN = re.compile(
+    r"^\s*(?:"
+    rf"\[\s*(?i:({_LOG_LEVEL_NAMES}))\s*\]"
+    rf"|(?i:({_LOG_LEVEL_NAMES}))\s*[:\-]"
+    rf"|({_LOG_LEVEL_NAMES})(?=\s|$)"
+    r")"
+)
+_LOG_LEVEL_MAP: dict[str, int] = {
+    "DEBUG": logging.DEBUG,
+    "INFO": logging.INFO,
+    "WARNING": logging.WARNING,
+    "WARN": logging.WARNING,
+    "ERROR": logging.ERROR,
+    "CRITICAL": logging.CRITICAL,
+    "FATAL": logging.CRITICAL,
+}
+
+
+def _parse_log_level(message: str) -> int:
+    """
+    Detect the Python logging level from a CloudWatch log message.
+
+    Supports two formats:
+
+    1. JSON-structured logs with a string ``levelname`` or ``level`` field.
+    2. Plain-text prefixes: a bracketed level (``[WARNING] ...``) or a level followed by
+       ``:`` / ``-`` (``ERROR: ...``, ``CRITICAL - ...``) in any case, or an upper-case
+       level followed by whitespace (``ERROR something``).
+
+    Returns ``logging.INFO`` when no known level is found (backwards-compatible).
+
+    :param message: The ``message`` field of a CloudWatch log event.
+    :return: A ``logging`` level constant such as ``logging.ERROR``.
+    """
+    stripped = message.strip()
+    if stripped.startswith("{"):
+        try:
+            parsed = json.loads(stripped)
+        except (ValueError, RecursionError):
+            # Not valid JSON (JSONDecodeError is a ValueError subclass), or nested deeply
+            # enough to exhaust the decoder's recursion limit. Fall back to prefix matching
+            # so a malformed container log line cannot raise out of the caller's log loop.
+            parsed = None
+        if isinstance(parsed, dict):
+            level_name = parsed.get("levelname") or parsed.get("level")
+            # Non-string levels (e.g. numeric ``"level": 40``) are not interpreted.
+            if isinstance(level_name, str) and level_name.upper() in _LOG_LEVEL_MAP:
+                return _LOG_LEVEL_MAP[level_name.upper()]
+    match = _LOG_LEVEL_PATTERN.match(message)
+    if match:
+        # Exactly one alternative of the pattern matched, so exactly one group is set.
+        level_name = next(group for group in match.groups() if group)
+        return _LOG_LEVEL_MAP.get(level_name.upper(), logging.INFO)
+    return logging.INFO
+
class AwsTaskLogFetcher(Thread):
- """Fetch Cloudwatch log events with specific interval and send the log
events to the logger.info."""
+ """Fetch Cloudwatch log events with specific interval and forward them at
the detected log level."""
def __init__(
self,
@@ -72,7 +115,8 @@ class AwsTaskLogFetcher(Thread):
# When a slight delay is added before logging the event,
that solves the issue
# See https://github.com/apache/airflow/issues/40875
time.sleep(0.001)
- self.logger.info(self.event_to_str(log_event))
+ level = _parse_log_level(log_event["message"])
+ self.logger.log(level, self.event_to_str(log_event))
prev_timestamp_event = current_timestamp_event
def _get_log_events(self, skip_token: AwsLogsHook.ContinuationToken | None
= None) -> Generator:
diff --git
a/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py
b/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py
index c6da6208373..66f883120eb 100644
--- a/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py
+++ b/providers/amazon/tests/unit/amazon/aws/utils/test_task_log_fetcher.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+import logging
from datetime import timedelta
from unittest import mock
from unittest.mock import PropertyMock
@@ -25,7 +26,7 @@ import pytest
from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-from airflow.providers.amazon.aws.utils.task_log_fetcher import
AwsTaskLogFetcher
+from airflow.providers.amazon.aws.utils.task_log_fetcher import
AwsTaskLogFetcher, _parse_log_level
class TestAwsTaskLogFetcher:
@@ -67,11 +68,11 @@ class TestAwsTaskLogFetcher:
def test_run(self, get_log_events_mock, event_is_set_mock):
self.log_fetcher.run()
- self.logger_mock.info.assert_has_calls(
+ self.logger_mock.log.assert_has_calls(
[
- mock.call("[2021-04-02 21:51:07,123] First"),
- mock.call("[2021-04-02 21:52:47,456] Second"),
- mock.call("[2021-04-02 21:54:27,789] Third"),
+ mock.call(logging.INFO, "[2021-04-02 21:51:07,123] First"),
+ mock.call(logging.INFO, "[2021-04-02 21:52:47,456] Second"),
+ mock.call(logging.INFO, "[2021-04-02 21:54:27,789] Third"),
]
)
@@ -144,3 +145,116 @@ class TestAwsTaskLogFetcher:
@mock.patch.object(AwsLogsHook, "conn")
def test_get_last_log_messages_with_no_log_events(self, mock_conn):
assert self.log_fetcher.get_last_log_messages(2) == []
+
+    @mock.patch(
+        "threading.Event.is_set",
+        side_effect=(False, True),
+    )
+    @mock.patch(
+        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
+        side_effect=(
+            iter(
+                [
+                    {
+                        "timestamp": 1617400267123,
+                        "message": '{"levelname": "ERROR", "message": "Something failed"}',
+                    },
+                    {"timestamp": 1617400367456, "message": "WARNING: disk space low"},
+                    {"timestamp": 1617400467789, "message": "Just a plain message"},
+                ]
+            ),
+        ),
+    )
+    def test_run_with_log_level_detection(self, get_log_events_mock, event_is_set_mock):
+        """Each fetched event is forwarded at the level parsed from its message text."""
+        self.log_fetcher.run()
+
+        # One event per detection path: JSON field, plain-text prefix, and the INFO default.
+        expected_calls = [
+            mock.call(
+                logging.ERROR,
+                '[2021-04-02 21:51:07,123] {"levelname": "ERROR", "message": "Something failed"}',
+            ),
+            mock.call(logging.WARNING, "[2021-04-02 21:52:47,456] WARNING: disk space low"),
+            mock.call(logging.INFO, "[2021-04-02 21:54:27,789] Just a plain message"),
+        ]
+        self.logger_mock.log.assert_has_calls(expected_calls)
+
+
+class TestParseLogLevel:
+    """Tests for ``_parse_log_level``: JSON level fields, plain-text prefixes and the INFO fallback."""
+
+    @pytest.mark.parametrize(
+        ("message", "expected_level"),
+        [
+            ('{"levelname": "ERROR", "message": "fail"}', logging.ERROR),
+            ('{"levelname": "WARNING", "message": "warn"}', logging.WARNING),
+            ('{"levelname": "DEBUG", "message": "dbg"}', logging.DEBUG),
+            ('{"levelname": "CRITICAL", "message": "crit"}', logging.CRITICAL),
+            ('{"levelname": "INFO", "message": "ok"}', logging.INFO),
+            ('{"level": "error", "msg": "fail"}', logging.ERROR),
+            ('{"level": "WARNING", "msg": "warn"}', logging.WARNING),
+            # An empty ``levelname`` is falsy, so the ``level`` field is consulted instead.
+            ('{"levelname": "", "level": "warn"}', logging.WARNING),
+            # Surrounding whitespace is stripped before the JSON parse.
+            ('  {"levelname": "ERROR", "message": "padded"}  ', logging.ERROR),
+        ],
+        ids=[
+            "json-error",
+            "json-warning",
+            "json-debug",
+            "json-critical",
+            "json-info",
+            "json-level-field-lowercase",
+            "json-level-field-uppercase",
+            "json-empty-levelname-uses-level",
+            "json-surrounding-whitespace",
+        ],
+    )
+    def test_json_structured_logs(self, message, expected_level):
+        assert _parse_log_level(message) == expected_level
+
+    @pytest.mark.parametrize(
+        ("message", "expected_level"),
+        [
+            ("ERROR: something broke", logging.ERROR),
+            ("WARNING: watch out", logging.WARNING),
+            ("WARN: also watch out", logging.WARNING),
+            ("DEBUG: details", logging.DEBUG),
+            ("CRITICAL: very bad", logging.CRITICAL),
+            ("FATAL: system down", logging.CRITICAL),
+            ("[ERROR] something broke", logging.ERROR),
+            ("[WARNING] watch out", logging.WARNING),
+            ("INFO - all good", logging.INFO),
+            ("warning: lowercase prefix", logging.WARNING),
+            ("  [CRITICAL] leading whitespace", logging.CRITICAL),
+            ("[WARN]: bracket and colon", logging.WARNING),
+        ],
+        ids=[
+            "prefix-error",
+            "prefix-warning",
+            "prefix-warn",
+            "prefix-debug",
+            "prefix-critical",
+            "prefix-fatal",
+            "bracketed-error",
+            "bracketed-warning",
+            "prefix-info-dash",
+            "prefix-lowercase",
+            "bracketed-leading-whitespace",
+            "bracketed-then-colon",
+        ],
+    )
+    def test_plain_text_prefix(self, message, expected_level):
+        assert _parse_log_level(message) == expected_level
+
+    @pytest.mark.parametrize(
+        "message",
+        [
+            "Just a regular log message",
+            # A level name that is not a prefix must not change the level.
+            "This message mentions ERROR in the middle",
+            "",
+            "2026-05-18 08:06:04 some log",
+            "{invalid json",
+            '{"levelname": "NOTICE", "message": "unknown level"}',
+            '{"level": 40, "msg": "numeric level"}',
+            '{"message": "no level field"}',
+        ],
+        ids=[
+            "plain-text",
+            "error-in-middle",
+            "empty",
+            "timestamp-prefix",
+            "invalid-json",
+            "json-unknown-level",
+            "json-numeric-level",
+            "json-missing-level",
+        ],
+    )
+    def test_defaults_to_info(self, message):
+        assert _parse_log_level(message) == logging.INFO