[AIRFLOW-1325] Add ElasticSearch log handler and reader

Closes #3214 from yrqls21/kevin_yang_add_es_task_handler


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ec38ba95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ec38ba95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ec38ba95

Branch: refs/heads/master
Commit: ec38ba9594395de04ec932481212a86fbe9ae107
Parents: 34f827f
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Fri Apr 13 11:09:50 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Apr 13 11:09:50 2018 +0200

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 LICENSE                                         |   1 +
 airflow/bin/cli.py                              |   4 +-
 .../config_templates/airflow_local_settings.py  |  24 +-
 airflow/config_templates/default_airflow.cfg    |   7 +-
 airflow/models.py                               |   4 +-
 airflow/utils/helpers.py                        |   9 +
 airflow/utils/log/es_task_handler.py            | 183 ++++++++++
 airflow/utils/log/file_processor_handler.py     |  10 +-
 airflow/utils/log/file_task_handler.py          |  29 +-
 airflow/utils/log/gcs_task_handler.py           |   8 +-
 airflow/utils/log/s3_task_handler.py            |   8 +-
 airflow/utils/log/wasb_task_handler.py          | 352 ++++++++++---------
 airflow/www/api/experimental/endpoints.py       |  26 +-
 airflow/www/templates/airflow/ti_log.html       | 159 +++++++--
 airflow/www/views.py                            |  87 +++--
 licenses/LICENSE-elasticmock.txt                |  21 ++
 setup.py                                        |   7 +-
 tests/utils/log/elasticmock/__init__.py         |  56 +++
 .../utils/log/elasticmock/fake_elasticsearch.py | 295 ++++++++++++++++
 .../utils/log/elasticmock/utilities/__init__.py |  33 ++
 tests/utils/log/test_es_task_handler.py         | 255 ++++++++++++++
 tests/utils/log/test_s3_task_handler.py         |   6 +-
 tests/utils/test_log_handlers.py                |  15 +-
 tests/www/test_views.py                         |  19 +-
 25 files changed, 1339 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index f5ed5ad..5e7b24d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -130,3 +130,6 @@ ENV/
 
 # Spark
 rat-results.txt
+
+# Git stuff
+.gitattributes

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 540289b..405540c 100644
--- a/LICENSE
+++ b/LICENSE
@@ -225,6 +225,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
     (MIT License) Underscorejs (http://underscorejs.org)
     (MIT License) Bootstrap Toggle (http://www.bootstraptoggle.com)
     (MIT License) normalize.css (http://necolas.github.io/normalize.css/)
+    (MIT License) ElasticMock (https://github.com/vrcmarcos/elasticmock)
 
 ========================================================================
 BSD 2-Clause licenses

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index eb96e77..52d5957 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -457,10 +457,8 @@ def run(args, dag=None):
     if args.interactive:
         _run(args, dag, ti)
     else:
-        with redirect_stdout(ti.log, logging.INFO),\
-                redirect_stderr(ti.log, logging.WARN):
+        with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
             _run(args, dag, ti)
-        logging.shutdown()
 
 
 @cli_utils.action_logging

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/config_templates/airflow_local_settings.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 3086cf0..2ab343e 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -37,12 +37,19 @@ FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.
 
 PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 
+LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}'
+
 # Storage bucket url for remote logging
 # s3 buckets should start with "s3://"
 # gcs buckets should start with "gs://"
-# wasb buckets should start with "wasb" just to help Airflow select correct handler
+# wasb buckets should start with "wasb"
+# just to help Airflow select correct handler
 REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
 
+ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')
+
+END_OF_LOG_MARK = 'end_of_log'
+
 DEFAULT_LOGGING_CONFIG = {
     'version': 1,
     'disable_existing_loggers': False,
@@ -145,7 +152,18 @@ REMOTE_HANDLERS = {
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
             'delete_local_copy': False,
         },
-    }
+    },
+    'elasticsearch': {
+        'task': {
+            'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'log_id_template': LOG_ID_TEMPLATE,
+            'filename_template': FILENAME_TEMPLATE,
+            'end_of_log_mark': END_OF_LOG_MARK,
+            'host': ELASTICSEARCH_HOST,
+        },
+    },
 }
 
 REMOTE_LOGGING = conf.get('core', 'remote_logging')
@@ -156,3 +174,5 @@ elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
         DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
 elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
         DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
+elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
+        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
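
For reference, the 'elasticsearch' handler above is only selected when remote logging is enabled and an elasticsearch_host is configured (the new elif branch). A minimal sketch of how LOG_ID_TEMPLATE renders a log_id, with illustrative values that would normally come from the TaskInstance:

    # Hypothetical values for illustration only.
    LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}'

    log_id = LOG_ID_TEMPLATE.format(
        dag_id='example_dag',
        task_id='example_task',
        execution_date='2018-04-13T00:00:00+00:00',
        try_number=1)
    # -> 'example_dag-example_task-2018-04-13T00:00:00+00:00-1'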

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index e8da82f..1748c42 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -35,8 +35,8 @@ dags_folder = {AIRFLOW_HOME}/dags
 # This path must be absolute
 base_log_folder = {AIRFLOW_HOME}/logs
 
-# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
-# must supply an Airflow connection id that provides access to the storage
+# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
+# Users must supply an Airflow connection id that provides access to the storage
 # location. If remote_logging is set to true, see UPDATING.md for additional
 # configuration requirements.
 remote_logging = False
@@ -486,3 +486,6 @@ api_rev = v3
 [admin]
 # UI to hide sensitive variable fields when set to True
 hide_sensitive_variable_fields = True
+
+[elasticsearch]
+elasticsearch_host =
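
A sketch of the airflow.cfg settings that activate the Elasticsearch log handler; the host value below is an example only, not a default shipped with this change:

    [core]
    remote_logging = True

    [elasticsearch]
    # host:port of the cluster that the external pipeline indexes task logs into
    elasticsearch_host = localhost:9200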

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index afcacd1..9406150 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -857,7 +857,7 @@ class TaskInstance(Base, LoggingMixin):
         self.init_on_load()
         # Is this TaskInstance being currently running within `airflow run --raw`.
         # Not persisted to the database so only valid for the current process
-        self.is_raw = False
+        self.raw = False
 
     @reconstructor
     def init_on_load(self):
@@ -1956,8 +1956,8 @@ class TaskInstance(Base, LoggingMixin):
         """
         Sets the log context.
         """
-        self._set_context(self)
         self.raw = raw
+        self._set_context(self)
 
 
 class TaskFail(Base):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 9e6a439..c28851b 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -31,6 +31,8 @@ import subprocess
 import sys
 import warnings
 
+from jinja2 import Template
+
 from airflow import configuration
 from airflow.exceptions import AirflowException
 
@@ -223,6 +225,13 @@ def reap_process_group(pid, log, sig=signal.SIGTERM,
                log.error("Process %s (%s) could not be killed. Giving up.", p, p.pid)
 
 
+def parse_template_string(template_string):
+    if "{{" in template_string:  # jinja mode
+        return None, Template(template_string)
+    else:
+        return template_string, None
+
+
 class AirflowImporter(object):
     """
     Importer that dynamically loads a class and module from its parent. This
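
A minimal sketch of the two modes of the new parse_template_string helper, assuming only that jinja2 is installed:

    from jinja2 import Template

    from airflow.utils.helpers import parse_template_string

    # Plain new-style format string: returned unchanged, no Jinja template is built.
    tmpl_str, tmpl_jinja = parse_template_string(
        '{dag_id}-{task_id}-{execution_date}-{try_number}')
    assert tmpl_jinja is None and tmpl_str is not None

    # Contains "{{", so a jinja2.Template is returned instead.
    tmpl_str, tmpl_jinja = parse_template_string('{{ ti.dag_id }}/{{ try_number }}.log')
    assert tmpl_str is None and isinstance(tmpl_jinja, Template)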

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/utils/log/es_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
new file mode 100644
index 0000000..76dacfe
--- /dev/null
+++ b/airflow/utils/log/es_task_handler.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Using `from elasticsearch import *` would break elasticsearch mocking used in unit tests.
+import elasticsearch
+import pendulum
+from elasticsearch_dsl import Search
+
+from airflow.utils import timezone
+from airflow.utils.helpers import parse_template_string
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
+    PAGE = 0
+    MAX_LINE_PER_PAGE = 1000
+
+    """
+    ElasticsearchTaskHandler is a python log handler that
+    reads logs from Elasticsearch. Note logs are not directly
+    indexed into Elasticsearch. Instead, it flushes logs
+    into local files. Additional software setup is required
+    to index the log into Elasticsearch, such as using
+    Filebeat and Logstash.
+    To efficiently query and sort Elasticsearch results, we assume each
+    log message has a field `log_id` consists of ti primary keys:
+    `log_id = {dag_id}-{task_id}-{execution_date}-{try_number}`
+    Log messages with specific log_id are sorted based on `offset`,
+    which is a unique integer indicates log message's order.
+    Timestamp here are unreliable because multiple log messages
+    might have the same timestamp.
+    """
+
+    def __init__(self, base_log_folder, filename_template,
+                 log_id_template, end_of_log_mark,
+                 host='localhost:9200'):
+        """
+        :param base_log_folder: base folder to store logs locally
+        :param log_id_template: log id template
+        :param host: Elasticsearch host name
+        """
+        super(ElasticsearchTaskHandler, self).__init__(
+            base_log_folder, filename_template)
+        self.closed = False
+
+        self.log_id_template, self.log_id_jinja_template = \
+            parse_template_string(log_id_template)
+
+        self.client = elasticsearch.Elasticsearch([host])
+
+        self.mark_end_on_close = True
+        self.end_of_log_mark = end_of_log_mark
+
+    def _render_log_id(self, ti, try_number):
+        if self.log_id_jinja_template:
+            jinja_context = ti.get_template_context()
+            jinja_context['try_number'] = try_number
+            return self.log_id_jinja_template.render(**jinja_context)
+
+        return self.log_id_template.format(dag_id=ti.dag_id,
+                                           task_id=ti.task_id,
+                                           execution_date=ti
+                                           .execution_date.isoformat(),
+                                           try_number=try_number)
+
+    def _read(self, ti, try_number, metadata=None):
+        """
+        Endpoint for streaming logs.
+        :param ti: task instance object
+        :param try_number: try_number of the task instance
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
+        :return: a list of log documents and metadata.
+        """
+        if not metadata:
+            metadata = {'offset': 0}
+        if 'offset' not in metadata:
+            metadata['offset'] = 0
+
+        offset = metadata['offset']
+        log_id = self._render_log_id(ti, try_number)
+
+        logs = self.es_read(log_id, offset)
+
+        next_offset = offset if not logs else logs[-1].offset
+
+        metadata['offset'] = next_offset
+        # end_of_log_mark may contain characters like '\n' which are needed to
+        # have the log uploaded but will not be stored in elasticsearch.
+        metadata['end_of_log'] = False if not logs \
+            else logs[-1].message == self.end_of_log_mark.strip()
+
+        cur_ts = pendulum.now()
+        # Assume end of log after not receiving new log for 5 min,
+        # as executor heartbeat is 1 min and there might be some
+        # delay before Elasticsearch makes the log available.
+        if 'last_log_timestamp' in metadata:
+            last_log_ts = timezone.parse(metadata['last_log_timestamp'])
+            if cur_ts.diff(last_log_ts).in_minutes() >= 5:
+                metadata['end_of_log'] = True
+
+        if offset != next_offset or 'last_log_timestamp' not in metadata:
+            metadata['last_log_timestamp'] = str(cur_ts)
+
+        message = '\n'.join([log.message for log in logs])
+
+        return message, metadata
+
+    def es_read(self, log_id, offset):
+        """
+        Returns the logs matching log_id in Elasticsearch and next offset.
+        Returns an empty list if no log is found or there was an error.
+        :param log_id: the log_id of the log to read.
+        :type log_id: str
+        :param offset: the offset to start reading the log from.
+        :type offset: str
+        """
+
+        # Offset is the unique key for sorting logs given log_id.
+        s = Search(using=self.client) \
+            .query('match', log_id=log_id) \
+            .sort('offset')
+
+        s = s.filter('range', offset={'gt': offset})
+
+        logs = []
+        if s.count() != 0:
+            try:
+
+                logs = s[self.MAX_LINE_PER_PAGE * self.PAGE:self.MAX_LINE_PER_PAGE] \
+                    .execute()
+            except Exception as e:
+                msg = 'Could not read log with log_id: {}, ' \
+                      'error: {}'.format(log_id, str(e))
+                self.log.exception(msg)
+
+        return logs
+
+    def set_context(self, ti):
+        super(ElasticsearchTaskHandler, self).set_context(ti)
+        self.mark_end_on_close = not ti.raw
+
+    def close(self):
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        if not self.mark_end_on_close:
+            self.closed = True
+            return
+
+        # Case in which the context of the handler was not set.
+        if self.handler is None:
+            self.closed = True
+            return
+
+        # Reopen the file stream, because FileHandler.close() would be called
+        # first in logging.shutdown() and the stream in it would be set to 
None.
+        if self.handler.stream is None or self.handler.stream.closed:
+            self.handler.stream = self.handler._open()
+
+        # Mark the end of file using end of log mark,
+        # so we know where to stop while auto-tailing.
+        self.handler.stream.write(self.end_of_log_mark)
+
+        super(ElasticsearchTaskHandler, self).close()
+
+        self.closed = True
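
The handler never indexes documents itself; it writes local files terminated by end_of_log_mark and later queries whatever an external pipeline (e.g. Filebeat and Logstash) has indexed. A sketch of the document shape the reader assumes, with illustrative values; only log_id, offset and message are consumed by es_read and _read:

    # Hypothetical document as it might be indexed by Filebeat/Logstash.
    example_log_document = {
        'log_id': 'example_dag-example_task-2018-04-13T00:00:00+00:00-1',
        'offset': 42,  # unique and increasing per log_id; used for sorting and range filtering
        'message': 'INFO - Task exited with return code 0',
    }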

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/utils/log/file_processor_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py
index f6d8d93..9897afc 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -16,9 +16,8 @@ import errno
 import logging
 import os
 
-from jinja2 import Template
-
 from airflow import configuration as conf
+from airflow.utils.helpers import parse_template_string
 from datetime import datetime
 
 
@@ -38,11 +37,8 @@ class FileProcessorHandler(logging.Handler):
         self.handler = None
         self.base_log_folder = base_log_folder
         self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
-        self.filename_template = filename_template
-        self.filename_jinja_template = None
-
-        if "{{" in self.filename_template: #jinja mode
-            self.filename_jinja_template = Template(self.filename_template)
+        self.filename_template, self.filename_jinja_template = \
+            parse_template_string(filename_template)
 
         self._cur_date = datetime.today()
         if not os.path.exists(self._get_log_directory()):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index a67f445..adec5ab 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -16,11 +16,10 @@ import logging
 import os
 import requests
 
-from jinja2 import Template
-
 from airflow import configuration as conf
 from airflow.configuration import AirflowConfigException
 from airflow.utils.file import mkdirs
+from airflow.utils.helpers import parse_template_string
 
 
 class FileTaskHandler(logging.Handler):
@@ -39,11 +38,8 @@ class FileTaskHandler(logging.Handler):
         super(FileTaskHandler, self).__init__()
         self.handler = None
         self.local_base = base_log_folder
-        self.filename_template = filename_template
-        self.filename_jinja_template = None
-
-        if "{{" in self.filename_template: #jinja mode
-            self.filename_jinja_template = Template(self.filename_template)
+        self.filename_template, self.filename_jinja_template = \
+            parse_template_string(filename_template)
 
     def set_context(self, ti):
         """
@@ -78,13 +74,15 @@ class FileTaskHandler(logging.Handler):
                                              execution_date=ti.execution_date.isoformat(),
                                              try_number=try_number)
 
-    def _read(self, ti, try_number):
+    def _read(self, ti, try_number, metadata=None):
         """
         Template method that contains custom logic of reading
         logs given the try_number.
         :param ti: task instance record
         :param try_number: current try_number to read log from
-        :return: log message as a string
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
+        :return: log message as a string and metadata.
         """
         # Task instance here might be different from task instance when
         # initializing the handler. Thus explicitly getting log location
@@ -127,14 +125,16 @@ class FileTaskHandler(logging.Handler):
             except Exception as e:
                log += "*** Failed to fetch log file from worker. {}\n".format(str(e))
 
-        return log
+        return log, {'end_of_log': True}
 
-    def read(self, task_instance, try_number=None):
+    def read(self, task_instance, try_number=None, metadata=None):
         """
         Read logs of given task instance from local machine.
         :param task_instance: task instance object
         :param try_number: task instance try_number to read logs from. If None
                            it returns all logs separated by try_number
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
         :return: a list of logs
         """
         # Task instance increments its try number when it starts to run.
@@ -154,10 +154,13 @@ class FileTaskHandler(logging.Handler):
             try_numbers = [try_number]
 
         logs = [''] * len(try_numbers)
+        metadatas = [{}] * len(try_numbers)
         for i, try_number in enumerate(try_numbers):
-            logs[i] += self._read(task_instance, try_number)
+            log, metadata = self._read(task_instance, try_number, metadata)
+            logs[i] += log
+            metadatas[i] = metadata
 
-        return logs
+        return logs, metadatas
 
     def _init_file(self, ti):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 3b83c8c..e69d739 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -60,7 +60,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         # log path to upload log files into GCS and read from the
         # remote location.
         self.log_relative_path = self._render_filename(ti, ti.try_number)
-        self.upload_on_close = not ti.is_raw
+        self.upload_on_close = not ti.raw
 
     def close(self):
         """
@@ -89,12 +89,14 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
-    def _read(self, ti, try_number):
+    def _read(self, ti, try_number, metadata=None):
         """
         Read logs of given task instance and try_number from GCS.
         If failed, read the log from task instance host machine.
         :param ti: task instance object
         :param try_number: task instance try_number to read logs from
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
         """
         # Explicitly getting log relative path is necessary as the given
         # task instance might be different than task instance passed in
@@ -112,7 +114,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             self.log.error(log)
             log += super(GCSTaskHandler, self)._read(ti, try_number)
 
-        return log
+        return log, {'end_of_log': True}
 
     def gcs_read(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index b3acf3a..31e8ef8 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -55,7 +55,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         # Local location and remote location is needed to open and
         # upload local log file to S3 remote storage.
         self.log_relative_path = self._render_filename(ti, ti.try_number)
-        self.upload_on_close = not ti.is_raw
+        self.upload_on_close = not ti.raw
 
     def close(self):
         """
@@ -84,12 +84,14 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
-    def _read(self, ti, try_number):
+    def _read(self, ti, try_number, metadata=None):
         """
         Read logs of given task instance and try_number from S3 remote storage.
         If failed, read the log from task instance host machine.
         :param ti: task instance object
         :param try_number: task instance try_number to read logs from
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
         """
         # Explicitly getting log relative path is necessary as the given
         # task instance might be different than task instance passed in
@@ -107,7 +109,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         else:
             log = super(S3TaskHandler, self)._read(ti, try_number)
 
-        return log
+        return log, {'end_of_log': True}
 
     def s3_log_exists(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/utils/log/wasb_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py
index 4784b10..dfb261c 100644
--- a/airflow/utils/log/wasb_task_handler.py
+++ b/airflow/utils/log/wasb_task_handler.py
@@ -1,175 +1,177 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import os
-import shutil
-
-from airflow import configuration
-from airflow.contrib.hooks.wasb_hook import WasbHook
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.log.file_task_handler import FileTaskHandler
-from azure.common import AzureHttpError
-
-
-class WasbTaskHandler(FileTaskHandler, LoggingMixin):
-    """
-    WasbTaskHandler is a python log handler that handles and reads
-    task instance logs. It extends airflow FileTaskHandler and
-    uploads to and reads from Wasb remote storage.
-    """
-
-    def __init__(self, base_log_folder, wasb_log_folder, wasb_container,
-                 filename_template, delete_local_copy):
-        super(WasbTaskHandler, self).__init__(base_log_folder, filename_template)
-        self.wasb_container = wasb_container
-        self.remote_base = wasb_log_folder
-        self.log_relative_path = ''
-        self._hook = None
-        self.closed = False
-        self.upload_on_close = True
-        self.delete_local_copy = delete_local_copy
-
-    def _build_hook(self):
-        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
-        try:
-            return WasbHook(remote_conn_id)
-        except AzureHttpError:
-            self.log.error(
-                'Could not create an WasbHook with connection id "%s". '
-                'Please make sure that airflow[azure] is installed and '
-                'the Wasb connection exists.', remote_conn_id
-            )
-
-    @property
-    def hook(self):
-        if self._hook is None:
-            self._hook = self._build_hook()
-        return self._hook
-
-    def set_context(self, ti):
-        super(WasbTaskHandler, self).set_context(ti)
-        # Local location and remote location is needed to open and
-        # upload local log file to Wasb remote storage.
-        self.log_relative_path = self._render_filename(ti, ti.try_number)
-        self.upload_on_close = not ti.is_raw
-
-    def close(self):
-        """
-        Close and upload local log file to remote storage Wasb.
-        """
-        # When application exit, system shuts down all handlers by
-        # calling close method. Here we check if logger is already
-        # closed to prevent uploading the log to remote storage multiple
-        # times when `logging.shutdown` is called.
-        if self.closed:
-            return
-
-        super(WasbTaskHandler, self).close()
-
-        if not self.upload_on_close:
-            return
-
-        local_loc = os.path.join(self.local_base, self.log_relative_path)
-        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
-        if os.path.exists(local_loc):
-            # read log and remove old logs to get just the latest additions
-            with open(local_loc, 'r') as logfile:
-                log = logfile.read()
-            self.wasb_write(log, remote_loc, append=True)
-
-            if self.delete_local_copy:
-                shutil.rmtree(os.path.dirname(local_loc))
-        # Mark closed so we don't double write if close is called twice
-        self.closed = True
-
-    def _read(self, ti, try_number):
-        """
-        Read logs of given task instance and try_number from Wasb remote storage.
-        If failed, read the log from task instance host machine.
-        :param ti: task instance object
-        :param try_number: task instance try_number to read logs from
-        """
-        # Explicitly getting log relative path is necessary as the given
-        # task instance might be different than task instance passed in
-        # in set_context method.
-        log_relative_path = self._render_filename(ti, try_number)
-        remote_loc = os.path.join(self.remote_base, log_relative_path)
-
-        if self.wasb_log_exists(remote_loc):
-            # If Wasb remote file exists, we do not fetch logs from task instance
-            # local machine even if there are errors reading remote logs, as
-            # returned remote_log will contain error messages.
-            remote_log = self.wasb_read(remote_loc, return_error=True)
-            log = '*** Reading remote log from {}.\n{}\n'.format(
-                remote_loc, remote_log)
-        else:
-            log = super(WasbTaskHandler, self)._read(ti, try_number)
-
-        return log
-
-    def wasb_log_exists(self, remote_log_location):
-        """
-        Check if remote_log_location exists in remote storage
-        :param remote_log_location: log's location in remote storage
-        :return: True if location exists else False
-        """
-        try:
-            return self.hook.check_for_blob(self.wasb_container, remote_log_location)
-        except Exception:
-            pass
-        return False
-
-    def wasb_read(self, remote_log_location, return_error=False):
-        """
-        Returns the log found at the remote_log_location. Returns '' if no
-        logs are found or there is an error.
-        :param remote_log_location: the log's location in remote storage
-        :type remote_log_location: string (path)
-        :param return_error: if True, returns a string error message if an
-            error occurs. Otherwise returns '' when an error occurs.
-        :type return_error: bool
-        """
-        try:
-            return self.hook.read_file(self.wasb_container, remote_log_location)
-        except AzureHttpError:
-            msg = 'Could not read logs from {}'.format(remote_log_location)
-            self.log.exception(msg)
-            # return error if needed
-            if return_error:
-                return msg
-
-    def wasb_write(self, log, remote_log_location, append=True):
-        """
-        Writes the log to the remote_log_location. Fails silently if no hook
-        was created.
-        :param log: the log to write to the remote_log_location
-        :type log: string
-        :param remote_log_location: the log's location in remote storage
-        :type remote_log_location: string (path)
-        :param append: if False, any existing log file is overwritten. If True,
-            the new log is appended to any existing logs.
-        :type append: bool
-        """
-        if append and self.wasb_log_exists(remote_log_location):
-            old_log = self.wasb_read(remote_log_location)
-            log = '\n'.join([old_log, log]) if old_log else log
-
-        try:
-            self.hook.load_string(
-                log,
-                self.wasb_container,
-                remote_log_location,
-            )
-        except AzureHttpError:
-            self.log.exception('Could not write logs to %s',
-                               remote_log_location)
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+
+from airflow import configuration
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from azure.common import AzureHttpError
+
+
+class WasbTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    WasbTaskHandler is a python log handler that handles and reads
+    task instance logs. It extends airflow FileTaskHandler and
+    uploads to and reads from Wasb remote storage.
+    """
+
+    def __init__(self, base_log_folder, wasb_log_folder, wasb_container,
+                 filename_template, delete_local_copy):
+        super(WasbTaskHandler, self).__init__(base_log_folder, filename_template)
+        self.wasb_container = wasb_container
+        self.remote_base = wasb_log_folder
+        self.log_relative_path = ''
+        self._hook = None
+        self.closed = False
+        self.upload_on_close = True
+        self.delete_local_copy = delete_local_copy
+
+    def _build_hook(self):
+        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
+        try:
+            return WasbHook(remote_conn_id)
+        except AzureHttpError:
+            self.log.error(
+                'Could not create an WasbHook with connection id "%s". '
+                'Please make sure that airflow[azure] is installed and '
+                'the Wasb connection exists.', remote_conn_id
+            )
+
+    @property
+    def hook(self):
+        if self._hook is None:
+            self._hook = self._build_hook()
+        return self._hook
+
+    def set_context(self, ti):
+        super(WasbTaskHandler, self).set_context(ti)
+        # Local location and remote location is needed to open and
+        # upload local log file to Wasb remote storage.
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.raw
+
+    def close(self):
+        """
+        Close and upload local log file to remote storage Wasb.
+        """
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super(WasbTaskHandler, self).close()
+
+        if not self.upload_on_close:
+            return
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
+        if os.path.exists(local_loc):
+            # read log and remove old logs to get just the latest additions
+            with open(local_loc, 'r') as logfile:
+                log = logfile.read()
+            self.wasb_write(log, remote_loc, append=True)
+
+            if self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def _read(self, ti, try_number, metadata=None):
+        """
+        Read logs of given task instance and try_number from Wasb remote storage.
+        If failed, read the log from task instance host machine.
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
+        """
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different than task instance passed in
+        # in set_context method.
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
+
+        if self.wasb_log_exists(remote_loc):
+            # If Wasb remote file exists, we do not fetch logs from task instance
+            # local machine even if there are errors reading remote logs, as
+            # returned remote_log will contain error messages.
+            remote_log = self.wasb_read(remote_loc, return_error=True)
+            log = '*** Reading remote log from {}.\n{}\n'.format(
+                remote_loc, remote_log)
+        else:
+            log = super(WasbTaskHandler, self)._read(ti, try_number)
+
+        return log, {'end_of_log': True}
+
+    def wasb_log_exists(self, remote_log_location):
+        """
+        Check if remote_log_location exists in remote storage
+        :param remote_log_location: log's location in remote storage
+        :return: True if location exists else False
+        """
+        try:
+            return self.hook.check_for_blob(self.wasb_container, remote_log_location)
+        except Exception:
+            pass
+        return False
+
+    def wasb_read(self, remote_log_location, return_error=False):
+        """
+        Returns the log found at the remote_log_location. Returns '' if no
+        logs are found or there is an error.
+        :param remote_log_location: the log's location in remote storage
+        :type remote_log_location: string (path)
+        :param return_error: if True, returns a string error message if an
+            error occurs. Otherwise returns '' when an error occurs.
+        :type return_error: bool
+        """
+        try:
+            return self.hook.read_file(self.wasb_container, remote_log_location)
+        except AzureHttpError:
+            msg = 'Could not read logs from {}'.format(remote_log_location)
+            self.log.exception(msg)
+            # return error if needed
+            if return_error:
+                return msg
+
+    def wasb_write(self, log, remote_log_location, append=True):
+        """
+        Writes the log to the remote_log_location. Fails silently if no hook
+        was created.
+        :param log: the log to write to the remote_log_location
+        :type log: string
+        :param remote_log_location: the log's location in remote storage
+        :type remote_log_location: string (path)
+        :param append: if False, any existing log file is overwritten. If True,
+            the new log is appended to any existing logs.
+        :type append: bool
+        """
+        if append and self.wasb_log_exists(remote_log_location):
+            old_log = self.wasb_read(remote_log_location)
+            log = '\n'.join([old_log, log]) if old_log else log
+
+        try:
+            self.hook.load_string(
+                log,
+                self.wasb_container,
+                remote_log_location,
+            )
+        except AzureHttpError:
+            self.log.exception('Could not write logs to %s',
+                               remote_log_location)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index ec1ac5b..f72b5ec 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -11,23 +11,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import airflow.api
+from flask import (
+    g, Blueprint, jsonify, request, url_for
+)
 
+import airflow.api
+from airflow.api.common.experimental import delete_dag as delete
 from airflow.api.common.experimental import pool as pool_api
 from airflow.api.common.experimental import trigger_dag as trigger
-from airflow.api.common.experimental import delete_dag as delete
 from airflow.api.common.experimental.get_task import get_task
 from airflow.api.common.experimental.get_task_instance import get_task_instance
 from airflow.exceptions import AirflowException
-from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils import timezone
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.www.app import csrf
 
-from flask import (
-    g, Markup, Blueprint, redirect, jsonify, abort,
-    request, current_app, send_file, url_for
-)
-
 _log = LoggingMixin().log
 
 requires_authentication = airflow.api.api_auth.requires_authentication
@@ -63,8 +61,8 @@ def trigger_dag(dag_id):
         except ValueError:
             error_message = (
                 'Given execution date, {}, could not be identified '
-                'as a date. Example date format: 2015-11-16T14:34:15+00:00'
-                .format(execution_date))
+                'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
+                    execution_date))
             _log.info(error_message)
             response = jsonify({'error': error_message})
             response.status_code = 400
@@ -128,7 +126,9 @@ def task_info(dag_id, task_id):
     return jsonify(fields)
 
 
-@api_experimental.route('/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>', methods=['GET'])
+@api_experimental.route(
+    '/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>',
+    methods=['GET'])
 @requires_authentication
 def task_instance_info(dag_id, execution_date, task_id):
     """
@@ -144,8 +144,8 @@ def task_instance_info(dag_id, execution_date, task_id):
     except ValueError:
         error_message = (
             'Given execution date, {}, could not be identified '
-            'as a date. Example date format: 2015-11-16T14:34:15+00:00'
-            .format(execution_date))
+            'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
+                execution_date))
         _log.info(error_message)
         response = jsonify({'error': error_message})
         response.status_code = 400

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/www/templates/airflow/ti_log.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html
index 03c0ed3..2615c45 100644
--- a/airflow/www/templates/airflow/ti_log.html
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -1,40 +1,141 @@
 {#
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 
 #}
 {% extends "airflow/task_instance.html" %}
 {% block title %}Airflow - DAGs{% endblock %}
 
 {% block body %}
-  {{ super() }}
-  <h4>{{ title }}</h4>
-  <ul class="nav nav-pills" role="tablist">
-    {% for log in logs %}
-      <li role="presentation" class="{{ 'active' if loop.last else '' }}">
-        <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab">
-          {{ loop.index }}
-        </a>
-      </li>
-    {% endfor %}
-  </ul>
-  <div class="tab-content">
-    {% for log in logs %}
-      <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
-        <pre id="attempt-{{ loop.index }}">{{ log }}</pre>
-      </div>
-    {% endfor %}
+{{ super() }}
+<h4>{{ title }}</h4>
+<ul class="nav nav-pills" role="tablist">
+  {% for log in logs %}
+  <li role="presentation" class="{{ 'active' if loop.last else '' }}">
+    <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab">
+      {{ loop.index }}
+    </a>
+  </li>
+  {% endfor %}
+</ul>
+<div class="tab-content">
+  {% for log in logs %}
+  <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
+    <img id="loading-{{ loop.index }}" style="margin-top:0%; margin-left:50%; height:50px; width:50px; position: absolute;"
+         alt="spinner" src="{{ url_for('static', filename='loading.gif') }}">
+    <pre><code id="try-{{ loop.index }}">{{ log }}</code></pre>
   </div>
+  {% endfor %}
+  </div>
+{% endblock %}
+{% block tail %}
+{{ lib.form_js() }}
+{{ super() }}
+<script>
+    // TODO: make those constants configurable.
+    // Time interval to wait before next log fetching. Default 2s.
+    const DELAY = 2e3;
+    // Distance away from page bottom to enable auto tailing.
+    const AUTO_TAILING_OFFSET = 30;
+    // Animation speed for auto tailing log display.
+    const ANIMATION_SPEED = 1000;
+    // Total number of tabs to show.
+    const TOTAL_ATTEMPTS = "{{ logs|length }}";
+
+    // Recursively fetch logs from flask endpoint.
+    function recurse(delay=DELAY) {
+      return new Promise((resolve) => setTimeout(resolve, delay));
+    }
+
+    // Enable auto tailing only when users scroll down to the bottom
+    // of the page. This prevents auto tailing the page if users want
+    // to view earlier rendered messages.
+    function checkAutoTailingCondition() {
+      const docHeight = $(document).height();
+      console.debug($(window).scrollTop())
+      console.debug($(window).height())
+      console.debug($(document).height())
+      return $(window).scrollTop() != 0
+             && ($(window).scrollTop() + $(window).height() > docHeight - AUTO_TAILING_OFFSET);
+    }
+
+    // Streaming log with auto-tailing.
+    function autoTailingLog(try_number, metadata=null, auto_tailing=false) {
+      console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ task_id }}, \
+       execution_date: {{ execution_date }}, try_number: " + try_number + ", metadata: " + JSON.stringify(metadata));
+
+      return Promise.resolve(
+        $.ajax({
+          url: "{{ url_for("airflow.get_logs_with_metadata") }}",
+          data: {
+            dag_id: "{{ dag_id }}",
+            task_id: "{{ task_id }}",
+            execution_date: "{{ execution_date }}",
+            try_number: try_number,
+            metadata: JSON.stringify(metadata),
+          },
+        })).then(res => {
+          // Stop recursive call to backend when error occurs.
+          if (!res) {
+            document.getElementById("loading-"+try_number).style.display = "none";
+            return;
+          }
+          // res.error is a boolean
+          // res.message is the log itself or the error message
+          if (res.error) {
+            if (res.message) {
+              console.error("Error while retrieving log: " + res.message);
+            }
+            document.getElementById("loading-"+try_number).style.display = "none";
+            return;
+          }
+
+          if (res.message) {
+            // Auto scroll window to the end if current window location is near the end.
+            if(auto_tailing && checkAutoTailingCondition()) {
+              var should_scroll = true
+            }
+            // The message may contain HTML, so either have to escape it or write it as text.
+            document.getElementById(`try-${try_number}`).textContent += res.message + "\n";
+            // Auto scroll window to the end if current window location is near the end.
+            if(should_scroll) {
+              $("html, body").animate({ scrollTop: $(document).height() }, ANIMATION_SPEED);
+            }
+          }
+
+          if (res.metadata.end_of_log) {
+            document.getElementById("loading-"+try_number).style.display = "none";
+            return;
+          }
+          return recurse().then(() => autoTailingLog(
+            try_number, res.metadata, auto_tailing));
+        });
+    }
+    $(document).ready(function() {
+      // Lazily load all past task instance logs.
+      // TODO: We only need to have recursive queries for
+      // latest running task instances. Currently it does not
+      // work well with ElasticSearch because ES query only
+      // returns at most 10k documents. We want the ability
+      // to display all logs in the front-end.
+      // An optimization here is to render from latest attempt.
+      for(let i = TOTAL_ATTEMPTS; i >= 1; i--) {
+        // Only auto_tailing the page when streaming the latest attempt.
+        autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS));
+      }
+    });
+
+</script>
 {% endblock %}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 83e567a..799e1bd 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -39,8 +39,8 @@ import sqlalchemy as sqla
 from sqlalchemy import or_, desc, and_, union_all
 
 from flask import (
-    abort, redirect, url_for, request, Markup, Response, current_app, 
render_template,
-    make_response)
+    abort, jsonify, redirect, url_for, request, Markup, Response,
+    current_app, render_template, make_response)
 from flask_admin import BaseView, expose, AdminIndexView
 from flask_admin.contrib.sqla import ModelView
 from flask_admin.actions import action
@@ -69,7 +69,6 @@ from airflow import models
 from airflow import settings
 from airflow.api.common.experimental.mark_tasks import set_dag_run_state
 from airflow.exceptions import AirflowException
-from airflow.settings import Session
 from airflow.models import XCom, DagRun
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 
@@ -719,6 +718,66 @@ class Airflow(BaseView):
             form=form,
             title=title, )
 
+    @expose('/get_logs_with_metadata')
+    @login_required
+    @wwwutils.action_logging
+    @provide_session
+    def get_logs_with_metadata(self, session=None):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = pendulum.parse(execution_date)
+        try_number = int(request.args.get('try_number'))
+        # metadata may be None
+        metadata = request.args.get('metadata')
+        if metadata:
+            metadata = json.loads(metadata)
+
+        # Convert string datetime into actual datetime
+        try:
+            execution_date = timezone.parse(execution_date)
+        except ValueError:
+            error_message = (
+                'Given execution date, {}, could not be identified '
+                'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
+                    execution_date))
+            response = jsonify({'error': error_message})
+            response.status_code = 400
+
+            return response
+
+        logger = logging.getLogger('airflow.task')
+        task_log_reader = conf.get('core', 'task_log_reader')
+        handler = next((handler for handler in logger.handlers
+                        if handler.name == task_log_reader), None)
+
+        ti = session.query(models.TaskInstance).filter(
+            models.TaskInstance.dag_id == dag_id,
+            models.TaskInstance.task_id == task_id,
+            models.TaskInstance.execution_date == dttm).first()
+        try:
+            if ti is None:
+                logs = ["*** Task instance did not exist in the DB\n"]
+                metadata['end_of_log'] = True
+            else:
+                dag = dagbag.get_dag(dag_id)
+                ti.task = dag.get_task(ti.task_id)
+                logs, metadatas = handler.read(ti, try_number, metadata=metadata)
+                metadata = metadatas[0]
+            for i, log in enumerate(logs):
+                if PY2 and not isinstance(log, unicode):
+                    logs[i] = log.decode('utf-8')
+            message = logs[0]
+            return jsonify(message=message, metadata=metadata)
+        except AttributeError as e:
+            error_message = ["Task log handler {} does not support read logs.\n{}\n"
+                             .format(task_log_reader, str(e))]
+            metadata['end_of_log'] = True
+            return jsonify(message=error_message, error=True, metadata=metadata)
+        except AirflowException as e:
+            metadata['end_of_log'] = True
+            return jsonify(message=str(e), error=True, metadata=metadata)
+
     @expose('/log')
     @login_required
     @wwwutils.action_logging
@@ -730,31 +789,17 @@ class Airflow(BaseView):
         dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
+
         ti = session.query(models.TaskInstance).filter(
             models.TaskInstance.dag_id == dag_id,
             models.TaskInstance.task_id == task_id,
             models.TaskInstance.execution_date == dttm).first()
-        if ti is None:
-            logs = ["*** Task instance did not exist in the DB\n"]
-        else:
-            logger = logging.getLogger('airflow.task')
-            task_log_reader = conf.get('core', 'task_log_reader')
-            handler = next((handler for handler in logger.handlers
-                            if handler.name == task_log_reader), None)
-            try:
-                ti.task = dag.get_task(ti.task_id)
-                logs = handler.read(ti)
-            except AttributeError as e:
-                logs = ["Task log handler {} does not support read logs.\n{}\n" \
-                            .format(task_log_reader, str(e))]
-
-        for i, log in enumerate(logs):
-            if PY2 and not isinstance(log, unicode):
-                logs[i] = log.decode('utf-8')
 
+        logs = [''] * (ti.next_try_number - 1 if ti is not None else 0)
         return self.render(
             'airflow/ti_log.html',
-            logs=logs, dag=dag, title="Log by attempts", task_id=task_id,
+            logs=logs, dag=dag, title="Log by attempts",
+            dag_id=dag.dag_id, task_id=task_id,
             execution_date=execution_date, form=form)
 
     @expose('/task')

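Note on the new log-reading flow above: handler.read() now returns a list of
log chunks together with one metadata dict per try, and the caller passes that
metadata back in on the next call until 'end_of_log' is set. A minimal sketch
of that polling loop, assuming an already-configured task log handler and
TaskInstance (the names `handler` and `ti` and the fixed try number are
placeholders, not part of the patch):

    # Illustrative only: incremental log reading against the new handler API.
    metadata = {}          # empty metadata -> handler initializes offset/timestamp
    chunks = []
    while not metadata.get('end_of_log'):
        logs, metadatas = handler.read(ti, 1, metadata=metadata)
        metadata = metadatas[0]   # carry offset / last_log_timestamp forward
        chunks.append(logs[0])
    full_log = ''.join(chunks)

The web UI drives the same loop from JavaScript by repeatedly calling the JSON
endpoint above and appending each returned message.
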
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/licenses/LICENSE-elasticmock.txt
----------------------------------------------------------------------
diff --git a/licenses/LICENSE-elasticmock.txt b/licenses/LICENSE-elasticmock.txt
new file mode 100644
index 0000000..ea757fb
--- /dev/null
+++ b/licenses/LICENSE-elasticmock.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2016 Marcos Cardoso
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 71a9113..538cb2c 100644
--- a/setup.py
+++ b/setup.py
@@ -119,6 +119,10 @@ doc = [
 ]
 docker = ['docker-py>=1.6.0']
 druid = ['pydruid>=0.4.1']
+elasticsearch = [
+    'elasticsearch>=5.0.0,<6.0.0',
+    'elasticsearch-dsl>=5.0.0,<6.0.0'
+]
 emr = ['boto3>=1.0.0']
 gcp_api = [
     'httplib2',
@@ -193,7 +197,7 @@ devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
              docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog +
              zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
-             druid + snowflake)
+             druid + snowflake + elasticsearch)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -274,6 +278,7 @@ def do_setup():
             'doc': doc,
             'docker': docker,
             'druid': druid,
+            'elasticsearch': elasticsearch,
             'emr': emr,
             'gcp_api': gcp_api,
             'github_enterprise': github_enterprise,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/tests/utils/log/elasticmock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/elasticmock/__init__.py b/tests/utils/log/elasticmock/__init__.py
new file mode 100644
index 0000000..73c4879
--- /dev/null
+++ b/tests/utils/log/elasticmock/__init__.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# The MIT License (MIT)
+#
+# Copyright (c) 2016 Marcos Cardoso
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+from functools import wraps
+
+from elasticsearch.client import _normalize_hosts
+from mock import patch
+
+from .fake_elasticsearch import FakeElasticsearch
+
+ELASTIC_INSTANCES = {}
+
+
+def _get_elasticmock(hosts=None, *args, **kwargs):
+    host = _normalize_hosts(hosts)[0]
+    elastic_key = '{0}:{1}'.format(
+        host.get('host', 'localhost'), host.get('port', 9200)
+    )
+
+    if elastic_key in ELASTIC_INSTANCES:
+        connection = ELASTIC_INSTANCES.get(elastic_key)
+    else:
+        connection = FakeElasticsearch()
+        ELASTIC_INSTANCES[elastic_key] = connection
+    return connection
+
+
+def elasticmock(f):
+    @wraps(f)
+    def decorated(*args, **kwargs):
+        ELASTIC_INSTANCES.clear()
+        with patch('elasticsearch.Elasticsearch', _get_elasticmock):
+            result = f(*args, **kwargs)
+        return result
+    return decorated

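
For reference, the decorator above swaps elasticsearch.Elasticsearch for a
process-local FakeElasticsearch keyed by host:port, so tests never contact a
real cluster. A small usage sketch, assuming the package is importable as
tests.utils.log.elasticmock (test and index names below are illustrative, not
part of the patch):

    import unittest

    import elasticsearch

    from tests.utils.log.elasticmock import elasticmock


    class TestWithFakeElasticsearch(unittest.TestCase):
        @elasticmock
        def test_index_and_exists(self):
            # Elasticsearch() is patched for the duration of this test method,
            # so the client below is a FakeElasticsearch instance.
            es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
            es.index(index='messages', doc_type='log', body={'message': 'hello'}, id=1)
            self.assertTrue(es.exists(index='messages', doc_type='log', id=1))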
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/tests/utils/log/elasticmock/fake_elasticsearch.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/elasticmock/fake_elasticsearch.py b/tests/utils/log/elasticmock/fake_elasticsearch.py
new file mode 100644
index 0000000..0e29e91
--- /dev/null
+++ b/tests/utils/log/elasticmock/fake_elasticsearch.py
@@ -0,0 +1,295 @@
+# -*- coding: utf-8 -*-
+#
+# The MIT License (MIT)
+#
+# Copyright (c) 2016 Marcos Cardoso
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import json
+import six
+
+from elasticsearch import Elasticsearch
+from elasticsearch.client.utils import query_params
+from elasticsearch.exceptions import NotFoundError
+
+from .utilities import get_random_id
+
+
+class FakeElasticsearch(Elasticsearch):
+    __documents_dict = None
+
+    def __init__(self):
+        self.__documents_dict = {}
+
+    @query_params()
+    def ping(self, params=None):
+        return True
+
+    @query_params()
+    def info(self, params=None):
+        return {
+            'status': 200,
+            'cluster_name': 'elasticmock',
+            'version':
+                {
+                    'lucene_version': '4.10.4',
+                    'build_hash': '00f95f4ffca6de89d68b7ccaf80d148f1f70e4d4',
+                    'number': '1.7.5',
+                    'build_timestamp': '2016-02-02T09:55:30Z',
+                    'build_snapshot': False
+                },
+            'name': 'Nightwatch',
+            'tagline': 'You Know, for Search'
+        }
+
+    @query_params('consistency', 'op_type', 'parent', 'refresh', 'replication',
+                  'routing', 'timeout', 'timestamp', 'ttl', 'version', 'version_type')
+    def index(self, index, doc_type, body, id=None, params=None):
+        if index not in self.__documents_dict:
+            self.__documents_dict[index] = list()
+
+        if id is None:
+            id = get_random_id()
+
+        version = 1
+
+        self.__documents_dict[index].append({
+            '_type': doc_type,
+            '_id': id,
+            '_source': body,
+            '_index': index,
+            '_version': version
+        })
+
+        return {
+            '_type': doc_type,
+            '_id': id,
+            'created': True,
+            '_version': version,
+            '_index': index
+        }
+
+    @query_params('parent', 'preference', 'realtime', 'refresh', 'routing')
+    def exists(self, index, doc_type, id, params=None):
+        result = False
+        if index in self.__documents_dict:
+            for document in self.__documents_dict[index]:
+                if document.get('_id') == id and document.get('_type') == doc_type:
+                    result = True
+                    break
+        return result
+
+    @query_params('_source', '_source_exclude', '_source_include', 'fields',
+                  'parent', 'preference', 'realtime', 'refresh', 'routing', 'version',
+                  'version_type')
+    def get(self, index, id, doc_type='_all', params=None):
+        result = None
+        if index in self.__documents_dict:
+            for document in self.__documents_dict[index]:
+                if document.get('_id') == id:
+                    if doc_type == '_all':
+                        result = document
+                        break
+                    else:
+                        if document.get('_type') == doc_type:
+                            result = document
+                            break
+
+        if result:
+            result['found'] = True
+        else:
+            error_data = {
+                '_index': index,
+                '_type': doc_type,
+                '_id': id,
+                'found': False
+            }
+            raise NotFoundError(404, json.dumps(error_data))
+
+        return result
+
+    @query_params('_source', '_source_exclude', '_source_include', 'parent',
+                  'preference', 'realtime', 'refresh', 'routing', 'version',
+                  'version_type')
+    def get_source(self, index, doc_type, id, params=None):
+        document = self.get(index=index, doc_type=doc_type, id=id, params=params)
+        return document.get('_source')
+
+    @query_params('_source', '_source_exclude', '_source_include',
+                  'allow_no_indices', 'analyze_wildcard', 'analyzer', 'default_operator',
+                  'df', 'expand_wildcards', 'explain', 'fielddata_fields', 'fields',
+                  'from_', 'ignore_unavailable', 'lenient', 'lowercase_expanded_terms',
+                  'preference', 'q', 'request_cache', 'routing', 'scroll', 'search_type',
+                  'size', 'sort', 'stats', 'suggest_field', 'suggest_mode',
+                  'suggest_size', 'suggest_text', 'terminate_after', 'timeout',
+                  'track_scores', 'version')
+    def count(self, index=None, doc_type=None, body=None, params=None):
+        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
+
+        i = 0
+        for searchable_index in searchable_indexes:
+            for document in self.__documents_dict[searchable_index]:
+                if searchable_doc_types\
+                   and document.get('_type') not in searchable_doc_types:
+                    continue
+                i += 1
+        result = {
+            'count': i,
+            '_shards': {
+                'successful': 1,
+                'failed': 0,
+                'total': 1
+            }
+        }
+
+        return result
+
+    @query_params('_source', '_source_exclude', '_source_include',
+                  'allow_no_indices', 'analyze_wildcard', 'analyzer', 'default_operator',
+                  'df', 'expand_wildcards', 'explain', 'fielddata_fields', 'fields',
+                  'from_', 'ignore_unavailable', 'lenient', 'lowercase_expanded_terms',
+                  'preference', 'q', 'request_cache', 'routing', 'scroll', 'search_type',
+                  'size', 'sort', 'stats', 'suggest_field', 'suggest_mode',
+                  'suggest_size', 'suggest_text', 'terminate_after', 'timeout',
+                  'track_scores', 'version')
+    def search(self, index=None, doc_type=None, body=None, params=None):
+        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
+
+        matches = []
+        for searchable_index in searchable_indexes:
+            for document in self.__documents_dict[searchable_index]:
+                if searchable_doc_types\
+                   and document.get('_type') not in searchable_doc_types:
+                    continue
+                matches.append(document)
+
+        result = {
+            'hits': {
+                'total': len(matches),
+                'max_score': 1.0
+            },
+            '_shards': {
+                # Simulate indexes with 1 shard each
+                'successful': len(searchable_indexes),
+                'failed': 0,
+                'total': len(searchable_indexes)
+            },
+            'took': 1,
+            'timed_out': False
+        }
+
+        hits = []
+        for match in matches:
+            match['_score'] = 1.0
+            hits.append(match)
+        result['hits']['hits'] = hits
+
+        return result
+
+    @query_params('consistency', 'parent', 'refresh', 'replication', 'routing',
+                  'timeout', 'version', 'version_type')
+    def delete(self, index, doc_type, id, params=None):
+
+        found = False
+
+        if index in self.__documents_dict:
+            for document in self.__documents_dict[index]:
+                if document.get('_type') == doc_type and document.get('_id') == id:
+                    found = True
+                    self.__documents_dict[index].remove(document)
+                    break
+
+        result_dict = {
+            'found': found,
+            '_index': index,
+            '_type': doc_type,
+            '_id': id,
+            '_version': 1,
+        }
+
+        if found:
+            return result_dict
+        else:
+            raise NotFoundError(404, json.dumps(result_dict))
+
+    @query_params('allow_no_indices', 'expand_wildcards', 'ignore_unavailable',
+                  'preference', 'routing')
+    def suggest(self, body, index=None, params=None):
+        if index is not None and index not in self.__documents_dict:
+            raise NotFoundError(404, 'IndexMissingException[[{0}] missing]'.format(index))
+
+        result_dict = {}
+        for key, value in body.items():
+            text = value.get('text')
+            suggestion = int(text) + 1 if isinstance(text, int) \
+                else '{0}_suggestion'.format(text)
+            result_dict[key] = [
+                {
+                    'text': text,
+                    'length': 1,
+                    'options': [
+                        {
+                            'text': suggestion,
+                            'freq': 1,
+                            'score': 1.0
+                        }
+                    ],
+                    'offset': 0
+                }
+            ]
+        return result_dict
+
+    def _normalize_index_to_list(self, index):
+        # Ensure we have a list of indexes
+        if index is None:
+            searchable_indexes = self.__documents_dict.keys()
+        elif isinstance(index, six.string_types):
+            searchable_indexes = [index]
+        elif isinstance(index, list):
+            searchable_indexes = index
+        else:
+            # Is it the correct exception to use ?
+            raise ValueError("Invalid param 'index'")
+
+        # Check index(es) exists
+        for searchable_index in searchable_indexes:
+            if searchable_index not in self.__documents_dict:
+                raise NotFoundError(404,
+                                    'IndexMissingException[[{0}] missing]'
+                                    .format(searchable_index))
+
+        return searchable_indexes
+
+    @staticmethod
+    def _normalize_doc_type_to_list(doc_type):
+        # Ensure we have a list of doc types
+        if doc_type is None:
+            searchable_doc_types = []
+        elif isinstance(doc_type, six.string_types):
+            searchable_doc_types = [doc_type]
+        elif isinstance(doc_type, list):
+            searchable_doc_types = doc_type
+        else:
+            # Is it the correct exception to use ?
+            raise ValueError("Invalid param 'doc_type'")
+
+        return searchable_doc_types

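
The fake client above keeps documents in an in-memory dict and only
approximates real semantics: search() ignores the query body and returns every
document in the matching index/doc_type, and versions are always 1. A short
round-trip sketch (illustrative only; index and field names are arbitrary):

    from tests.utils.log.elasticmock.fake_elasticsearch import FakeElasticsearch

    es = FakeElasticsearch()
    es.index(index='logs', doc_type='log', body={'message': 'line 1'}, id=1)

    # get() returns the stored document envelope, including '_source'.
    doc = es.get(index='logs', id=1)
    assert doc['_source'] == {'message': 'line 1'}

    # search() does not evaluate the body; it returns all docs in the index/doc_type.
    result = es.search(index='logs', doc_type='log', body={'query': {'match_all': {}}})
    assert result['hits']['total'] == 1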
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/tests/utils/log/elasticmock/utilities/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/elasticmock/utilities/__init__.py b/tests/utils/log/elasticmock/utilities/__init__.py
new file mode 100644
index 0000000..19438ba
--- /dev/null
+++ b/tests/utils/log/elasticmock/utilities/__init__.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+#
+# The MIT License (MIT)
+#
+# Copyright (c) 2016 Marcos Cardoso
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import random
+import string
+
+DEFAULT_ELASTICSEARCH_ID_SIZE = 20
+CHARSET_FOR_ELASTICSEARCH_ID = string.ascii_letters + string.digits
+
+
+def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE):
+    return ''.join(random.choice(CHARSET_FOR_ELASTICSEARCH_ID) for _ in range(size))

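
get_random_id() simply draws `size` characters from the alphanumeric charset;
for example (illustrative, assuming the same import path as above):

    from tests.utils.log.elasticmock.utilities import get_random_id

    doc_id = get_random_id()           # defaults to 20 characters
    assert len(doc_id) == 20
    assert all(ch.isalnum() for ch in doc_id)

    short_id = get_random_id(size=8)   # size is configurable
    assert len(short_id) == 8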
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ec38ba95/tests/utils/log/test_es_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_es_task_handler.py b/tests/utils/log/test_es_task_handler.py
new file mode 100644
index 0000000..cd98717
--- /dev/null
+++ b/tests/utils/log/test_es_task_handler.py
@@ -0,0 +1,255 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import shutil
+import unittest
+
+import elasticsearch
+import mock
+import pendulum
+
+from airflow import configuration
+from airflow.models import TaskInstance, DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.log.es_task_handler import ElasticsearchTaskHandler
+from airflow.utils.state import State
+from airflow.utils.timezone import datetime
+from .elasticmock import elasticmock
+
+
+class TestElasticsearchTaskHandler(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_file_task_handler'
+    TASK_ID = 'task_for_testing_file_log_handler'
+    EXECUTION_DATE = datetime(2016, 1, 1)
+    LOG_ID = 'dag_for_testing_file_task_handler-task_for_testing' \
+             '_file_log_handler-2016-01-01T00:00:00+00:00-1'
+
+    @elasticmock
+    def setUp(self):
+        super(TestElasticsearchTaskHandler, self).setUp()
+        self.local_log_location = 'local/log/location'
+        self.filename_template = '{try_number}.log'
+        self.log_id_template = '{dag_id}-{task_id}-{execution_date}-{try_number}'
+        self.end_of_log_mark = 'end_of_log\n'
+        self.es_task_handler = ElasticsearchTaskHandler(
+            self.local_log_location,
+            self.filename_template,
+            self.log_id_template,
+            self.end_of_log_mark
+        )
+
+        self.es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
+        self.index_name = 'test_index'
+        self.doc_type = 'log'
+        self.test_message = 'some random stuff'
+        self.body = {'message': self.test_message, 'log_id': self.LOG_ID,
+                     'offset': 1}
+
+        self.es.index(index=self.index_name, doc_type=self.doc_type,
+                      body=self.body, id=1)
+
+        configuration.load_test_config()
+        self.dag = DAG(self.DAG_ID, start_date=self.EXECUTION_DATE)
+        task = DummyOperator(task_id=self.TASK_ID, dag=self.dag)
+        self.ti = TaskInstance(task=task, execution_date=self.EXECUTION_DATE)
+        self.ti.try_number = 1
+        self.ti.state = State.RUNNING
+        self.addCleanup(self.dag.clear)
+
+    def tearDown(self):
+        shutil.rmtree(self.local_log_location.split(os.path.sep)[0], ignore_errors=True)
+
+    def test_client(self):
+        self.assertIsInstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
+
+    def test_read(self):
+        ts = pendulum.now()
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(1, metadatas[0]['offset'])
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
+    def test_read_with_none_metadata(self):
+        logs, metadatas = self.es_task_handler.read(self.ti, 1)
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(1, metadatas[0]['offset'])
+        self.assertTrue(
+            timezone.parse(metadatas[0]['last_log_timestamp']) < pendulum.now())
+
+    def test_read_nonexistent_log(self):
+        ts = pendulum.now()
+        # In ElasticMock, search is going to return all documents with matching index
+        # and doc_type regardless of match filters, so we delete the log entry instead
+        # of making a new TaskInstance to query.
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(0, metadatas[0]['offset'])
+        # last_log_timestamp won't change if no log lines read.
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
+
+    def test_read_with_empty_metadata(self):
+        ts = pendulum.now()
+        logs, metadatas = self.es_task_handler.read(self.ti, 1, {})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        self.assertFalse(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided.
+        self.assertEqual(1, metadatas[0]['offset'])
+        # last_log_timestamp will be initialized using log reading time
+        # if no last_log_timestamp is provided.
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
+        # case where offset is missing but metadata not empty.
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti, 1, {'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertFalse(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided.
+        self.assertEqual(0, metadatas[0]['offset'])
+        # last_log_timestamp will be initialized using log reading time
+        # if no last_log_timestamp is provided.
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
+    def test_read_timeout(self):
+        ts = pendulum.now().subtract(minutes=5)
+
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertTrue(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided.
+        self.assertEqual(0, metadatas[0]['offset'])
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
+
+    def test_read_raises(self):
+        with mock.patch.object(self.es_task_handler.log, 'exception') as mock_exception:
+            with mock.patch("elasticsearch_dsl.Search.execute") as 
mock_execute:
+                mock_execute.side_effect = Exception('Failed to read')
+                logs, metadatas = self.es_task_handler.read(self.ti, 1)
+            msg = "Could not read log with log_id: {}".format(self.LOG_ID)
+            mock_exception.assert_called_once()
+            args, kwargs = mock_exception.call_args
+            self.assertIn(msg, args[0])
+
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(0, metadatas[0]['offset'])
+
+    def test_set_context(self):
+        self.es_task_handler.set_context(self.ti)
+        self.assertTrue(self.es_task_handler.mark_end_on_close)
+
+    def test_close(self):
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.close()
+        with open(os.path.join(self.local_log_location,
+                               self.filename_template.format(try_number=1)),
+                  'r') as log_file:
+            self.assertIn(self.end_of_log_mark, log_file.read())
+        self.assertTrue(self.es_task_handler.closed)
+
+    def test_close_no_mark_end(self):
+        self.ti.raw = True
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.close()
+        with open(os.path.join(self.local_log_location,
+                               self.filename_template.format(try_number=1)),
+                  'r') as log_file:
+            self.assertNotIn(self.end_of_log_mark, log_file.read())
+        self.assertTrue(self.es_task_handler.closed)
+
+    def test_close_closed(self):
+        self.es_task_handler.closed = True
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.close()
+        with open(os.path.join(self.local_log_location,
+                               self.filename_template.format(try_number=1)),
+                  'r') as log_file:
+            self.assertEqual(0, len(log_file.read()))
+
+    def test_close_with_no_handler(self):
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.handler = None
+        self.es_task_handler.close()
+        with open(os.path.join(self.local_log_location,
+                               self.filename_template.format(try_number=1)),
+                  'r') as log_file:
+            self.assertEqual(0, len(log_file.read()))
+        self.assertTrue(self.es_task_handler.closed)
+
+    def test_close_with_no_stream(self):
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.handler.stream = None
+        self.es_task_handler.close()
+        with open(os.path.join(self.local_log_location,
+                               self.filename_template.format(try_number=1)),
+                  'r') as log_file:
+            self.assertIn(self.end_of_log_mark, log_file.read())
+        self.assertTrue(self.es_task_handler.closed)
+
+        self.es_task_handler.set_context(self.ti)
+        self.es_task_handler.handler.stream.close()
+        self.es_task_handler.close()
+        with open(os.path.join(self.local_log_location,
+                               self.filename_template.format(try_number=1)),
+                  'r') as log_file:
+            self.assertIn(self.end_of_log_mark, log_file.read())
+        self.assertTrue(self.es_task_handler.closed)
+
+    def test_render_log_id(self):
+        expected_log_id = 'dag_for_testing_file_task_handler-' \
+                          'task_for_testing_file_log_handler-2016-01-01T00:00:00+00:00-1'
+        log_id = self.es_task_handler._render_log_id(self.ti, 1)
+        self.assertEqual(expected_log_id, log_id)
+
+        # Switch to use jinja template.
+        self.es_task_handler = ElasticsearchTaskHandler(
+            self.local_log_location,
+            self.filename_template,
+            '{{ ti.dag_id }}-{{ ti.task_id }}-{{ ts }}-{{ try_number }}',
+            self.end_of_log_mark
+        )
+        log_id = self.es_task_handler._render_log_id(self.ti, 1)
+        self.assertEqual(expected_log_id, log_id)

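
The final test exercises both log_id template styles accepted by
ElasticsearchTaskHandler: a str.format() template and a Jinja template, which
should render the same id for a given task instance. A rough sketch of the two
forms, assuming a handler constructed as in setUp() (the path, filename
template and end-of-log marker below are the test's placeholder values):

    from airflow.utils.log.es_task_handler import ElasticsearchTaskHandler

    format_style = '{dag_id}-{task_id}-{execution_date}-{try_number}'
    jinja_style = '{{ ti.dag_id }}-{{ ti.task_id }}-{{ ts }}-{{ try_number }}'

    handler = ElasticsearchTaskHandler(
        'local/log/location',   # base log folder
        '{try_number}.log',     # filename template
        jinja_style,            # log_id template (either style works)
        'end_of_log\n',         # end-of-log marker
    )
    # handler._render_log_id(ti, try_number) is then expected to yield the same
    # string as format_style.format(...) for the same task instance, e.g.
    # 'dag_for_testing_file_task_handler-task_for_testing_file_log_handler-2016-01-01T00:00:00+00:00-1'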