This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new e50e946  Task logging handlers can provide custom log links (#9354)
e50e946 is described below

commit e50e94613a671a86d72e687d9f27fe1cb73ebf36
Author: Mauricio De Diana <mdedi...@gmail.com>
AuthorDate: Thu Jul 2 06:15:53 2020 -0300

    Task logging handlers can provide custom log links (#9354)
    
    Use a mixin to define log handlers based on remote services. The main
    changes are:
     - Create ExternalLoggingMixin to define external log handlers.
     - Remove explicit mentions of Elasticsearch in dag.html.
     - Rename the /elasticsearch endpoint in views.py to
       /redirect_to_external_log and dispatch the external URL building to
       the log handler.
    
    Co-authored-by: Kamil Breguła <mik-...@users.noreply.github.com>
---
 airflow/config_templates/airflow_local_settings.py |  2 +
 airflow/utils/log/es_task_handler.py               | 34 +++++++++-
 airflow/utils/log/log_reader.py                    |  8 ++-
 airflow/utils/log/logging_mixin.py                 | 16 +++++
 airflow/www/templates/airflow/dag.html             | 30 +++++----
 airflow/www/views.py                               | 55 ++++++++++++----
 docs/howto/write-logs.rst                          | 26 ++++++++
 tests/utils/log/test_es_task_handler.py            | 22 +++++++
 tests/www/test_views.py                            | 75 +++++++++++++++++++++-
 9 files changed, 235 insertions(+), 33 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py 
b/airflow/config_templates/airflow_local_settings.py
index 2374bd90..5633ea9 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -234,6 +234,7 @@ if REMOTE_LOGGING:
     elif ELASTICSEARCH_HOST:
         ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get('elasticsearch', 
'LOG_ID_TEMPLATE')
         ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get('elasticsearch', 
'END_OF_LOG_MARK')
+        ELASTICSEARCH_FRONTEND: str = conf.get('elasticsearch', 'frontend')
         ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 
'WRITE_STDOUT')
         ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 
'JSON_FORMAT')
         ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 
'JSON_FIELDS')
@@ -247,6 +248,7 @@ if REMOTE_LOGGING:
                 'filename_template': FILENAME_TEMPLATE,
                 'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK,
                 'host': ELASTICSEARCH_HOST,
+                'frontend': ELASTICSEARCH_FRONTEND,
                 'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
                 'json_format': ELASTICSEARCH_JSON_FORMAT,
                 'json_fields': ELASTICSEARCH_JSON_FIELDS
diff --git a/airflow/utils/log/es_task_handler.py 
b/airflow/utils/log/es_task_handler.py
index 47f970f..b6e1687 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -18,6 +18,7 @@
 
 import logging
 import sys
+from urllib.parse import quote
 
 # Using `from elasticsearch import *` would break elasticsearch mocking used 
in unit test.
 import elasticsearch
@@ -25,14 +26,15 @@ import pendulum
 from elasticsearch_dsl import Search
 
 from airflow.configuration import conf
+from airflow.models import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.helpers import parse_template_string
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
 
 
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
+class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin, 
ExternalLoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that
     reads logs from Elasticsearch. Note logs are not directly
@@ -51,11 +53,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
LoggingMixin):
 
     PAGE = 0
     MAX_LINE_PER_PAGE = 1000
+    LOG_NAME = 'Elasticsearch'
 
-    def __init__(self, base_log_folder, filename_template,
+    def __init__(self, base_log_folder, filename_template,  # pylint: 
disable=too-many-arguments
                  log_id_template, end_of_log_mark,
                  write_stdout, json_format, json_fields,
                  host='localhost:9200',
+                 frontend='localhost:5601',
                  es_kwargs=conf.getsection("elasticsearch_configs")):
         """
         :param base_log_folder: base folder to store logs locally
@@ -72,6 +76,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
 
         self.client = elasticsearch.Elasticsearch([host], **es_kwargs)
 
+        self.frontend = frontend
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
         self.write_stdout = write_stdout
@@ -262,3 +267,26 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
LoggingMixin):
         super().close()
 
         self.closed = True
+
+    @property
+    def log_name(self):
+        return self.LOG_NAME
+
+    def get_external_log_url(self, task_instance: TaskInstance, try_number: 
int) -> str:
+        """
+        Creates an address for an external log collecting service.
+
+        :param task_instance: task instance object
+        :type task_instance: TaskInstance
+        :param try_number: task instance try_number to read logs from.
+        :type try_number: Optional[int]
+        :return: URL to the external log collection service
+        :rtype: str
+        """
+        log_id = self.log_id_template.format(
+            dag_id=task_instance.dag_id,
+            task_id=task_instance.task_id,
+            execution_date=task_instance.execution_date,
+            try_number=try_number)
+        url = 'https://' + self.frontend.format(log_id=quote(log_id))
+        return url
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index ee36f31..6aab9e6 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -23,6 +23,7 @@ from cached_property import cached_property
 from airflow.configuration import conf
 from airflow.models import TaskInstance
 from airflow.utils.helpers import render_log_filename
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 
 
 class TaskLogReader:
@@ -95,11 +96,16 @@ class TaskLogReader:
         return handler
 
     @property
-    def is_supported(self):
+    def supports_read(self):
         """Checks if a read operation is supported by a current log handler."""
 
         return hasattr(self.log_handler, 'read')
 
+    @property
+    def supports_external_link(self):
+        """Check if the logging handler supports external links (e.g. to 
Elasticsearch, Stackdriver, etc)."""
+        return isinstance(self.log_handler, ExternalLoggingMixin)
+
     def render_log_filename(self, ti: TaskInstance, try_number: Optional[int] 
= None):
         """
         Renders the log attachment filename
diff --git a/airflow/utils/log/logging_mixin.py 
b/airflow/utils/log/logging_mixin.py
index ac6ca59..4ffb4f8 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import abc
 import logging
 import re
 import sys
@@ -58,6 +59,21 @@ class LoggingMixin:
             set_context(self.log, context)
 
 
+class ExternalLoggingMixin:
+    """
+    Define a log handler based on an external service (e.g. ELK, StackDriver).
+    """
+    @abc.abstractproperty
+    def log_name(self) -> str:
+        """Return log name"""
+
+    @abc.abstractmethod
+    def get_external_log_url(self, task_instance, try_number) -> str:
+        """
+        Return the URL for log visualization in the external service.
+        """
+
+
 # TODO: Formally inherit from io.IOBase
 class StreamLogWriter:
     """
diff --git a/airflow/www/templates/airflow/dag.html 
b/airflow/www/templates/airflow/dag.html
index 49dda5a..5ee1b20 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -167,12 +167,14 @@
             <hr/>
             <hr/>
           </div>
-          <div id="dag_es_logs">
-            <label style="display:inline"> View Logs in Elasticsearch (by 
attempts): </label>
-            <ul class="nav nav-pills" role="tablist" id="es_try_index" 
style="display:inline">
+          <div id="dag_redir_logs">
+            {% if external_log_name is defined %}
+            <label style="display:inline"> View Logs in {{ external_log_name 
}} (by attempts): </label>
+            <ul class="nav nav-pills" role="tablist" id="redir_log_try_index" 
style="display:inline">
             </ul>
             <hr/>
             <hr/>
+            {% endif %}
           </div>
           <form method="POST">
             <input name="csrf_token" type="hidden" value="{{ csrf_token() }}"/>
@@ -366,9 +368,9 @@ function updateQueryStringParameter(uri, key, value) {
     var task_id = '';
     var execution_date = '';
     var subdag_id = '';
-    var show_es_logs = false;
-    {% if show_external_logs is defined %}
-      show_es_logs = '{{ show_external_logs }}' == "True";
+    var show_external_log_redirect = false;
+    {% if show_external_log_redirect is defined %}
+      show_external_log_redirect = '{{ show_external_log_redirect }}' == 
"True";
     {% endif %}
 
     var buttons = 
Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce(function(obj,
 elm) {
@@ -441,18 +443,18 @@ function updateQueryStringParameter(uri, key, value) {
       }
 
       $("#dag_dl_logs").hide();
-      $("#dag_es_logs").hide();
+      $("#dag_redir_logs").hide();
       if (try_numbers > 0) {
           $("#dag_dl_logs").show();
-          if (show_es_logs) {
-              $("#dag_es_logs").show();
+          if (show_external_log_redirect) {
+              $("#dag_redir_logs").show();
           }
       }
 
       updateModalUrls();
 
       $("#try_index > li").remove();
-      $("#es_try_index > li").remove();
+      $("#redir_log_try_index > li").remove();
       var startIndex = (try_numbers > 2 ? 0 : 1)
       for (var index = startIndex; index <  try_numbers; index++) {
         var url = "{{ url_for('Airflow.get_logs_with_metadata') }}" +
@@ -474,14 +476,14 @@ function updateQueryStringParameter(uri, key, value) {
           </li>`
         );
 
-        if (index == 0 || !show_es_logs) continue;
-        var es_url = "{{ url_for('Airflow.elasticsearch') }}" +
+        if (index == 0 || !show_external_log_redirect) continue;
+        var redir_log_url = "{{ url_for('Airflow.redirect_to_external_log') 
}}" +
           "?dag_id=" + encodeURIComponent(dag_id) +
           "&task_id=" + encodeURIComponent(task_id) +
           "&execution_date=" + encodeURIComponent(execution_date) +
           "&try_number=" + index;
-        $("#es_try_index").append(`<li role="presentation" 
style="display:inline">
-          <a href="${es_url}"> ${showLabel} </a>
+        $("#redir_log_try_index").append(`<li role="presentation" 
style="display:inline">
+          <a href="${redir_log_url}"> ${showLabel} </a>
           </li>`
         );
       }
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 591d2e2..3be4e25 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -28,7 +28,7 @@ from collections import defaultdict
 from datetime import datetime, timedelta
 from json import JSONDecodeError
 from typing import Dict, List, Optional, Tuple
-from urllib.parse import quote, unquote
+from urllib.parse import unquote
 
 import lazy_object_proxy
 import nvd3
@@ -678,7 +678,7 @@ class Airflow(AirflowBaseView):  # noqa: D101
             return response
 
         task_log_reader = TaskLogReader()
-        if not task_log_reader.is_supported:
+        if not task_log_reader.supports_read:
             return jsonify(
                 message="Task log handler does not support read logs.",
                 error=True,
@@ -760,21 +760,34 @@ class Airflow(AirflowBaseView):  # noqa: D101
             execution_date=execution_date, form=form,
             root=root, wrapped=conf.getboolean('webserver', 'default_wrap'))
 
-    @expose('/elasticsearch')
+    @expose('/redirect_to_external_log')
     @has_dag_access(can_dag_read=True)
     @has_access
     @action_logging
-    def elasticsearch(self):
+    @provide_session
+    def redirect_to_external_log(self, session=None):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+        dttm = timezone.parse(execution_date)
         try_number = request.args.get('try_number', 1)
-        elasticsearch_frontend = conf.get('elasticsearch', 'frontend')
-        log_id_template = conf.get('elasticsearch', 'log_id_template')
-        log_id = log_id_template.format(
-            dag_id=dag_id, task_id=task_id,
-            execution_date=execution_date, try_number=try_number)
-        url = 'https://' + elasticsearch_frontend.format(log_id=quote(log_id))
+
+        ti = session.query(models.TaskInstance).filter(
+            models.TaskInstance.dag_id == dag_id,
+            models.TaskInstance.task_id == task_id,
+            models.TaskInstance.execution_date == dttm).first()
+
+        if not ti:
+            flash(f"Task [{dag_id}.{task_id}] does not exist", "error")
+            return redirect(url_for('Airflow.index'))
+
+        task_log_reader = TaskLogReader()
+        if not task_log_reader.supports_external_link:
+            flash("Task log handler does not support external links", "error")
+            return redirect(url_for('Airflow.index'))
+
+        handler = task_log_reader.log_handler
+        url = handler.get_external_log_url(ti, try_number)
         return redirect(url)
 
     @expose('/task')
@@ -1491,8 +1504,15 @@ class Airflow(AirflowBaseView):  # noqa: D101
 
         form = DateTimeWithNumRunsForm(data={'base_date': max_date,
                                              'num_runs': num_runs})
-        external_logs = conf.get('elasticsearch', 'frontend')
+
         doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None), 
css_class='dag-doc')
+
+        task_log_reader = TaskLogReader()
+        if task_log_reader.supports_external_link:
+            external_log_name = task_log_reader.log_handler.log_name
+        else:
+            external_log_name = None
+
         # avoid spaces to reduce payload size
         data = htmlsafe_json_dumps(data, separators=(',', ':'))
 
@@ -1505,7 +1525,8 @@ class Airflow(AirflowBaseView):  # noqa: D101
             doc_md=doc_md,
             data=data,
             blur=blur, num_runs=num_runs,
-            show_external_logs=bool(external_logs))
+            show_external_log_redirect=task_log_reader.supports_external_link,
+            external_log_name=external_log_name)
 
     @expose('/graph')
     @has_dag_access(can_dag_read=True)
@@ -1587,7 +1608,12 @@ class Airflow(AirflowBaseView):  # noqa: D101
         session.commit()
         doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None), 
css_class='dag-doc')
 
-        external_logs = conf.get('elasticsearch', 'frontend')
+        task_log_reader = TaskLogReader()
+        if task_log_reader.supports_external_link:
+            external_log_name = task_log_reader.log_handler.log_name
+        else:
+            external_log_name = None
+
         return self.render_template(
             'airflow/graph.html',
             dag=dag,
@@ -1605,7 +1631,8 @@ class Airflow(AirflowBaseView):  # noqa: D101
             tasks=tasks,
             nodes=nodes,
             edges=edges,
-            show_external_logs=bool(external_logs))
+            show_external_log_redirect=task_log_reader.supports_external_link,
+            external_log_name=external_log_name)
 
     @expose('/duration')
     @has_dag_access(can_dag_read=True)
diff --git a/docs/howto/write-logs.rst b/docs/howto/write-logs.rst
index c16a59b..9e0be0e 100644
--- a/docs/howto/write-logs.rst
+++ b/docs/howto/write-logs.rst
@@ -234,6 +234,7 @@ First, to use the handler, ``airflow.cfg`` must be 
configured as follows:
     remote_logging = True
 
     [elasticsearch]
+    host = <host>:<port>
     log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
     end_of_log_mark = end_of_log
     write_stdout =
@@ -251,6 +252,7 @@ To output task logs to stdout in JSON format, the following 
config could be used
     remote_logging = True
 
     [elasticsearch]
+    host = <host>:<port>
     log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
     end_of_log_mark = end_of_log
     write_stdout = True
@@ -318,3 +320,27 @@ be used.
 By using the ``logging_config_class`` option you can get :ref:`advanced 
features <write-logs-advanced>` of
 this handler. Details are available in the handler's documentation -
 :class:`~airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler`.
+
+External Links
+==============
+
+When using remote logging, users can configure Airflow to show a link to an 
external UI within the Airflow Web UI. Clicking the link redirects a user to 
the external UI.
+
+Some external systems require specific configuration in Airflow for 
redirection to work but others do not.
+
+.. _log-link-elasticsearch:
+
+Elasticsearch External Link
+------------------------------------
+
+A user can configure Airflow to show a link to an Elasticsearch log viewing 
system (e.g. Kibana).
+
+To enable it, ``airflow.cfg`` must be configured as in the example below. Note 
the required ``{log_id}`` in the URL: when constructing the external link, 
Airflow replaces this parameter with the log ID rendered from the same 
``log_id_template`` used for writing logs (see `Writing Logs to Elasticsearch`_).
+
+.. code-block:: ini
+
+    [elasticsearch]
+    # Qualified URL for an elasticsearch frontend (like Kibana) with a 
template argument for log_id
+    # Code will construct log_id using the log_id template from the argument 
above.
+    # NOTE: The code will prefix the https:// automatically, don't include 
that here.
+    frontend = <host_port>/{log_id}
diff --git a/tests/utils/log/test_es_task_handler.py 
b/tests/utils/log/test_es_task_handler.py
index b4e8fac..8453ca8 100644
--- a/tests/utils/log/test_es_task_handler.py
+++ b/tests/utils/log/test_es_task_handler.py
@@ -21,9 +21,11 @@ import os
 import shutil
 import unittest
 from unittest import mock
+from urllib.parse import quote
 
 import elasticsearch
 import pendulum
+from parameterized import parameterized
 
 from airflow.configuration import conf
 from airflow.models import DAG, TaskInstance
@@ -346,3 +348,23 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
     def test_clean_execution_date(self):
         clean_execution_date = 
self.es_task_handler._clean_execution_date(datetime(2016, 7, 8, 9, 10, 11, 12))
         self.assertEqual('2016_07_08T09_10_11_000012', clean_execution_date)
+
+    @parameterized.expand([
+        # Common case
+        ('localhost:5601/{log_id}', 'https://localhost:5601/' + 
quote(LOG_ID.replace('T', ' '))),
+        # Ignore template if "{log_id}" is missing in the URL
+        ('localhost:5601', 'https://localhost:5601'),
+    ])
+    def test_get_external_log_url(self, es_frontend, expected_url):
+        es_task_handler = ElasticsearchTaskHandler(
+            self.local_log_location,
+            self.filename_template,
+            self.log_id_template,
+            self.end_of_log_mark,
+            self.write_stdout,
+            self.json_format,
+            self.json_fields,
+            frontend=es_frontend
+        )
+        url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)
+        self.assertEqual(expected_url, url)
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 090f3af..155c16c 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -56,6 +56,7 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
 from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, 
RUNNABLE_STATES
 from airflow.utils import dates, timezone
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import create_session
 from airflow.utils.sqlalchemy import using_mysql
 from airflow.utils.state import State
@@ -992,6 +993,36 @@ class TestAirflowBaseViews(TestBase):
         self.session.query(DM).filter(DM.dag_id == 
test_dag_id).update({'dag_id': dag_id})
         self.session.commit()
 
+    @parameterized.expand(["graph", "tree"])
+    def test_show_external_log_redirect_link_with_local_log_handler(self, 
endpoint):
+        """Do not show external links if log handler is local."""
+        url = f'{endpoint}?dag_id=example_bash_operator'
+        with self.capture_templates() as templates:
+            self.client.get(url, follow_redirects=True)
+            ctx = templates[0].local_context
+            self.assertFalse(ctx['show_external_log_redirect'])
+            self.assertIsNone(ctx['external_log_name'])
+
+    @parameterized.expand(["graph", "tree"])
+    @mock.patch('airflow.utils.log.log_reader.TaskLogReader.log_handler', 
new_callable=PropertyMock)
+    def test_show_external_log_redirect_link_with_external_log_handler(self, 
endpoint, mock_log_handler):
+        """Show external links if log handler is external."""
+        class ExternalHandler(ExternalLoggingMixin):
+            LOG_NAME = 'ExternalLog'
+
+            @property
+            def log_name(self):
+                return self.LOG_NAME
+
+        mock_log_handler.return_value = ExternalHandler()
+
+        url = f'{endpoint}?dag_id=example_bash_operator'
+        with self.capture_templates() as templates:
+            self.client.get(url, follow_redirects=True)
+            ctx = templates[0].local_context
+            self.assertTrue(ctx['show_external_log_redirect'])
+            self.assertEqual(ctx['external_log_name'], 
ExternalHandler.LOG_NAME)
+
 
 class TestConfigurationView(TestBase):
     def test_configuration_do_not_expose_config(self):
@@ -1264,7 +1295,7 @@ class TestLogView(TestBase):
 
     @mock.patch("airflow.www.views.TaskLogReader")
     def test_get_logs_for_handler_without_read_method(self, mock_log_reader):
-        type(mock_log_reader.return_value).is_supported = 
PropertyMock(return_value=False)
+        type(mock_log_reader.return_value).supports_read = 
PropertyMock(return_value=False)
 
         url_template = "get_logs_with_metadata?dag_id={}&" \
                        "task_id={}&execution_date={}&" \
@@ -1283,6 +1314,48 @@ class TestLogView(TestBase):
             'Task log handler does not support read logs.',
             response.json['message'])
 
+    @parameterized.expand([
+        ('inexistent', ),
+        (TASK_ID, ),
+    ])
+    def test_redirect_to_external_log_with_local_log_handler(self, task_id):
+        """Redirect to home if TI does not exist or if log handler is local"""
+        url_template = "redirect_to_external_log?dag_id={}&" \
+                       "task_id={}&execution_date={}&" \
+                       "try_number={}"
+        try_number = 1
+        url = url_template.format(self.DAG_ID,
+                                  task_id,
+                                  quote_plus(self.DEFAULT_DATE.isoformat()),
+                                  try_number)
+        response = self.client.get(url)
+
+        self.assertEqual(302, response.status_code)
+        self.assertEqual('http://localhost/home', response.headers['Location'])
+
+    @mock.patch('airflow.utils.log.log_reader.TaskLogReader.log_handler', 
new_callable=PropertyMock)
+    def test_redirect_to_external_log_with_external_log_handler(self, 
mock_log_handler):
+        class ExternalHandler(ExternalLoggingMixin):
+            EXTERNAL_URL = 'http://external-service.com'
+
+            def get_external_log_url(self, *args, **kwargs):
+                return self.EXTERNAL_URL
+
+        mock_log_handler.return_value = ExternalHandler()
+
+        url_template = "redirect_to_external_log?dag_id={}&" \
+                       "task_id={}&execution_date={}&" \
+                       "try_number={}"
+        try_number = 1
+        url = url_template.format(self.DAG_ID,
+                                  self.TASK_ID,
+                                  quote_plus(self.DEFAULT_DATE.isoformat()),
+                                  try_number)
+        response = self.client.get(url)
+
+        self.assertEqual(302, response.status_code)
+        self.assertEqual(ExternalHandler.EXTERNAL_URL, 
response.headers['Location'])
+
 
 class TestVersionView(TestBase):
     def test_version(self):

Reply via email to