This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 05fe5a4ccf2 Add write feature to ESTaskHandler (#44973)
05fe5a4ccf2 is described below
commit 05fe5a4ccf23bd2775d64ba552b1e57c67c87cfd
Author: Owen Leung <[email protected]>
AuthorDate: Tue Jan 14 00:15:32 2025 +0800
Add write feature to ESTaskHandler (#44973)
* Add write feature to ESTaskHandler
---
airflow/config_templates/airflow_local_settings.py | 4 ++
.../provider_config_fallback_defaults.cfg | 2 +
.../logging/index.rst | 19 +++++++
docs/spelling_wordlist.txt | 1 +
.../providers/elasticsearch/log/es_task_handler.py | 62 ++++++++++++++++++++--
.../airflow/providers/elasticsearch/provider.yaml | 14 +++++
.../elasticsearch/log/test_es_task_handler.py | 26 +++++++--
7 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/airflow/config_templates/airflow_local_settings.py
b/airflow/config_templates/airflow_local_settings.py
index cee5b428f6d..f440261dafc 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -308,8 +308,10 @@ if REMOTE_LOGGING:
ELASTICSEARCH_END_OF_LOG_MARK: str =
conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
ELASTICSEARCH_FRONTEND: str =
conf.get_mandatory_value("elasticsearch", "frontend")
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch",
"WRITE_STDOUT")
+ ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch",
"WRITE_TO_ES")
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch",
"JSON_FORMAT")
ELASTICSEARCH_JSON_FIELDS: str =
conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
+ ELASTICSEARCH_TARGET_INDEX: str =
conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
ELASTICSEARCH_HOST_FIELD: str =
conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
ELASTICSEARCH_OFFSET_FIELD: str =
conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
@@ -322,6 +324,8 @@ if REMOTE_LOGGING:
"host": ELASTICSEARCH_HOST,
"frontend": ELASTICSEARCH_FRONTEND,
"write_stdout": ELASTICSEARCH_WRITE_STDOUT,
+ "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
+ "target_index": ELASTICSEARCH_TARGET_INDEX,
"json_format": ELASTICSEARCH_JSON_FORMAT,
"json_fields": ELASTICSEARCH_JSON_FIELDS,
"host_field": ELASTICSEARCH_HOST_FIELD,
diff --git a/airflow/config_templates/provider_config_fallback_defaults.cfg
b/airflow/config_templates/provider_config_fallback_defaults.cfg
index ba92feaef47..b49c633c5af 100644
--- a/airflow/config_templates/provider_config_fallback_defaults.cfg
+++ b/airflow/config_templates/provider_config_fallback_defaults.cfg
@@ -82,6 +82,8 @@ json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset
index_patterns = _all
+write_to_es = False
+target_index = airflow-logs
[elasticsearch_configs]
use_ssl = False
diff --git a/docs/apache-airflow-providers-elasticsearch/logging/index.rst
b/docs/apache-airflow-providers-elasticsearch/logging/index.rst
index 7b56958cafe..eaa46def53b 100644
--- a/docs/apache-airflow-providers-elasticsearch/logging/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/logging/index.rst
@@ -22,6 +22,8 @@ Writing logs to Elasticsearch
Airflow can be configured to read task logs from Elasticsearch and optionally
write logs to stdout in standard or json format. These logs can later be
collected and forwarded to the Elasticsearch cluster using tools like fluentd,
logstash or others.
+Airflow also supports writing logs to Elasticsearch directly without requiring
additional software like filebeat and logstash. To enable this feature, set
``write_to_es`` and ``json_format`` to ``True`` and ``write_stdout`` to
``False`` in ``airflow.cfg``. Please be aware that if you set both
``write_to_es`` and ``delete_local_logs`` in logging section to true, airflow
will delete the local copy of task logs upon successfully writing task logs to
ElasticSearch.
+
You can choose to have all task logs from workers output to the highest parent
level process, instead of the standard file locations. This allows for some
additional flexibility in container environments like Kubernetes, where
container stdout is already being logged to the host nodes. From there a log
shipping tool can be used to forward them along to Elasticsearch. To use this
feature, set the ``write_stdout`` option in ``airflow.cfg``.
You can also choose to have the logs output in a JSON format, using the
``json_format`` option. Airflow uses the standard Python logging module and
JSON fields are directly extracted from the LogRecord object. To use this
feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields to
the comma-delimited string that you want collected for the logs. These fields
are from the LogRecord object in the ``logging`` module. `Documentation on
different attributes can be found here [...]
@@ -47,6 +49,21 @@ To output task logs to stdout in JSON format, the following
config could be used
write_stdout = True
json_format = True
+To output task logs to ElasticSearch, the following config could be used: (set
``delete_local_logs`` to true if you don't want to retain a local copy of task logs)
+
+.. code-block:: ini
+
+ [logging]
+ remote_logging = True
+ delete_local_logs = False
+
+ [elasticsearch]
+ host = <host>:<port>
+ write_stdout = False
+ json_format = True
+ write_to_es = True
+ target_index = [name of the index to store logs]
+
.. _write-logs-elasticsearch-tls:
Writing logs to Elasticsearch over TLS
@@ -55,6 +72,8 @@ Writing logs to Elasticsearch over TLS
To add custom configurations to ElasticSearch (e.g. turning on ``ssl_verify``,
adding a custom self-signed
cert, etc.) use the ``elasticsearch_configs`` setting in your ``airflow.cfg``
+Note that these configurations also apply when you enable writing logs to
ElasticSearch.
+
.. code-block:: ini
[logging]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index dafe706a1c3..d1a1e62d521 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -633,6 +633,7 @@ fetchmany
fetchone
FieldMask
Filebeat
+filebeat
filehandle
fileloc
filelocs
diff --git
a/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
b/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 241fb14aa23..9881f1ac5dc 100644
--- a/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -19,7 +19,11 @@ from __future__ import annotations
import contextlib
import inspect
+import json
import logging
+import os
+import pathlib
+import shutil
import sys
import time
from collections import defaultdict
@@ -30,6 +34,7 @@ from urllib.parse import quote, urlparse
# Using `from elasticsearch import *` would break elasticsearch mocking used
in unit test.
import elasticsearch
import pendulum
+from elasticsearch import helpers
from elasticsearch.exceptions import NotFoundError
from airflow.configuration import conf
@@ -106,10 +111,13 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
"""
ElasticsearchTaskHandler is a python log handler that reads logs from
Elasticsearch.
- Note that Airflow does not handle the indexing of logs into Elasticsearch.
Instead,
+ Note that Airflow by default does not handle the indexing of logs into
Elasticsearch. Instead,
Airflow flushes logs into local files. Additional software setup is
required to index
the logs into Elasticsearch, such as using Filebeat and Logstash.
+ Airflow can be configured to support directly writing logging to
Elasticsearch. To enable this feature,
+ set `json_format` and `write_to_es` to `True`.
+
To efficiently query and sort Elasticsearch results, this handler assumes
each
log message has a field `log_id` consists of ti primary keys:
`log_id = {dag_id}-{task_id}-{logical_date}-{try_number}`
@@ -136,6 +144,8 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
write_stdout: bool,
json_format: bool,
json_fields: str,
+ write_to_es: bool = False,
+ target_index: str = "airflow-logs",
host_field: str = "host",
offset_field: str = "offset",
host: str = "http://localhost:9200",
@@ -166,6 +176,11 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
self.index_patterns = index_patterns
self.index_patterns_callable = index_patterns_callable
self.context_set = False
+ self.write_to_es = write_to_es
+ self.target_index = target_index
+ self.delete_local_copy = kwargs.get(
+ "delete_local_copy", conf.getboolean("logging",
"delete_local_logs")
+ )
self.formatter: logging.Formatter
self.handler: logging.FileHandler | logging.StreamHandler # type:
ignore[assignment]
@@ -428,9 +443,11 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
extras={
"dag_id": str(ti.dag_id),
"task_id": str(ti.task_id),
- date_key: self._clean_date(ti.logical_date)
- if AIRFLOW_V_3_0_PLUS
- else self._clean_date(ti.execution_date),
+ date_key: (
+ self._clean_date(ti.logical_date)
+ if AIRFLOW_V_3_0_PLUS
+ else self._clean_date(ti.execution_date)
+ ),
"try_number": str(ti.try_number),
"log_id": self._render_log_id(ti, ti.try_number),
},
@@ -480,6 +497,18 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
self.handler.close()
sys.stdout = sys.__stdout__
+ if self.write_to_es and not self.write_stdout:
+ full_path = self.handler.baseFilename # type: ignore[union-attr]
+ log_relative_path =
pathlib.Path(full_path).relative_to(self.local_base).as_posix()
+ local_loc = os.path.join(self.local_base, log_relative_path)
+ if os.path.exists(local_loc):
+ # read log and remove old logs to get just the latest additions
+ log = pathlib.Path(local_loc).read_text()
+ log_lines = self._parse_raw_log(log)
+ success = self._write_to_es(log_lines)
+ if success and self.delete_local_copy:
+ shutil.rmtree(os.path.dirname(local_loc))
+
super().close()
self.closed = True
@@ -599,6 +628,31 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
callback: type[Hit] | Callable[..., Any] = getattr(doc_class,
"from_es", doc_class)
return callback(hit)
+ def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+ logs = log.split("\n")
+ parsed_logs = []
+ for line in logs:
+ # Make sure line is not empty
+ if line.strip():
+ parsed_logs.append(json.loads(line))
+
+ return parsed_logs
+
+ def _write_to_es(self, log_lines: list[dict[str, Any]]) -> bool:
+ """
+ Write the log to ElasticSearch; return `True` or fails silently and
return `False`.
+
+ :param log_lines: the log_lines to write to the ElasticSearch.
+ """
+ # Prepare the bulk request for Elasticsearch
+ bulk_actions = [{"_index": self.target_index, "_source": log} for log
in log_lines]
+ try:
+ _ = helpers.bulk(self.client, bulk_actions)
+ return True
+ except Exception as e:
+ self.log.exception("Unable to insert logs into Elasticsearch.
Reason: %s", str(e))
+ return False
+
def getattr_nested(obj, item, default):
"""
diff --git a/providers/src/airflow/providers/elasticsearch/provider.yaml
b/providers/src/airflow/providers/elasticsearch/provider.yaml
index a1ddbae845c..88ebba2a510 100644
--- a/providers/src/airflow/providers/elasticsearch/provider.yaml
+++ b/providers/src/airflow/providers/elasticsearch/provider.yaml
@@ -136,6 +136,20 @@ config:
type: string
example: ~
default: "False"
+ write_to_es:
+ description: |
+      Write the task logs to Elasticsearch
+ version_added: 5.5.4
+ type: string
+ example: ~
+ default: "False"
+ target_index:
+ description: |
+      Name of the index to write to, when enabling writing the task logs
to Elasticsearch
+ version_added: 5.5.4
+ type: string
+ example: ~
+ default: "airflow-logs"
json_format:
description: |
Instead of the default log formatter, write the log lines as JSON
diff --git a/providers/tests/elasticsearch/log/test_es_task_handler.py
b/providers/tests/elasticsearch/log/test_es_task_handler.py
index f87d4ffb143..f6b3f793950 100644
--- a/providers/tests/elasticsearch/log/test_es_task_handler.py
+++ b/providers/tests/elasticsearch/log/test_es_task_handler.py
@@ -81,9 +81,11 @@ class TestElasticsearchTaskHandler:
def ti(self, create_task_instance, create_log_template):
create_log_template(
self.FILENAME_TEMPLATE,
- "{dag_id}-{task_id}-{logical_date}-{try_number}"
- if AIRFLOW_V_3_0_PLUS
- else "{dag_id}-{task_id}-{execution_date}-{try_number}",
+ (
+ "{dag_id}-{task_id}-{logical_date}-{try_number}"
+ if AIRFLOW_V_3_0_PLUS
+ else "{dag_id}-{task_id}-{execution_date}-{try_number}"
+ ),
)
yield get_ti(
dag_id=self.DAG_ID,
@@ -673,6 +675,24 @@ class TestElasticsearchTaskHandler:
filename_template=None,
)
+ def test_write_to_es(self, ti):
+ self.es_task_handler.write_to_es = True
+ self.es_task_handler.json_format = True
+ self.es_task_handler.write_stdout = False
+ self.es_task_handler.local_base = Path(os.getcwd()) / "local" / "log"
/ "location"
+ formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s
- %(message)s")
+ self.es_task_handler.formatter = formatter
+
+ self.es_task_handler.set_context(ti)
+ with patch(
+
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler._write_to_es"
+ ) as mock_write_to_es:
+ mock_write = Mock(return_value=True)
+ mock_write_to_es.return_value = mock_write
+ self.es_task_handler._write_to_es = mock_write_to_es
+ self.es_task_handler.close()
+ mock_write_to_es.assert_called_once()
+
def test_safe_attrgetter():
class A: ...