This is an automated email from the ASF dual-hosted git repository.
pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b9f3a5392f Extend task context logging support for remote logging
using GCP GCS (#32970)
b9f3a5392f is described below
commit b9f3a5392f1b2fb8043ca56b2c8ded20f40e2297
Author: Pankaj Koti <[email protected]>
AuthorDate: Fri Nov 17 20:17:13 2023 +0530
Extend task context logging support for remote logging using GCP GCS
(#32970)
With the addition of the task context logging feature in PR #32646,
this PR extends the feature to Google Cloud Storage (GCS) when it is set
as the remote logging store. Backward compatibility is preserved for
older versions of Airflow that do not include the feature in Airflow
core.
---
airflow/providers/google/cloud/log/gcs_task_handler.py | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py
b/airflow/providers/google/cloud/log/gcs_task_handler.py
index cebe9a829e..39d0f072a8 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -22,7 +22,7 @@ import os
import shutil
from functools import cached_property
from pathlib import Path
-from typing import Collection
+from typing import TYPE_CHECKING, Collection
# not sure why but mypy complains on missing `storage` but it is clearly there
and is importable
from google.cloud import storage # type: ignore[attr-defined]
@@ -36,6 +36,9 @@ from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+
_DEFAULT_SCOPESS = frozenset(
[
"https://www.googleapis.com/auth/devstorage.read_write",
@@ -96,6 +99,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
**kwargs,
):
super().__init__(base_log_folder, filename_template)
+ self.handler: logging.FileHandler | None = None
self.remote_base = gcs_log_folder
self.log_relative_path = ""
self.closed = False
@@ -137,15 +141,21 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
project=self.project_id if self.project_id else project_id,
)
- def set_context(self, ti):
- super().set_context(ti)
+ def set_context(self, ti: TaskInstance, *, identifier: str | None = None)
-> None:
+ if getattr(self, "supports_task_context_logging", False):
+ super().set_context(ti, identifier=identifier)
+ else:
+ super().set_context(ti)
# Log relative path is used to construct local and remote
# log path to upload log files into GCS and read from the
# remote location.
+ if TYPE_CHECKING:
+ assert self.handler is not None
+
full_path = self.handler.baseFilename
self.log_relative_path =
Path(full_path).relative_to(self.local_base).as_posix()
is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
- self.upload_on_close = is_trigger_log_context or not ti.raw
+ self.upload_on_close = is_trigger_log_context or not getattr(ti,
"raw", None)
def close(self):
"""Close and upload local log file to remote storage GCS."""