This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 794ce730aa Revert "Remove remaining Airflow 2.5 backcompat code from
Google Provider (#36366)" (#36440)
794ce730aa is described below
commit 794ce730aa18ded08c06134ed311a39818168728
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Dec 26 23:48:13 2023 +0100
Revert "Remove remaining Airflow 2.5 backcompat code from Google Provider
(#36366)" (#36440)
This reverts commit 2c2763f806517ae514d5614d519966da02ff4371.
---
.../providers/google/cloud/log/gcs_task_handler.py | 41 ++++++++++++++++++++--
.../google/cloud/log/stackdriver_task_handler.py | 13 +++++--
.../google/cloud/log/test_gcs_task_handler.py | 9 +++--
3 files changed, 56 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py
b/airflow/providers/google/cloud/log/gcs_task_handler.py
index abc2bc8845..9921bb8753 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -26,6 +26,7 @@ from typing import TYPE_CHECKING, Collection
# not sure why but mypy complains on missing `storage` but it is clearly there
and is importable
from google.cloud import storage # type: ignore[attr-defined]
+from packaging.version import Version
from airflow.configuration import conf
from airflow.exceptions import AirflowNotFoundException
@@ -47,6 +48,18 @@ _DEFAULT_SCOPESS = frozenset(
logger = logging.getLogger(__name__)
+def get_default_delete_local_copy():
+ """Load delete_local_logs conf if Airflow version > 2.6 and return False
if not.
+
+ TODO: delete this function when min airflow version >= 2.6.
+ """
+ from airflow.version import version
+
+ if Version(version) < Version("2.6"):
+ return False
+ return conf.getboolean("logging", "delete_local_logs")
+
+
class GCSTaskHandler(FileTaskHandler, LoggingMixin):
"""
GCSTaskHandler is a python log handler that handles and reads task
instance logs.
@@ -95,8 +108,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
self.gcp_keyfile_dict = gcp_keyfile_dict
self.scopes = gcp_scopes
self.project_id = project_id
- self.delete_local_copy = kwargs.get(
- "delete_local_copy", conf.getboolean("logging",
"delete_local_logs")
+ self.delete_local_copy = (
+ kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else
get_default_delete_local_copy()
)
@cached_property
@@ -205,6 +218,30 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
messages.append(f"Unable to read remote log {e}")
return messages, logs
+ def _read(self, ti, try_number, metadata=None):
+ """
+ Read logs of given task instance and try_number from GCS.
+
+ If failed, read the log from task instance host machine.
+
+ todo: when min airflow version >= 2.6, remove this method
+
+ :param ti: task instance object
+ :param try_number: task instance try_number to read logs from
+ :param metadata: log metadata,
+ can be used for streaming log reading and auto-tailing.
+ """
+ if hasattr(super(), "_read_remote_logs"):
+ # from Airflow 2.6, we don't implement the `_read` method.
+ # if parent has _read_remote_logs, we're >= 2.6
+ return super()._read(ti, try_number, metadata)
+
+ messages, logs = self._read_remote_logs(ti, try_number, metadata)
+ if not logs:
+ return super()._read(ti, try_number, metadata)
+
+ return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs),
{"end_of_log": True}
+
def gcs_write(self, log, remote_log_location) -> bool:
"""
Write the log to the remote location and return `True`; fail silently
and return `False` on error.
diff --git a/airflow/providers/google/cloud/log/stackdriver_task_handler.py
b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index f33e621f30..bf0e3bf4fb 100644
--- a/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -30,13 +30,21 @@ from google.cloud.logging_v2.types import
ListLogEntriesRequest, ListLogEntriesR
from airflow.providers.google.cloud.utils.credentials_provider import
get_credentials_and_project_id
from airflow.providers.google.common.consts import CLIENT_INFO
-from airflow.utils.log.trigger_handler import ctx_indiv_trigger
if TYPE_CHECKING:
+ from contextvars import ContextVar
+
from google.auth.credentials import Credentials
from airflow.models import TaskInstance
+try:
+ # todo: remove this conditional import when min airflow version >= 2.6
+ ctx_indiv_trigger: ContextVar | None
+ from airflow.utils.log.trigger_handler import ctx_indiv_trigger
+except ImportError:
+ ctx_indiv_trigger = None
+
DEFAULT_LOGGER_NAME = "airflow"
_GLOBAL_RESOURCE = Resource(type="global", labels={})
@@ -166,7 +174,8 @@ class StackdriverTaskHandler(logging.Handler):
"""
message = self.format(record)
ti = None
- if getattr(record, ctx_indiv_trigger.name, None):
+ # todo: remove the `ctx_indiv_trigger is not None` check when min airflow
version >= 2.6
+ if ctx_indiv_trigger is not None and getattr(record,
ctx_indiv_trigger.name, None):
ti = getattr(record, "task_instance", None) # trigger context
labels = self._get_labels(ti)
self._transport.send(record, message, resource=self.resource,
labels=labels)
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py
b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index a3e929b985..2d4dd7340d 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -248,8 +248,8 @@ class TestGCSTaskHandler:
)
@pytest.mark.parametrize(
- "delete_local_copy, expected_existence_of_local_copy",
- [(True, False), (False, True)],
+ "delete_local_copy, expected_existence_of_local_copy, airflow_version",
+ [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True,
"2.5.0"), (False, True, "2.5.0")],
)
@mock.patch(
"airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id",
@@ -265,9 +265,12 @@ class TestGCSTaskHandler:
local_log_location,
delete_local_copy,
expected_existence_of_local_copy,
+ airflow_version,
):
mock_blob.from_string.return_value.download_as_bytes.return_value =
b"CONTENT"
- with conf_vars({("logging", "delete_local_logs"):
str(delete_local_copy)}):
+ with conf_vars({("logging", "delete_local_logs"):
str(delete_local_copy)}), mock.patch(
+ "airflow.version.version", airflow_version
+ ):
handler = GCSTaskHandler(
base_log_folder=local_log_location,
gcs_log_folder="gs://bucket/remote/log/location",