This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 43deb993a0a Temporary fix to allow remote logging upload for task sdk (#47668)
43deb993a0a is described below

commit 43deb993a0abbce87332792c29ae706e355d0b14
Author: Ian Buss <[email protected]>
AuthorDate: Wed Mar 12 14:57:36 2025 +0000

    Temporary fix to allow remote logging upload for task sdk (#47668)
---
 task-sdk/src/airflow/sdk/execution_time/supervisor.py | 15 ++++++++++++++-
 task-sdk/src/airflow/sdk/log.py                       | 19 ++++++++++++++++---
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index f456b53cc62..39ad05aebf9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -645,6 +645,8 @@ class ActivitySubprocess(WatchedSubprocess):
     TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0
     _task_end_time_monotonic: float | None = attrs.field(default=None, init=False)
 
+    _what: TaskInstance | None = attrs.field(default=None, init=False)
+
     decoder: ClassVar[TypeAdapter[ToSupervisor]] = TypeAdapter(ToSupervisor)
 
     @classmethod
@@ -667,6 +669,7 @@ class ActivitySubprocess(WatchedSubprocess):
 
     def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[str], bundle_info):
         """Send startup message to the subprocess."""
+        self._what = ti
         start_date = datetime.now(tz=timezone.utc)
         try:
             # We've forked, but the task won't start doing anything until we send it the StartupDetails
@@ -735,7 +738,17 @@ class ActivitySubprocess(WatchedSubprocess):
         """
         from airflow.sdk.log import upload_to_remote
 
-        upload_to_remote(self.log)
+        log_meta_dict = (
+            {
+                "dag_id": self._what.dag_id,
+                "task_id": self._what.task_id,
+                "run_id": self._what.run_id,
+                "try_number": str(self._what.try_number),
+            }
+            if self._what
+            else {}
+        )
+        upload_to_remote(self.log, log_meta_dict)
 
     def _monitor_subprocess(self):
         """
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 3da1d1e1f71..eb44e21b2ef 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -458,13 +458,25 @@ def init_log_file(local_relative_path: str) -> Path:
     return full_path
 
 
-def upload_to_remote(logger: FilteringBoundLogger):
+def load_remote_log_handler() -> logging.Handler | None:
+    from airflow.logging_config import configure_logging as airflow_configure_logging
+    from airflow.utils.log.log_reader import TaskLogReader
+
+    try:
+        airflow_configure_logging()
+
+        return TaskLogReader().log_handler
+    finally:
+        # This is a _monstrosity_ but put our logging back immediately...
+        configure_logging()
+
+
+def upload_to_remote(logger: FilteringBoundLogger, log_meta_dict: dict[str, Any]):
     # We haven't yet switched the Remote log handlers over, they are still wired up in providers as
     # logging.Handlers (but we should re-write most of them to just be the upload and read instead of full
     # variants.) In the mean time, lets just create the right handler directly
     from airflow.configuration import conf
     from airflow.utils.log.file_task_handler import FileTaskHandler
-    from airflow.utils.log.log_reader import TaskLogReader
 
     raw_logger = getattr(logger, "_logger")
 
@@ -481,7 +493,7 @@ def upload_to_remote(logger: FilteringBoundLogger):
     base_log_folder = conf.get("logging", "base_log_folder")
     relative_path = Path(fname).relative_to(base_log_folder)
 
-    handler = TaskLogReader().log_handler
+    handler = load_remote_log_handler()
     if not isinstance(handler, FileTaskHandler):
         logger.warning(
             "Airflow core logging is not using a FileTaskHandler, can't upload logs to remote",
@@ -493,6 +505,7 @@ def upload_to_remote(logger: FilteringBoundLogger):
     # set_context() which opens a real FH again. (And worse, in some cases it _truncates_ the file too). This
     # is just for the first Airflow 3 betas, but we will re-write a better remote log interface that isn't
     # tied to being a logging Handler.
+    handler.log_meta_dict = log_meta_dict  # type: ignore[attr-defined]
     handler.log_relative_path = relative_path.as_posix()  # type: ignore[attr-defined]
     handler.upload_on_close = True  # type: ignore[attr-defined]
 

Reply via email to