This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f80a44588db Fix RuntimeError when using OpensearchTaskHandler as
remote log handler (#53529)
f80a44588db is described below
commit f80a44588db1fb6ea2770a0b3147520598fa84cf
Author: Owen Leung <[email protected]>
AuthorDate: Mon Jul 21 21:45:52 2025 +0800
Fix RuntimeError when using OpensearchTaskHandler as remote log handler
(#53529)
* Add supports_external_link
* Make ExternalLoggingMixin's metaclass as ABCMeta
* Add get_external_log_url to OSTaskHandler
* Add log_name to OSTaskHandler
* Fix incompatible signature for get_external_log_url
---
.../src/airflow/utils/log/logging_mixin.py | 2 +-
.../providers/opensearch/log/os_task_handler.py | 30 +++++++++++++++++++---
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/utils/log/logging_mixin.py
b/airflow-core/src/airflow/utils/log/logging_mixin.py
index 6bc6efcdd7f..55f34d58fb7 100644
--- a/airflow-core/src/airflow/utils/log/logging_mixin.py
+++ b/airflow-core/src/airflow/utils/log/logging_mixin.py
@@ -129,7 +129,7 @@ class LoggingMixin:
set_context(self.log, context)
-class ExternalLoggingMixin:
+class ExternalLoggingMixin(metaclass=abc.ABCMeta):
"""Define a log handler based on an external service (e.g. ELK,
StackDriver)."""
@property
diff --git
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index 4d567ab3340..57c68e7ab9c 100644
---
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -209,9 +209,11 @@ class OpensearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMixin)
extras={
"dag_id": str(ti.dag_id),
"task_id": str(ti.task_id),
- date_key: self._clean_date(ti.logical_date)
- if AIRFLOW_V_3_0_PLUS
- else self._clean_date(ti.execution_date),
+ date_key: (
+ self._clean_date(ti.logical_date)
+ if AIRFLOW_V_3_0_PLUS
+ else self._clean_date(ti.execution_date)
+ ),
"try_number": str(ti.try_number),
"log_id": self._render_log_id(ti, ti.try_number),
},
@@ -597,3 +599,25 @@ class OpensearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMixin)
# Just a safe-guard to preserve backwards-compatibility
return hit.message
+
+ @property
+ def supports_external_link(self) -> bool:
+ """
+ Whether we can support external links.
+
+ TODO: It should support frontend just like ElasticSearchTaskhandler.
+ """
+ return False
+
+ def get_external_log_url(self, task_instance, try_number) -> str:
+ """
+ Create an address for an external log collecting service.
+
+ TODO: It should support frontend just like ElasticSearchTaskhandler.
+ """
+ return ""
+
+ @property
+ def log_name(self) -> str:
+ """The log name."""
+ return self.LOG_NAME