This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7fc80981334 Re-enable uploading to remote blob storage from Trigger and executors for the beta (#47115)
7fc80981334 is described below
commit 7fc8098133452a9c9b0eb5a1bc7e17bde5f8ce62
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Feb 26 20:12:05 2025 +0000
Re-enable uploading to remote blob storage from Trigger and executors for the beta (#47115)
The way this is achieved is the epitome of hackiness, but it works well
enough for S3, GCS and WASB task handlers.
It won't work for the "non-blob" based logging backends such as
CloudWatchLogs, and it only works with the way the 3 blob-based log backends
are currently implemented.
I want remote logs to be working for most people for the Beta, and I will
follow up with a better fix (that doesn't rely on internal implementation
details and works for log "streams" too.)
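In short, the trick (a condensed sketch of the upload_to_remote helper added
in the diff below; the wrapper name upload_finished_log is only for
illustration, while TaskLogReader, FileTaskHandler and upload_on_close are
the existing handler internals it leans on) is:

    from pathlib import Path

    from airflow.configuration import conf
    from airflow.utils.log.file_task_handler import FileTaskHandler
    from airflow.utils.log.log_reader import TaskLogReader

    def upload_finished_log(local_log_path: str) -> None:
        # Locate the file structlog has been writing to, relative to the log root
        base = conf.get("logging", "base_log_folder")
        handler = TaskLogReader().log_handler
        if not isinstance(handler, FileTaskHandler):
            return  # only the blob-based (S3/GCS/WASB) handlers are covered
        # Point the configured handler at the already-written file and let its
        # close() call perform the remote upload for us.
        handler.log_relative_path = Path(local_log_path).relative_to(base).as_posix()
        handler.upload_on_close = True
        handler.close()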
---
airflow/jobs/triggerer_job_runner.py | 22 ++++++++---
airflow/models/trigger.py | 5 ++-
.../src/airflow/sdk/execution_time/supervisor.py | 32 ++++++++++++----
task_sdk/src/airflow/sdk/log.py | 43 +++++++++++++++++++++-
task_sdk/tests/execution_time/test_supervisor.py | 4 ++
tests/dag_processing/test_manager.py | 1 +
tests/jobs/test_triggerer_job.py | 1 +
7 files changed, 92 insertions(+), 16 deletions(-)
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 0e82cd1c193..b630f66ee13 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -234,6 +234,15 @@ class TriggerLoggingFactory:
self.bound_logger = logger
return logger
+ def upload_to_remote(self):
+ from airflow.sdk.log import upload_to_remote
+
+ if not hasattr(self, "bound_logger"):
+ # Never actually called, nothing to do
+ return
+
+ upload_to_remote(self.bound_logger)
+
@attrs.define(kw_only=True)
class TriggerRunnerSupervisor(WatchedSubprocess):
@@ -299,7 +308,8 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
self.cancelling_triggers.discard(id)
# Remove logger from the cache, and since structlog doesn't have an explicit close method, we
# only need to remove the last reference to it to close the open FH
- self.logger_cache.pop(id, None)
+ if factory := self.logger_cache.pop(id, None):
+ factory.upload_to_remote()
return
raise ValueError(f"Unknown message type {type(msg)}")
@@ -460,10 +470,8 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
self.cancelling_triggers.update(cancel_trigger_ids)
self._send(messages.CancelTriggers(ids=cancel_trigger_ids))
- def _register_pipe_readers(
- self, logger: FilteringBoundLogger, stdout: socket, stderr: socket, requests: socket, logs: socket
- ):
- super()._register_pipe_readers(logger, stdout, stderr, requests, logs)
+ def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socket, logs: socket):
+ super()._register_pipe_readers(stdout, stderr, requests, logs)
# We want to handle logging differently here, so un-register the one our parent class created
self.selector.unregister(logs)
@@ -488,7 +496,9 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
def get_logger(trigger_id: int) -> WrappedLogger:
# TODO: Is a separate dict worth it, or should we make `self.running_triggers` a dict?
- return self.logger_cache[trigger_id](processors)
+ if factory := self.logger_cache.get(trigger_id):
+ return factory(processors)
+ return fallback_log
# We need to look at the json, pull out the
while True:
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 478eb89a675..e7bdca0b5bd 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -246,7 +246,10 @@ class Trigger(Base):
handle_event_submit(event, task_instance=task_instance, session=session)
# Send an event to assets
- trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one()
+ trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one_or_none()
+ if trigger is None:
+ # Already deleted for some reason
+ return
for asset in trigger.assets:
AssetManager.register_asset_change(
asset=asset.to_public(), session=session, extra={"from_trigger": True}
diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index fada6105bce..8bc867a3f5b 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -316,6 +316,8 @@ class WatchedSubprocess:
selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector)
+ log: FilteringBoundLogger
+
@classmethod
def start(
cls,
@@ -363,17 +365,17 @@ class WatchedSubprocess:
# other end of the pair open
cls._close_unused_sockets(child_stdin, child_stdout, child_stderr, child_comms, child_logs)
+ logger = logger or cast("FilteringBoundLogger", structlog.get_logger(logger_name="task").bind())
proc = cls(
pid=pid,
stdin=feed_stdin,
process=psutil.Process(pid),
requests_fd=requests_fd,
+ log=logger,
**constructor_kwargs,
)
- logger = logger or cast("FilteringBoundLogger", structlog.get_logger(logger_name="task").bind())
proc._register_pipe_readers(
- logger=logger,
stdout=read_stdout,
stderr=read_stderr,
requests=read_msgs,
@@ -382,26 +384,24 @@ class WatchedSubprocess:
return proc
- def _register_pipe_readers(
- self, logger: FilteringBoundLogger, stdout: socket, stderr: socket, requests: socket, logs: socket
- ):
+ def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socket, logs: socket):
"""Register handlers for subprocess communication channels."""
# self.selector is a way of registering a handler/callback to be called when the given IO channel has
# activity to read on (https://www.man7.org/linux/man-pages/man2/select.2.html etc, but better
# alternatives are used automatically) -- this is a way of having "event-based" code, but without
# needing full async, to read and process output from each socket as it is received.
- self.selector.register(stdout, selectors.EVENT_READ, self._create_socket_handler(logger, "stdout"))
+ self.selector.register(stdout, selectors.EVENT_READ, self._create_socket_handler(self.log, "stdout"))
self.selector.register(
stderr,
selectors.EVENT_READ,
- self._create_socket_handler(logger, "stderr", log_level=logging.ERROR),
+ self._create_socket_handler(self.log, "stderr", log_level=logging.ERROR),
)
self.selector.register(
logs,
selectors.EVENT_READ,
make_buffered_socket_reader(
- process_log_messages_from_subprocess(logger), on_close=self._on_socket_closed
+ process_log_messages_from_subprocess(self.log), on_close=self._on_socket_closed
),
)
self.selector.register(
@@ -658,8 +658,24 @@ class ActivitySubprocess(WatchedSubprocess):
self.client.task_instances.finish(
id=self.id, state=self.final_state, when=datetime.now(tz=timezone.utc)
)
+
+ # Now at the last possible moment, when all logs and comms with the subprocess have finished, let's
+ # upload the remote logs
+ self._upload_logs()
+
return self._exit_code
+ def _upload_logs(self):
+ """
+ Upload all log files found to the remote storage.
+
+ We upload logs from here after the task has finished to give us the best possible chance of logs being
+ uploaded in case the task fails.
+ """
+ from airflow.sdk.log import upload_to_remote
+
+ upload_to_remote(self.log)
+
def _monitor_subprocess(self):
"""
Monitor the subprocess until it exits.
diff --git a/task_sdk/src/airflow/sdk/log.py b/task_sdk/src/airflow/sdk/log.py
index 0bb06934c94..3da1d1e1f71 100644
--- a/task_sdk/src/airflow/sdk/log.py
+++ b/task_sdk/src/airflow/sdk/log.py
@@ -31,7 +31,7 @@ import msgspec
import structlog
if TYPE_CHECKING:
- from structlog.typing import EventDict, ExcInfo, Processor
+ from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, Processor
__all__ = [
@@ -456,3 +456,44 @@ def init_log_file(local_relative_path: str) -> Path:
log.warning("OSError while changing ownership of the log file. %s", e)
return full_path
+
+
+def upload_to_remote(logger: FilteringBoundLogger):
+ # We haven't yet switched the Remote log handlers over, they are still wired up in providers as
+ # logging.Handlers (but we should re-write most of them to just be the upload and read instead of full
+ # variants.) In the meantime, let's just create the right handler directly
+ from airflow.configuration import conf
+ from airflow.utils.log.file_task_handler import FileTaskHandler
+ from airflow.utils.log.log_reader import TaskLogReader
+
+ raw_logger = getattr(logger, "_logger")
+
+ if not raw_logger or not hasattr(raw_logger, "_file"):
+ logger.warning("Unable to find log file, logger was of unexpected
type", type=type(logger))
+ return
+
+ fh = raw_logger._file
+ fname = fh.name
+
+ if fh.fileno() == 1 or not isinstance(fname, str):
+ # Logging to stdout, or something odd about this logger, don't try to upload!
+ return
+ base_log_folder = conf.get("logging", "base_log_folder")
+ relative_path = Path(fname).relative_to(base_log_folder)
+
+ handler = TaskLogReader().log_handler
+ if not isinstance(handler, FileTaskHandler):
+ logger.warning(
+ "Airflow core logging is not using a FileTaskHandler, can't upload
logs to remote",
+ handler=type(handler),
+ )
+ return
+
+ # This is a _monstrosity_, and super fragile, but we don't want to do the base FileTaskHandler
+ # set_context() which opens a real FH again. (And worse, in some cases it _truncates_ the file too). This
+ # is just for the first Airflow 3 betas, but we will re-write a better remote log interface that isn't
+ # tied to being a logging Handler.
+ handler.log_relative_path = relative_path.as_posix() # type: ignore[attr-defined]
+ handler.upload_on_close = True # type: ignore[attr-defined]
+
+ handler.close()
diff --git a/task_sdk/tests/execution_time/test_supervisor.py b/task_sdk/tests/execution_time/test_supervisor.py
index fde63fc7091..114f18c0cf4 100644
--- a/task_sdk/tests/execution_time/test_supervisor.py
+++ b/task_sdk/tests/execution_time/test_supervisor.py
@@ -505,6 +505,7 @@ class TestWatchedSubprocess:
mock_kill = mocker.patch("airflow.sdk.execution_time.supervisor.WatchedSubprocess.kill")
proc = ActivitySubprocess(
+ log=mocker.MagicMock(),
id=TI_ID,
pid=mock_process.pid,
stdin=mocker.MagicMock(),
@@ -594,6 +595,7 @@ class TestWatchedSubprocess:
monkeypatch.setattr(ActivitySubprocess, "TASK_OVERTIME_THRESHOLD", overtime_threshold)
mock_watched_subprocess = ActivitySubprocess(
+ log=mocker.MagicMock(),
id=TI_ID,
pid=12345,
stdin=mocker.Mock(),
@@ -735,6 +737,7 @@ class TestWatchedSubprocessKill:
@pytest.fixture
def watched_subprocess(self, mocker, mock_process):
proc = ActivitySubprocess(
+ log=mocker.MagicMock(),
id=TI_ID,
pid=12345,
stdin=mocker.Mock(),
@@ -918,6 +921,7 @@ class TestHandleRequest:
def watched_subprocess(self, mocker):
"""Fixture to provide a WatchedSubprocess instance."""
return ActivitySubprocess(
+ log=mocker.MagicMock(),
id=TI_ID,
pid=12345,
stdin=BytesIO(),
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index f7f3911cf42..19d5c3f32a7 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -137,6 +137,7 @@ class TestDagFileProcessorManager:
proc.create_time.return_value = time.time()
proc.wait.return_value = 0
ret = DagFileProcessorProcess(
+ log=MagicMock(),
id=uuid7(),
pid=1234,
process=proc,
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index d0e4fbb283f..7d065dd2562 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -149,6 +149,7 @@ def supervisor_builder(mocker, session):
process = mocker.Mock(spec=psutil.Process, pid=10 * job.id + 1)
proc = TriggerRunnerSupervisor(
+ log=mocker.Mock(),
id=job.id,
job=job,
pid=process.pid,