This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5ca62b8f23d Fix Cloudwatch remote logging (#48774)
5ca62b8f23d is described below
commit 5ca62b8f23ddb23bfb468044dfc08ef1126031ff
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Apr 4 06:25:37 2025 -0700
Fix Cloudwatch remote logging (#48774)
There were three main issues:
1) A circular call chain that eventually fails because the maximum call depth is exceeded. This
is because the handler was lazily initted during the first log emission. But
when the handler is created, some code downstream tries to log, and since there
is no handler yet (because we're in the middle of creating it), it tries to
create another one, and we were spinning ad infinitum.
2) The stream name is not set on read, because we don't call set_context
anywhere in the SDK path, and the processor doesn't have access to the TI
anyway (which is used for the stream name). So a 0-byte stream name was being
used and was causing a failure in Watchtower.
3) read is also failing because it is using the relative_path as the stream
name, which is almost right, but the name isn't sanitized (there are some
characters that cloudwatch doesn't allow in a stream name). set_context used to
sanitize the name and set it, but it isn't called in the SDK path.
---
.../amazon/aws/log/cloudwatch_task_handler.py | 13 ++++-
.../amazon/aws/log/test_cloudwatch_task_handler.py | 65 ++++++++++++++--------
task-sdk/src/airflow/sdk/log.py | 28 ++++++----
3 files changed, 70 insertions(+), 36 deletions(-)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 37cff5bca19..6191cb82f50 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -116,8 +116,18 @@ class CloudWatchRemoteLogIO(LoggingMixin): # noqa: D101
import structlog.stdlib
logRecordFactory = getLogRecordFactory()
+ # The handler MUST be initted here, before the processor is actually
used to log anything.
+ # Otherwise, logging that occurs during the creation of the handler
can create infinite loops.
+ _handler = self.handler
+ from airflow.sdk.log import relative_path_from_logger
def proc(logger: structlog.typing.WrappedLogger, method_name: str,
event: structlog.typing.EventDict):
+ if not logger or not (stream_name :=
relative_path_from_logger(logger)):
+ return event
+ # Only init the handler stream_name once. We cannot do it above
when we init the handler because
+ # we don't yet know the log path at that point.
+ if not _handler.log_stream_name:
+ _handler.log_stream_name = stream_name.as_posix().replace(":",
"_")
name = event.get("logger_name") or event.get("logger", "")
level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(),
logging.INFO)
msg = copy.copy(event)
@@ -134,7 +144,7 @@ class CloudWatchRemoteLogIO(LoggingMixin): # noqa: D101
ct = created.timestamp()
record.created = ct
record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from
stdlib logging
- self.handler.handle(record)
+ _handler.handle(record)
return event
return (proc,)
@@ -177,6 +187,7 @@ class CloudWatchRemoteLogIO(LoggingMixin): # noqa: D101
:param task_instance: the task instance to get logs about
:return: string of all logs from the given log stream
"""
+ stream_name = stream_name.replace(":", "_")
# If there is an end_date to the task instance, fetch logs until that
date + 30 seconds
# 30 seconds is an arbitrary buffer so that we don't miss any logs
that were emitted
end_time = (
diff --git
a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index dd765ab6945..3696f552d76 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -75,19 +75,24 @@ class TestCloudRemoteLogIO:
self.remote_log_group = "log_group_name"
self.region_name = "us-west-2"
+ self.task_log_path = "dag_id=a/0:0.log"
self.local_log_location = tmp_path / "local-cloudwatch-log-location"
self.local_log_location.mkdir()
+ # Create the local log file structure
+ task_log_path_parts = self.task_log_path.split("/")
+ dag_dir = self.local_log_location / task_log_path_parts[0]
+ dag_dir.mkdir()
+ task_log_file = dag_dir / task_log_path_parts[1]
+ task_log_file.touch()
# The subject under test
self.subject = CloudWatchRemoteLogIO(
base_log_folder=self.local_log_location,
log_group_arn=f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
- log_stream_name="dag_id=a/0.log",
)
conn = boto3.client("logs", region_name=self.region_name)
conn.create_log_group(logGroupName=self.remote_log_group)
- conn.create_log_stream(logGroupName=self.remote_log_group,
logStreamName=self.subject.log_stream_name)
processors = structlog.get_config()["processors"]
old_processors = processors.copy()
@@ -108,7 +113,13 @@ class TestCloudRemoteLogIO:
raise structlog.DropEvent()
processors[-1] = drop
- structlog.configure(processors=processors)
+ structlog.configure(
+ processors=processors,
+ # Create a logger factory and pass in the file path we want it
to use
+ # This is because we use the logger to determine the
streamname/filepath
+ # in the CloudWatchRemoteLogIO processor.
+
logger_factory=structlog.PrintLoggerFactory(file=task_log_file.open("w+")),
+ )
yield
finally:
# remove LogCapture and restore original processors
@@ -118,32 +129,38 @@ class TestCloudRemoteLogIO:
@time_machine.travel(datetime(2025, 3, 27, 21, 58, 1, 2345), tick=False)
def test_log_message(self):
- import structlog
+ # Use a context instead of a decorator on the test method because we
need access to self to
+ # get the path from the setup method.
+ with conf_vars({("logging", "base_log_folder"):
self.local_log_location.as_posix()}):
+ import structlog
- log = structlog.get_logger()
- log.info("Hi", foo="bar")
- # We need to close in order to flush the logs etc.
- self.subject.close()
+ log = structlog.get_logger()
+ log.info("Hi", foo="bar")
+ # We need to close in order to flush the logs etc.
+ self.subject.close()
- logs = self.subject.read("dag_id=a/0.log", self.ti)
+ # Inside the Cloudwatch logger we swap colons for underscores
since colons are not allowed in
+ # stream names.
+ stream_name = self.task_log_path.replace(":", "_")
+ logs = self.subject.read(stream_name, self.ti)
- if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.log.file_task_handler import
StructuredLogMessage
+ if AIRFLOW_V_3_0_PLUS:
+ from airflow.utils.log.file_task_handler import
StructuredLogMessage
- metadata, logs = logs
+ metadata, logs = logs
- results = TypeAdapter(list[StructuredLogMessage]).dump_python(logs)
- assert metadata == [
- "Reading remote log from Cloudwatch log_group: log_group_name
log_stream: dag_id=a/0.log"
- ]
- assert results == [
- {
- "event": "Hi",
- "foo": "bar",
- "level": "info",
- "timestamp": datetime(2025, 3, 27, 21, 58, 1, 2000,
tzinfo=TzInfo(0)),
- },
- ]
+ results =
TypeAdapter(list[StructuredLogMessage]).dump_python(logs)
+ assert metadata == [
+ f"Reading remote log from Cloudwatch log_group:
log_group_name log_stream: {stream_name}"
+ ]
+ assert results == [
+ {
+ "event": "Hi",
+ "foo": "bar",
+ "level": "info",
+ "timestamp": datetime(2025, 3, 27, 21, 58, 1, 2000,
tzinfo=TzInfo(0)),
+ },
+ ]
def test_event_to_str(self):
handler = self.subject
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index d209a14a7f4..8c6e6ff8f9d 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -496,26 +496,32 @@ def load_remote_log_handler() -> RemoteLogIO | None:
return airflow.logging_config.REMOTE_TASK_LOG
-def upload_to_remote(logger: FilteringBoundLogger):
- from airflow.configuration import conf
-
- raw_logger = getattr(logger, "_logger")
-
- if not raw_logger or not hasattr(raw_logger, "_file"):
+def relative_path_from_logger(logger) -> Path | None:
+ if not logger:
+ return None
+ if not hasattr(logger, "_file"):
logger.warning("Unable to find log file, logger was of unexpected
type", type=type(logger))
- return
+ return None
- fh = raw_logger._file
+ fh = logger._file
fname = fh.name
if fh.fileno() == 1 or not isinstance(fname, str):
# Logging to stdout, or something odd about this logger, don't try to
upload!
- return
+ return None
+ from airflow.configuration import conf
+
base_log_folder = conf.get("logging", "base_log_folder")
- relative_path = Path(fname).relative_to(base_log_folder)
+ return Path(fname).relative_to(base_log_folder)
+
+
+def upload_to_remote(logger: FilteringBoundLogger):
+ raw_logger = getattr(logger, "_logger")
+
+ relative_path = relative_path_from_logger(raw_logger)
handler = load_remote_log_handler()
- if not handler:
+ if not handler or not relative_path:
return
log_relative_path = relative_path.as_posix()