This is an automated email from the ASF dual-hosted git repository.
pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 747f00f2aa Extend task context logging support for remote logging
using Elasticsearch (#32977)
747f00f2aa is described below
commit 747f00f2aa159642f3b2dddbb9908c01b8b3b91c
Author: Pankaj Koti <[email protected]>
AuthorDate: Tue Nov 21 11:55:03 2023 +0530
Extend task context logging support for remote logging using Elasticsearch
(#32977)
* Extend task context logging support for remote logging using Elasticsearch
With the addition of task context logging feature in PR #32646,
this PR extends the feature to Elasticsearch when it is set as
remote logging store. Here, backward compatibility is ensured for
older versions of Airflow that do not have the feature included
in Airflow Core.
* update ensure_ti
---------
Co-authored-by: Daniel Standish
<[email protected]>
---
.../providers/elasticsearch/log/es_task_handler.py | 46 +++++++++++++++++++---
1 file changed, 41 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py
b/airflow/providers/elasticsearch/log/es_task_handler.py
index 79f9ad0b41..1e8c75b7e3 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -34,7 +34,7 @@ import pendulum
from elasticsearch.exceptions import NotFoundError
from airflow.configuration import conf
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models.dagrun import DagRun
from airflow.providers.elasticsearch.log.es_json_formatter import
ElasticsearchJSONFormatter
from airflow.providers.elasticsearch.log.es_response import
ElasticSearchResponse, Hit
@@ -46,7 +46,8 @@ from airflow.utils.session import create_session
if TYPE_CHECKING:
from datetime import datetime
- from airflow.models.taskinstance import TaskInstance
+ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+
LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
# Elasticsearch hosted log type
@@ -84,6 +85,32 @@ def get_es_kwargs_from_config() -> dict[str, Any]:
return kwargs_dict
+def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+ """Given TI | TIKey, return a TI object.
+
+ Will raise exception if no TI is found in the database.
+ """
+ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+
+ if not isinstance(ti, TaskInstanceKey):
+ return ti
+ val = (
+ session.query(TaskInstance)
+ .filter(
+ TaskInstance.task_id == ti.task_id,
+ TaskInstance.dag_id == ti.dag_id,
+ TaskInstance.run_id == ti.run_id,
+ TaskInstance.map_index == ti.map_index,
+ )
+ .one_or_none()
+ )
+ if isinstance(val, TaskInstance):
+ val._try_number = ti.try_number
+ return val
+ else:
+ raise AirflowException(f"Could not find TaskInstance for {ti}")
+
+
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin,
LoggingMixin):
"""
ElasticsearchTaskHandler is a python log handler that reads logs from
Elasticsearch.
@@ -182,8 +209,12 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
return host
- def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
+ def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number:
int) -> str:
+ from airflow.models.taskinstance import TaskInstanceKey
+
with create_session() as session:
+ if isinstance(ti, TaskInstanceKey):
+ ti = _ensure_ti(ti, session)
dag_run = ti.get_dagrun(session=session)
if USE_PER_RUN_LOG_ID:
log_id_template =
dag_run.get_log_template(session=session).elasticsearch_id
@@ -377,11 +408,13 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
setattr(record, self.offset_field, int(time.time() * (10**9)))
self.handler.emit(record)
- def set_context(self, ti: TaskInstance, **kwargs) -> None:
+ def set_context(self, ti: TaskInstance, *, identifier: str | None = None)
-> None:
"""
Provide task_instance context to airflow task handler.
:param ti: task instance object
+ :param identifier: if set, identifies the Airflow component which is
relaying logs from
+ exceptional scenarios related to the task instance
"""
is_trigger_log_context = getattr(ti, "is_trigger_log_context", None)
is_ti_raw = getattr(ti, "raw", None)
@@ -410,7 +443,10 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
self.handler.setLevel(self.level)
self.handler.setFormatter(self.formatter)
else:
- super().set_context(ti)
+ if getattr(self, "supports_task_context_logging", False):
+ super().set_context(ti, identifier=identifier)
+ else:
+ super().set_context(ti)
self.context_set = True
def close(self) -> None: