This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 43deb993a0a Temporary fix to allow remote logging upload for task sdk (#47668)
43deb993a0a is described below
commit 43deb993a0abbce87332792c29ae706e355d0b14
Author: Ian Buss <[email protected]>
AuthorDate: Wed Mar 12 14:57:36 2025 +0000
Temporary fix to allow remote logging upload for task sdk (#47668)
---
 task-sdk/src/airflow/sdk/execution_time/supervisor.py | 15 ++++++++++++++-
 task-sdk/src/airflow/sdk/log.py                        | 19 ++++++++++++++++---
 2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index f456b53cc62..39ad05aebf9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -645,6 +645,8 @@ class ActivitySubprocess(WatchedSubprocess):
     TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0
     _task_end_time_monotonic: float | None = attrs.field(default=None, init=False)
+    _what: TaskInstance | None = attrs.field(default=None, init=False)
+
     decoder: ClassVar[TypeAdapter[ToSupervisor]] = TypeAdapter(ToSupervisor)
     @classmethod
@@ -667,6 +669,7 @@ class ActivitySubprocess(WatchedSubprocess):
     def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[str], bundle_info):
         """Send startup message to the subprocess."""
+        self._what = ti
         start_date = datetime.now(tz=timezone.utc)
         try:
             # We've forked, but the task won't start doing anything until we send it the StartupDetails
@@ -735,7 +738,17 @@ class ActivitySubprocess(WatchedSubprocess):
"""
from airflow.sdk.log import upload_to_remote
- upload_to_remote(self.log)
+ log_meta_dict = (
+ {
+ "dag_id": self._what.dag_id,
+ "task_id": self._what.task_id,
+ "run_id": self._what.run_id,
+ "try_number": str(self._what.try_number),
+ }
+ if self._what
+ else {}
+ )
+ upload_to_remote(self.log, log_meta_dict)
def _monitor_subprocess(self):
"""
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 3da1d1e1f71..eb44e21b2ef 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -458,13 +458,25 @@ def init_log_file(local_relative_path: str) -> Path:
     return full_path
-def upload_to_remote(logger: FilteringBoundLogger):
+def load_remote_log_handler() -> logging.Handler | None:
+    from airflow.logging_config import configure_logging as airflow_configure_logging
+    from airflow.utils.log.log_reader import TaskLogReader
+
+    try:
+        airflow_configure_logging()
+
+        return TaskLogReader().log_handler
+    finally:
+        # This is a _monstrosity_ but put our logging back immediately...
+        configure_logging()
+
+
+def upload_to_remote(logger: FilteringBoundLogger, log_meta_dict: dict[str, Any]):
     # We haven't yet switched the Remote log handlers over, they are still wired up in providers as
     # logging.Handlers (but we should re-write most of them to just be the upload and read instead of full
     # variants.) In the mean time, lets just create the right handler directly
     from airflow.configuration import conf
     from airflow.utils.log.file_task_handler import FileTaskHandler
-    from airflow.utils.log.log_reader import TaskLogReader
     raw_logger = getattr(logger, "_logger")
@@ -481,7 +493,7 @@ def upload_to_remote(logger: FilteringBoundLogger):
     base_log_folder = conf.get("logging", "base_log_folder")
     relative_path = Path(fname).relative_to(base_log_folder)
-    handler = TaskLogReader().log_handler
+    handler = load_remote_log_handler()
     if not isinstance(handler, FileTaskHandler):
         logger.warning(
             "Airflow core logging is not using a FileTaskHandler, can't upload logs to remote",
@@ -493,6 +505,7 @@ def upload_to_remote(logger: FilteringBoundLogger):
     # set_context() which opens a real FH again. (And worse, in some cases it _truncates_ the file too). This
     # is just for the first Airflow 3 betas, but we will re-write a better remote log interface that isn't
     # tied to being a logging Handler.
+    handler.log_meta_dict = log_meta_dict  # type: ignore[attr-defined]
     handler.log_relative_path = relative_path.as_posix()  # type: ignore[attr-defined]
     handler.upload_on_close = True  # type: ignore[attr-defined]
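
The handler-side consumption of log_meta_dict is not part of this commit: upload_to_remote only sets the three attributes above (log_meta_dict, log_relative_path, upload_on_close) and relies on the remote handler to act on them when it is closed. As a rough, hypothetical sketch of that hand-off (none of this is code from the patch, and real provider handlers differ; the class name, base folder and print are placeholders):

    # Hypothetical sketch: one way a handler *might* honour the attributes set above.
    import logging
    from pathlib import Path

    class SketchRemoteHandler(logging.Handler):
        base_log_folder = Path("/tmp/airflow/logs")  # placeholder local log root

        log_relative_path: str = ""
        upload_on_close: bool = False
        log_meta_dict: dict[str, str] = {}

        def emit(self, record: logging.LogRecord) -> None:
            pass  # local writing is out of scope for this sketch

        def close(self) -> None:
            if self.upload_on_close and self.log_relative_path:
                local_file = self.base_log_folder / self.log_relative_path
                # A real handler would push local_file to remote storage here,
                # e.g. keyed by the dag_id/task_id/run_id/try_number metadata.
                print(f"would upload {local_file} with metadata {self.log_meta_dict}")
            super().close()
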