This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fc80981334 Re-enable uploading to remote blob storage from Trigger 
and executors for the beta (#47115)
7fc80981334 is described below

commit 7fc8098133452a9c9b0eb5a1bc7e17bde5f8ce62
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Feb 26 20:12:05 2025 +0000

    Re-enable uploading to remote blob storage from Trigger and executors for 
the beta (#47115)
    
    The way this achieved is the epitome of hackyness, but it works well enough 
for
    S3, GCS and WASB task handlers.
    
    It won't work for the "non-blob" based logging backends such as
    CloudWatchLogs, and it only works with the way the 3 blob-based log backends
    are currently implemented.
    
    I want remote logs to be working for most people for the Beta, and I will
    follow up with a better fix (that doesn't rely on internal implementation
    details and works for log "streams" too.)
---
 airflow/jobs/triggerer_job_runner.py               | 22 ++++++++---
 airflow/models/trigger.py                          |  5 ++-
 .../src/airflow/sdk/execution_time/supervisor.py   | 32 ++++++++++++----
 task_sdk/src/airflow/sdk/log.py                    | 43 +++++++++++++++++++++-
 task_sdk/tests/execution_time/test_supervisor.py   |  4 ++
 tests/dag_processing/test_manager.py               |  1 +
 tests/jobs/test_triggerer_job.py                   |  1 +
 7 files changed, 92 insertions(+), 16 deletions(-)

diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index 0e82cd1c193..b630f66ee13 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -234,6 +234,15 @@ class TriggerLoggingFactory:
         self.bound_logger = logger
         return logger
 
+    def upload_to_remote(self):
+        from airflow.sdk.log import upload_to_remote
+
+        if not hasattr(self, "bound_logger"):
+            # Never actually called, nothing to do
+            return
+
+        upload_to_remote(self.bound_logger)
+
 
 @attrs.define(kw_only=True)
 class TriggerRunnerSupervisor(WatchedSubprocess):
@@ -299,7 +308,8 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
                 self.cancelling_triggers.discard(id)
                 # Remove logger from the cache, and since structlog doesn't 
have an explicit close method, we
                 # only need to remove the last reference to it to close the 
open FH
-                self.logger_cache.pop(id, None)
+                if factory := self.logger_cache.pop(id, None):
+                    factory.upload_to_remote()
             return
 
         raise ValueError(f"Unknown message type {type(msg)}")
@@ -460,10 +470,8 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
             self.cancelling_triggers.update(cancel_trigger_ids)
             self._send(messages.CancelTriggers(ids=cancel_trigger_ids))
 
-    def _register_pipe_readers(
-        self, logger: FilteringBoundLogger, stdout: socket, stderr: socket, 
requests: socket, logs: socket
-    ):
-        super()._register_pipe_readers(logger, stdout, stderr, requests, logs)
+    def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: 
socket, logs: socket):
+        super()._register_pipe_readers(stdout, stderr, requests, logs)
 
         # We want to handle logging differently here, so un-register the one 
our parent class created
         self.selector.unregister(logs)
@@ -488,7 +496,9 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
 
         def get_logger(trigger_id: int) -> WrappedLogger:
             # TODO: Is a separate dict worth it, or should we make 
`self.running_triggers` a dict?
-            return self.logger_cache[trigger_id](processors)
+            if factory := self.logger_cache.get(trigger_id):
+                return factory(processors)
+            return fallback_log
 
         # We need to look at the json, pull out the
         while True:
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 478eb89a675..e7bdca0b5bd 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -246,7 +246,10 @@ class Trigger(Base):
             handle_event_submit(event, task_instance=task_instance, 
session=session)
 
         # Send an event to assets
-        trigger = session.scalars(select(cls).where(cls.id == 
trigger_id)).one()
+        trigger = session.scalars(select(cls).where(cls.id == 
trigger_id)).one_or_none()
+        if trigger is None:
+            # Already deleted for some reason
+            return
         for asset in trigger.assets:
             AssetManager.register_asset_change(
                 asset=asset.to_public(), session=session, 
extra={"from_trigger": True}
diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index fada6105bce..8bc867a3f5b 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -316,6 +316,8 @@ class WatchedSubprocess:
 
     selector: selectors.BaseSelector = 
attrs.field(factory=selectors.DefaultSelector)
 
+    log: FilteringBoundLogger
+
     @classmethod
     def start(
         cls,
@@ -363,17 +365,17 @@ class WatchedSubprocess:
         # other end of the pair open
         cls._close_unused_sockets(child_stdin, child_stdout, child_stderr, 
child_comms, child_logs)
 
+        logger = logger or cast("FilteringBoundLogger", 
structlog.get_logger(logger_name="task").bind())
         proc = cls(
             pid=pid,
             stdin=feed_stdin,
             process=psutil.Process(pid),
             requests_fd=requests_fd,
+            log=logger,
             **constructor_kwargs,
         )
 
-        logger = logger or cast("FilteringBoundLogger", 
structlog.get_logger(logger_name="task").bind())
         proc._register_pipe_readers(
-            logger=logger,
             stdout=read_stdout,
             stderr=read_stderr,
             requests=read_msgs,
@@ -382,26 +384,24 @@ class WatchedSubprocess:
 
         return proc
 
-    def _register_pipe_readers(
-        self, logger: FilteringBoundLogger, stdout: socket, stderr: socket, 
requests: socket, logs: socket
-    ):
+    def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: 
socket, logs: socket):
         """Register handlers for subprocess communication channels."""
         # self.selector is a way of registering a handler/callback to be 
called when the given IO channel has
         # activity to read on 
(https://www.man7.org/linux/man-pages/man2/select.2.html etc, but better
         # alternatives are used automatically) -- this is a way of having 
"event-based" code, but without
         # needing full async, to read and process output from each socket as 
it is received.
 
-        self.selector.register(stdout, selectors.EVENT_READ, 
self._create_socket_handler(logger, "stdout"))
+        self.selector.register(stdout, selectors.EVENT_READ, 
self._create_socket_handler(self.log, "stdout"))
         self.selector.register(
             stderr,
             selectors.EVENT_READ,
-            self._create_socket_handler(logger, "stderr", 
log_level=logging.ERROR),
+            self._create_socket_handler(self.log, "stderr", 
log_level=logging.ERROR),
         )
         self.selector.register(
             logs,
             selectors.EVENT_READ,
             make_buffered_socket_reader(
-                process_log_messages_from_subprocess(logger), 
on_close=self._on_socket_closed
+                process_log_messages_from_subprocess(self.log), 
on_close=self._on_socket_closed
             ),
         )
         self.selector.register(
@@ -658,8 +658,24 @@ class ActivitySubprocess(WatchedSubprocess):
             self.client.task_instances.finish(
                 id=self.id, state=self.final_state, 
when=datetime.now(tz=timezone.utc)
             )
+
+        # Now at the last possible moment, when all logs and comms with the 
subprocess has finished, lets
+        # upload the remote logs
+        self._upload_logs()
+
         return self._exit_code
 
+    def _upload_logs(self):
+        """
+        Upload all log files found to the remote storage.
+
+        We upload logs from here after the task has finished to give us the 
best possible chance of logs being
+        uploaded in case the task task.
+        """
+        from airflow.sdk.log import upload_to_remote
+
+        upload_to_remote(self.log)
+
     def _monitor_subprocess(self):
         """
         Monitor the subprocess until it exits.
diff --git a/task_sdk/src/airflow/sdk/log.py b/task_sdk/src/airflow/sdk/log.py
index 0bb06934c94..3da1d1e1f71 100644
--- a/task_sdk/src/airflow/sdk/log.py
+++ b/task_sdk/src/airflow/sdk/log.py
@@ -31,7 +31,7 @@ import msgspec
 import structlog
 
 if TYPE_CHECKING:
-    from structlog.typing import EventDict, ExcInfo, Processor
+    from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, 
Processor
 
 
 __all__ = [
@@ -456,3 +456,44 @@ def init_log_file(local_relative_path: str) -> Path:
         log.warning("OSError while changing ownership of the log file. %s", e)
 
     return full_path
+
+
+def upload_to_remote(logger: FilteringBoundLogger):
+    # We haven't yet switched the Remote log handlers over, they are still 
wired up in providers as
+    # logging.Handlers (but we should re-write most of them to just be the 
upload and read instead of full
+    # variants.) In the mean time, lets just create the right handler directly
+    from airflow.configuration import conf
+    from airflow.utils.log.file_task_handler import FileTaskHandler
+    from airflow.utils.log.log_reader import TaskLogReader
+
+    raw_logger = getattr(logger, "_logger")
+
+    if not raw_logger or not hasattr(raw_logger, "_file"):
+        logger.warning("Unable to find log file, logger was of unexpected 
type", type=type(logger))
+        return
+
+    fh = raw_logger._file
+    fname = fh.name
+
+    if fh.fileno() == 1 or not isinstance(fname, str):
+        # Logging to stdout, or something odd about this logger, don't try to 
upload!
+        return
+    base_log_folder = conf.get("logging", "base_log_folder")
+    relative_path = Path(fname).relative_to(base_log_folder)
+
+    handler = TaskLogReader().log_handler
+    if not isinstance(handler, FileTaskHandler):
+        logger.warning(
+            "Airflow core logging is not using a FileTaskHandler, can't upload 
logs to remote",
+            handler=type(handler),
+        )
+        return
+
+    # This is a _monstrosity_, and super fragile, but we don't want to do the 
base FileTaskHandler
+    # set_context() which opens a real FH again. (And worse, in some cases it 
_truncates_ the file too). This
+    # is just for the first Airflow 3 betas, but we will re-write a better 
remote log interface that isn't
+    # tied to being a logging Handler.
+    handler.log_relative_path = relative_path.as_posix()  # type: 
ignore[attr-defined]
+    handler.upload_on_close = True  # type: ignore[attr-defined]
+
+    handler.close()
diff --git a/task_sdk/tests/execution_time/test_supervisor.py 
b/task_sdk/tests/execution_time/test_supervisor.py
index fde63fc7091..114f18c0cf4 100644
--- a/task_sdk/tests/execution_time/test_supervisor.py
+++ b/task_sdk/tests/execution_time/test_supervisor.py
@@ -505,6 +505,7 @@ class TestWatchedSubprocess:
         mock_kill = 
mocker.patch("airflow.sdk.execution_time.supervisor.WatchedSubprocess.kill")
 
         proc = ActivitySubprocess(
+            log=mocker.MagicMock(),
             id=TI_ID,
             pid=mock_process.pid,
             stdin=mocker.MagicMock(),
@@ -594,6 +595,7 @@ class TestWatchedSubprocess:
         monkeypatch.setattr(ActivitySubprocess, "TASK_OVERTIME_THRESHOLD", 
overtime_threshold)
 
         mock_watched_subprocess = ActivitySubprocess(
+            log=mocker.MagicMock(),
             id=TI_ID,
             pid=12345,
             stdin=mocker.Mock(),
@@ -735,6 +737,7 @@ class TestWatchedSubprocessKill:
     @pytest.fixture
     def watched_subprocess(self, mocker, mock_process):
         proc = ActivitySubprocess(
+            log=mocker.MagicMock(),
             id=TI_ID,
             pid=12345,
             stdin=mocker.Mock(),
@@ -918,6 +921,7 @@ class TestHandleRequest:
     def watched_subprocess(self, mocker):
         """Fixture to provide a WatchedSubprocess instance."""
         return ActivitySubprocess(
+            log=mocker.MagicMock(),
             id=TI_ID,
             pid=12345,
             stdin=BytesIO(),
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index f7f3911cf42..19d5c3f32a7 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -137,6 +137,7 @@ class TestDagFileProcessorManager:
         proc.create_time.return_value = time.time()
         proc.wait.return_value = 0
         ret = DagFileProcessorProcess(
+            log=MagicMock(),
             id=uuid7(),
             pid=1234,
             process=proc,
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index d0e4fbb283f..7d065dd2562 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -149,6 +149,7 @@ def supervisor_builder(mocker, session):
 
         process = mocker.Mock(spec=psutil.Process, pid=10 * job.id + 1)
         proc = TriggerRunnerSupervisor(
+            log=mocker.Mock(),
             id=job.id,
             job=job,
             pid=process.pid,

Reply via email to