This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 07f0e33119f [task sdk] Remove non-standard log_meta_dict param from upload_to_remote (#47748)
07f0e33119f is described below
commit 07f0e33119fec68199f83b6a857af7754c8c7e19
Author: Ian Buss <[email protected]>
AuthorDate: Thu Mar 13 20:29:28 2025 +0000
[task sdk] Remove non-standard log_meta_dict param from upload_to_remote (#47748)
---
task-sdk/src/airflow/sdk/execution_time/supervisor.py | 15 +--------------
task-sdk/src/airflow/sdk/log.py | 3 +--
2 files changed, 2 insertions(+), 16 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index e7af31635ee..abbc9a405c4 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -651,8 +651,6 @@ class ActivitySubprocess(WatchedSubprocess):
TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0
_task_end_time_monotonic: float | None = attrs.field(default=None, init=False)
- _what: TaskInstance | None = attrs.field(default=None, init=False)
-
decoder: ClassVar[TypeAdapter[ToSupervisor]] = TypeAdapter(ToSupervisor)
@classmethod
@@ -675,7 +673,6 @@ class ActivitySubprocess(WatchedSubprocess):
def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[str], bundle_info):
"""Send startup message to the subprocess."""
- self._what = ti
start_date = datetime.now(tz=timezone.utc)
try:
# We've forked, but the task won't start doing anything until we send it the StartupDetails
@@ -744,17 +741,7 @@ class ActivitySubprocess(WatchedSubprocess):
"""
from airflow.sdk.log import upload_to_remote
- log_meta_dict = (
- {
- "dag_id": self._what.dag_id,
- "task_id": self._what.task_id,
- "run_id": self._what.run_id,
- "try_number": str(self._what.try_number),
- }
- if self._what
- else {}
- )
- upload_to_remote(self.process_log, log_meta_dict)
+ upload_to_remote(self.process_log)
def _monitor_subprocess(self):
"""
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index a9783af2ed0..426d12a67d6 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -477,7 +477,7 @@ def load_remote_log_handler() -> logging.Handler | None:
configure_logging()
-def upload_to_remote(logger: FilteringBoundLogger, log_meta_dict: dict[str, Any]):
+def upload_to_remote(logger: FilteringBoundLogger):
# We haven't yet switched the Remote log handlers over, they are still wired up in providers as
# logging.Handlers (but we should re-write most of them to just be the upload and read instead of full
# variants.) In the mean time, lets just create the right handler directly
@@ -511,7 +511,6 @@ def upload_to_remote(logger: FilteringBoundLogger, log_meta_dict: dict[str, Any]
# set_context() which opens a real FH again. (And worse, in some cases it _truncates_ the file too). This
# is just for the first Airflow 3 betas, but we will re-write a better remote log interface that isn't
# tied to being a logging Handler.
- handler.log_meta_dict = log_meta_dict # type: ignore[attr-defined]
handler.log_relative_path = relative_path.as_posix()  # type: ignore[attr-defined]
handler.upload_on_close = True # type: ignore[attr-defined]