This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new e50e946 Task logging handlers can provide custom log links (#9354)
e50e946 is described below
commit e50e94613a671a86d72e687d9f27fe1cb73ebf36
Author: Mauricio De Diana <[email protected]>
AuthorDate: Thu Jul 2 06:15:53 2020 -0300
Task logging handlers can provide custom log links (#9354)
Use a mixin to define log handlers based on remote services. The main
changes are:
- Create RemoteLoggingMixin to define remote log handlers.
- Remove explicit mentions to Elasticsearch in dag.html.
- Rename the /elasticsearch endpoint in views.py to
/redirect_to_remote_log and dispatch the remote URL building to the
log handler.
Co-authored-by: Kamil Breguła <[email protected]>
---
airflow/config_templates/airflow_local_settings.py | 2 +
airflow/utils/log/es_task_handler.py | 34 +++++++++-
airflow/utils/log/log_reader.py | 8 ++-
airflow/utils/log/logging_mixin.py | 16 +++++
airflow/www/templates/airflow/dag.html | 30 +++++----
airflow/www/views.py | 55 ++++++++++++----
docs/howto/write-logs.rst | 26 ++++++++
tests/utils/log/test_es_task_handler.py | 22 +++++++
tests/www/test_views.py | 75 +++++++++++++++++++++-
9 files changed, 235 insertions(+), 33 deletions(-)
diff --git a/airflow/config_templates/airflow_local_settings.py
b/airflow/config_templates/airflow_local_settings.py
index 2374bd90..5633ea9 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -234,6 +234,7 @@ if REMOTE_LOGGING:
elif ELASTICSEARCH_HOST:
ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get('elasticsearch',
'LOG_ID_TEMPLATE')
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get('elasticsearch',
'END_OF_LOG_MARK')
+ ELASTICSEARCH_FRONTEND: str = conf.get('elasticsearch', 'frontend')
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch',
'WRITE_STDOUT')
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch',
'JSON_FORMAT')
ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch',
'JSON_FIELDS')
@@ -247,6 +248,7 @@ if REMOTE_LOGGING:
'filename_template': FILENAME_TEMPLATE,
'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK,
'host': ELASTICSEARCH_HOST,
+ 'frontend': ELASTICSEARCH_FRONTEND,
'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
'json_format': ELASTICSEARCH_JSON_FORMAT,
'json_fields': ELASTICSEARCH_JSON_FIELDS
diff --git a/airflow/utils/log/es_task_handler.py
b/airflow/utils/log/es_task_handler.py
index 47f970f..b6e1687 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -18,6 +18,7 @@
import logging
import sys
+from urllib.parse import quote
# Using `from elasticsearch import *` would break elasticsearch mocking used
in unit test.
import elasticsearch
@@ -25,14 +26,15 @@ import pendulum
from elasticsearch_dsl import Search
from airflow.configuration import conf
+from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
+class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin,
ExternalLoggingMixin):
"""
ElasticsearchTaskHandler is a python log handler that
reads logs from Elasticsearch. Note logs are not directly
@@ -51,11 +53,13 @@ class ElasticsearchTaskHandler(FileTaskHandler,
LoggingMixin):
PAGE = 0
MAX_LINE_PER_PAGE = 1000
+ LOG_NAME = 'Elasticsearch'
- def __init__(self, base_log_folder, filename_template,
+ def __init__(self, base_log_folder, filename_template, # pylint:
disable=too-many-arguments
log_id_template, end_of_log_mark,
write_stdout, json_format, json_fields,
host='localhost:9200',
+ frontend='localhost:5601',
es_kwargs=conf.getsection("elasticsearch_configs")):
"""
:param base_log_folder: base folder to store logs locally
@@ -72,6 +76,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
self.client = elasticsearch.Elasticsearch([host], **es_kwargs)
+ self.frontend = frontend
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark
self.write_stdout = write_stdout
@@ -262,3 +267,26 @@ class ElasticsearchTaskHandler(FileTaskHandler,
LoggingMixin):
super().close()
self.closed = True
+
+ @property
+ def log_name(self):
+ return self.LOG_NAME
+
+ def get_external_log_url(self, task_instance: TaskInstance, try_number:
int) -> str:
+ """
+ Creates an address for an external log collecting service.
+
+ :param task_instance: task instance object
+ :type: task_instance: TaskInstance
+ :param try_number: task instance try_number to read logs from.
+ :type try_number: Optional[int]
+ :return: URL to the external log collection service
+ :rtype: str
+ """
+ log_id = self.log_id_template.format(
+ dag_id=task_instance.dag_id,
+ task_id=task_instance.task_id,
+ execution_date=task_instance.execution_date,
+ try_number=try_number)
+ url = 'https://' + self.frontend.format(log_id=quote(log_id))
+ return url
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index ee36f31..6aab9e6 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -23,6 +23,7 @@ from cached_property import cached_property
from airflow.configuration import conf
from airflow.models import TaskInstance
from airflow.utils.helpers import render_log_filename
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin
class TaskLogReader:
@@ -95,11 +96,16 @@ class TaskLogReader:
return handler
@property
- def is_supported(self):
+ def supports_read(self):
"""Checks if a read operation is supported by a current log handler."""
return hasattr(self.log_handler, 'read')
+ @property
+ def supports_external_link(self):
+ """Check if the logging handler supports external links (e.g. to
Elasticsearch, Stackdriver, etc)."""
+ return isinstance(self.log_handler, ExternalLoggingMixin)
+
def render_log_filename(self, ti: TaskInstance, try_number: Optional[int]
= None):
"""
Renders the log attachment filename
diff --git a/airflow/utils/log/logging_mixin.py
b/airflow/utils/log/logging_mixin.py
index ac6ca59..4ffb4f8 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import abc
import logging
import re
import sys
@@ -58,6 +59,21 @@ class LoggingMixin:
set_context(self.log, context)
+class ExternalLoggingMixin:
+ """
+ Define a log handler based on an external service (e.g. ELK, StackDriver).
+ """
+ @abc.abstractproperty
+ def log_name(self) -> str:
+ """Return log name"""
+
+ @abc.abstractmethod
+ def get_external_log_url(self, task_instance, try_number) -> str:
+ """
+ Return the URL for log visualization in the external service.
+ """
+
+
# TODO: Formally inherit from io.IOBase
class StreamLogWriter:
"""
diff --git a/airflow/www/templates/airflow/dag.html
b/airflow/www/templates/airflow/dag.html
index 49dda5a..5ee1b20 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -167,12 +167,14 @@
<hr/>
<hr/>
</div>
- <div id="dag_es_logs">
- <label style="display:inline"> View Logs in Elasticsearch (by
attempts): </label>
- <ul class="nav nav-pills" role="tablist" id="es_try_index"
style="display:inline">
+ <div id="dag_redir_logs">
+ {% if external_log_name is defined %}
+ <label style="display:inline"> View Logs in {{ external_log_name
}} (by attempts): </label>
+ <ul class="nav nav-pills" role="tablist" id="redir_log_try_index"
style="display:inline">
</ul>
<hr/>
<hr/>
+ {% endif %}
</div>
<form method="POST">
<input name="csrf_token" type="hidden" value="{{ csrf_token() }}"/>
@@ -366,9 +368,9 @@ function updateQueryStringParameter(uri, key, value) {
var task_id = '';
var execution_date = '';
var subdag_id = '';
- var show_es_logs = false;
- {% if show_external_logs is defined %}
- show_es_logs = '{{ show_external_logs }}' == "True";
+ var show_external_log_redirect = false;
+ {% if show_external_log_redirect is defined %}
+ show_external_log_redirect = '{{ show_external_log_redirect }}' ==
"True";
{% endif %}
var buttons =
Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce(function(obj,
elm) {
@@ -441,18 +443,18 @@ function updateQueryStringParameter(uri, key, value) {
}
$("#dag_dl_logs").hide();
- $("#dag_es_logs").hide();
+ $("#dag_redir_logs").hide();
if (try_numbers > 0) {
$("#dag_dl_logs").show();
- if (show_es_logs) {
- $("#dag_es_logs").show();
+ if (show_external_log_redirect) {
+ $("#dag_redir_logs").show();
}
}
updateModalUrls();
$("#try_index > li").remove();
- $("#es_try_index > li").remove();
+ $("#redir_log_try_index > li").remove();
var startIndex = (try_numbers > 2 ? 0 : 1)
for (var index = startIndex; index < try_numbers; index++) {
var url = "{{ url_for('Airflow.get_logs_with_metadata') }}" +
@@ -474,14 +476,14 @@ function updateQueryStringParameter(uri, key, value) {
</li>`
);
- if (index == 0 || !show_es_logs) continue;
- var es_url = "{{ url_for('Airflow.elasticsearch') }}" +
+ if (index == 0 || !show_external_log_redirect) continue;
+ var redir_log_url = "{{ url_for('Airflow.redirect_to_external_log')
}}" +
"?dag_id=" + encodeURIComponent(dag_id) +
"&task_id=" + encodeURIComponent(task_id) +
"&execution_date=" + encodeURIComponent(execution_date) +
"&try_number=" + index;
- $("#es_try_index").append(`<li role="presentation"
style="display:inline">
- <a href="${es_url}"> ${showLabel} </a>
+ $("#redir_log_try_index").append(`<li role="presentation"
style="display:inline">
+ <a href="${redir_log_url}"> ${showLabel} </a>
</li>`
);
}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 591d2e2..3be4e25 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -28,7 +28,7 @@ from collections import defaultdict
from datetime import datetime, timedelta
from json import JSONDecodeError
from typing import Dict, List, Optional, Tuple
-from urllib.parse import quote, unquote
+from urllib.parse import unquote
import lazy_object_proxy
import nvd3
@@ -678,7 +678,7 @@ class Airflow(AirflowBaseView): # noqa: D101
return response
task_log_reader = TaskLogReader()
- if not task_log_reader.is_supported:
+ if not task_log_reader.supports_read:
return jsonify(
message="Task log handler does not support read logs.",
error=True,
@@ -760,21 +760,34 @@ class Airflow(AirflowBaseView): # noqa: D101
execution_date=execution_date, form=form,
root=root, wrapped=conf.getboolean('webserver', 'default_wrap'))
- @expose('/elasticsearch')
+ @expose('/redirect_to_external_log')
@has_dag_access(can_dag_read=True)
@has_access
@action_logging
- def elasticsearch(self):
+ @provide_session
+ def redirect_to_external_log(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
+ dttm = timezone.parse(execution_date)
try_number = request.args.get('try_number', 1)
- elasticsearch_frontend = conf.get('elasticsearch', 'frontend')
- log_id_template = conf.get('elasticsearch', 'log_id_template')
- log_id = log_id_template.format(
- dag_id=dag_id, task_id=task_id,
- execution_date=execution_date, try_number=try_number)
- url = 'https://' + elasticsearch_frontend.format(log_id=quote(log_id))
+
+ ti = session.query(models.TaskInstance).filter(
+ models.TaskInstance.dag_id == dag_id,
+ models.TaskInstance.task_id == task_id,
+ models.TaskInstance.execution_date == dttm).first()
+
+ if not ti:
+ flash(f"Task [{dag_id}.{task_id}] does not exist", "error")
+ return redirect(url_for('Airflow.index'))
+
+ task_log_reader = TaskLogReader()
+ if not task_log_reader.supports_external_link:
+ flash("Task log handler does not support external links", "error")
+ return redirect(url_for('Airflow.index'))
+
+ handler = task_log_reader.log_handler
+ url = handler.get_external_log_url(ti, try_number)
return redirect(url)
@expose('/task')
@@ -1491,8 +1504,15 @@ class Airflow(AirflowBaseView): # noqa: D101
form = DateTimeWithNumRunsForm(data={'base_date': max_date,
'num_runs': num_runs})
- external_logs = conf.get('elasticsearch', 'frontend')
+
doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None),
css_class='dag-doc')
+
+ task_log_reader = TaskLogReader()
+ if task_log_reader.supports_external_link:
+ external_log_name = task_log_reader.log_handler.log_name
+ else:
+ external_log_name = None
+
# avoid spaces to reduce payload size
data = htmlsafe_json_dumps(data, separators=(',', ':'))
@@ -1505,7 +1525,8 @@ class Airflow(AirflowBaseView): # noqa: D101
doc_md=doc_md,
data=data,
blur=blur, num_runs=num_runs,
- show_external_logs=bool(external_logs))
+ show_external_log_redirect=task_log_reader.supports_external_link,
+ external_log_name=external_log_name)
@expose('/graph')
@has_dag_access(can_dag_read=True)
@@ -1587,7 +1608,12 @@ class Airflow(AirflowBaseView): # noqa: D101
session.commit()
doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None),
css_class='dag-doc')
- external_logs = conf.get('elasticsearch', 'frontend')
+ task_log_reader = TaskLogReader()
+ if task_log_reader.supports_external_link:
+ external_log_name = task_log_reader.log_handler.log_name
+ else:
+ external_log_name = None
+
return self.render_template(
'airflow/graph.html',
dag=dag,
@@ -1605,7 +1631,8 @@ class Airflow(AirflowBaseView): # noqa: D101
tasks=tasks,
nodes=nodes,
edges=edges,
- show_external_logs=bool(external_logs))
+ show_external_log_redirect=task_log_reader.supports_external_link,
+ external_log_name=external_log_name)
@expose('/duration')
@has_dag_access(can_dag_read=True)
diff --git a/docs/howto/write-logs.rst b/docs/howto/write-logs.rst
index c16a59b..9e0be0e 100644
--- a/docs/howto/write-logs.rst
+++ b/docs/howto/write-logs.rst
@@ -234,6 +234,7 @@ First, to use the handler, ``airflow.cfg`` must be
configured as follows:
remote_logging = True
[elasticsearch]
+ host = <host>:<port>
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
end_of_log_mark = end_of_log
write_stdout =
@@ -251,6 +252,7 @@ To output task logs to stdout in JSON format, the following
config could be used
remote_logging = True
[elasticsearch]
+ host = <host>:<port>
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
end_of_log_mark = end_of_log
write_stdout = True
@@ -318,3 +320,27 @@ be used.
By using the ``logging_config_class`` option you can get :ref:`advanced
features <write-logs-advanced>` of
this handler. Details are available in the handler's documentation -
:class:`~airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler`.
+
+External Links
+==============
+
+When using remote logging, users can configure Airflow to show a link to an
external UI within the Airflow Web UI. Clicking the link redirects a user to
the external UI.
+
+Some external systems require specific configuration in Airflow for
redirection to work, but others do not.
+
+.. _log-link-elasticsearch:
+
+Elasticsearch External Link
+------------------------------------
+
+A user can configure Airflow to show a link to an Elasticsearch log viewing
system (e.g. Kibana).
+
+To enable it, ``airflow.cfg`` must be configured as in the example below. Note
the required ``{log_id}`` in the URL; when constructing the external link,
Airflow replaces this parameter with the same ``log_id_template`` used for
writing logs (see `Writing Logs to Elasticsearch`_).
+
+.. code-block:: ini
+
+ [elasticsearch]
+ # Qualified URL for an elasticsearch frontend (like Kibana) with a
template argument for log_id
+ # Code will construct log_id using the log_id template from the argument
above.
+ # NOTE: The code will prefix the https:// automatically, don't include
that here.
+ frontend = <host_port>/{log_id}
diff --git a/tests/utils/log/test_es_task_handler.py
b/tests/utils/log/test_es_task_handler.py
index b4e8fac..8453ca8 100644
--- a/tests/utils/log/test_es_task_handler.py
+++ b/tests/utils/log/test_es_task_handler.py
@@ -21,9 +21,11 @@ import os
import shutil
import unittest
from unittest import mock
+from urllib.parse import quote
import elasticsearch
import pendulum
+from parameterized import parameterized
from airflow.configuration import conf
from airflow.models import DAG, TaskInstance
@@ -346,3 +348,23 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
def test_clean_execution_date(self):
clean_execution_date =
self.es_task_handler._clean_execution_date(datetime(2016, 7, 8, 9, 10, 11, 12))
self.assertEqual('2016_07_08T09_10_11_000012', clean_execution_date)
+
+ @parameterized.expand([
+ # Common case
+ ('localhost:5601/{log_id}', 'https://localhost:5601/' +
quote(LOG_ID.replace('T', ' '))),
+        # Ignore template if "{log_id}" is missing in the URL
+ ('localhost:5601', 'https://localhost:5601'),
+ ])
+ def test_get_external_log_url(self, es_frontend, expected_url):
+ es_task_handler = ElasticsearchTaskHandler(
+ self.local_log_location,
+ self.filename_template,
+ self.log_id_template,
+ self.end_of_log_mark,
+ self.write_stdout,
+ self.json_format,
+ self.json_fields,
+ frontend=es_frontend
+ )
+ url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)
+ self.assertEqual(expected_url, url)
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 090f3af..155c16c 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -56,6 +56,7 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.settings import Session
from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES,
RUNNABLE_STATES
from airflow.utils import dates, timezone
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import using_mysql
from airflow.utils.state import State
@@ -992,6 +993,36 @@ class TestAirflowBaseViews(TestBase):
self.session.query(DM).filter(DM.dag_id ==
test_dag_id).update({'dag_id': dag_id})
self.session.commit()
+ @parameterized.expand(["graph", "tree"])
+ def test_show_external_log_redirect_link_with_local_log_handler(self,
endpoint):
+ """Do not show external links if log handler is local."""
+ url = f'{endpoint}?dag_id=example_bash_operator'
+ with self.capture_templates() as templates:
+ self.client.get(url, follow_redirects=True)
+ ctx = templates[0].local_context
+ self.assertFalse(ctx['show_external_log_redirect'])
+ self.assertIsNone(ctx['external_log_name'])
+
+ @parameterized.expand(["graph", "tree"])
+ @mock.patch('airflow.utils.log.log_reader.TaskLogReader.log_handler',
new_callable=PropertyMock)
+ def test_show_external_log_redirect_link_with_external_log_handler(self,
endpoint, mock_log_handler):
+ """Show external links if log handler is external."""
+ class ExternalHandler(ExternalLoggingMixin):
+ LOG_NAME = 'ExternalLog'
+
+ @property
+ def log_name(self):
+ return self.LOG_NAME
+
+ mock_log_handler.return_value = ExternalHandler()
+
+ url = f'{endpoint}?dag_id=example_bash_operator'
+ with self.capture_templates() as templates:
+ self.client.get(url, follow_redirects=True)
+ ctx = templates[0].local_context
+ self.assertTrue(ctx['show_external_log_redirect'])
+ self.assertEqual(ctx['external_log_name'],
ExternalHandler.LOG_NAME)
+
class TestConfigurationView(TestBase):
def test_configuration_do_not_expose_config(self):
@@ -1264,7 +1295,7 @@ class TestLogView(TestBase):
@mock.patch("airflow.www.views.TaskLogReader")
def test_get_logs_for_handler_without_read_method(self, mock_log_reader):
- type(mock_log_reader.return_value).is_supported =
PropertyMock(return_value=False)
+ type(mock_log_reader.return_value).supports_read =
PropertyMock(return_value=False)
url_template = "get_logs_with_metadata?dag_id={}&" \
"task_id={}&execution_date={}&" \
@@ -1283,6 +1314,48 @@ class TestLogView(TestBase):
'Task log handler does not support read logs.',
response.json['message'])
+ @parameterized.expand([
+ ('inexistent', ),
+ (TASK_ID, ),
+ ])
+ def test_redirect_to_external_log_with_local_log_handler(self, task_id):
+ """Redirect to home if TI does not exist or if log handler is local"""
+ url_template = "redirect_to_external_log?dag_id={}&" \
+ "task_id={}&execution_date={}&" \
+ "try_number={}"
+ try_number = 1
+ url = url_template.format(self.DAG_ID,
+ task_id,
+ quote_plus(self.DEFAULT_DATE.isoformat()),
+ try_number)
+ response = self.client.get(url)
+
+ self.assertEqual(302, response.status_code)
+ self.assertEqual('http://localhost/home', response.headers['Location'])
+
+ @mock.patch('airflow.utils.log.log_reader.TaskLogReader.log_handler',
new_callable=PropertyMock)
+ def test_redirect_to_external_log_with_external_log_handler(self,
mock_log_handler):
+ class ExternalHandler(ExternalLoggingMixin):
+ EXTERNAL_URL = 'http://external-service.com'
+
+ def get_external_log_url(self, *args, **kwargs):
+ return self.EXTERNAL_URL
+
+ mock_log_handler.return_value = ExternalHandler()
+
+ url_template = "redirect_to_external_log?dag_id={}&" \
+ "task_id={}&execution_date={}&" \
+ "try_number={}"
+ try_number = 1
+ url = url_template.format(self.DAG_ID,
+ self.TASK_ID,
+ quote_plus(self.DEFAULT_DATE.isoformat()),
+ try_number)
+ response = self.client.get(url)
+
+ self.assertEqual(302, response.status_code)
+ self.assertEqual(ExternalHandler.EXTERNAL_URL,
response.headers['Location'])
+
class TestVersionView(TestBase):
def test_version(self):