Repository: incubator-airflow
Updated Branches:
  refs/heads/master 50702d061 -> 974b75e93


[AIRFLOW-880] Make webserver serve logs in a sane way for remote logs

There are two major problems with remote logs in
Airflow right now:
1. Lack of Complete Logs: Remote logs should be
the default rather than a fallback that is only
loaded when the local log is not present, because
the remote log has the logs for all of the tries
of a task, whereas the local log is only
guaranteed to have the most recent one.
2. Lack of Consistency: The logs returned should
always be the same from all the webservers (right
now different logs can be returned depending on
whether a webserver has a local copy of the log,
and the logs can even differ between webservers
that do have a local copy).
Right now log functionality is not consistent when
it comes to remote logs.

This PR addresses these issues by ALWAYS reading
from remote logs and then also reading logs from
worker hosts if the task is currently running (to
get in-flight logs). The one issue with this PR is
that if a task is running on a worker it already
ran on, then you will get duplicate logs for all
of the previous runs of the task that already
completed (the sections are delimited by headers
such as "*** Reading remote log from ..." and
"*** Reading local log."). This can be fixed later
(either by streaming logs to the log server or by
creating a proper abstraction for multiple task
instance runs), and is still better than the
current behavior (duplicate info is better than
omitting previous task instance logs from the
webserver log).

Testing Done:
Tested on staging cluster:
- Task instance doesn't exist
- Task instance is running and has previous remote
log
- Task instance is running for first time
- Task instance has completed and has remote log

Closes #2086 from aoen/ddavydov/fix_s3_logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/974b75e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/974b75e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/974b75e9

Branch: refs/heads/master
Commit: 974b75e93ec827b5f45c273f141fee9e188d46ee
Parents: 50702d0
Author: Dan Davydov <[email protected]>
Authored: Tue Feb 21 13:49:40 2017 -0800
Committer: Dan Davydov <[email protected]>
Committed: Tue Feb 21 13:49:42 2017 -0800

----------------------------------------------------------------------
 airflow/www/views.py | 178 +++++++++++++++++++++++++---------------------
 1 file changed, 97 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/974b75e9/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8c9b2df..0b1db61 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -711,79 +711,94 @@ class Airflow(BaseView):
         loc = loc.format(**locals())
         log = ""
         TI = models.TaskInstance
-        session = Session()
         dttm = dateutil.parser.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        session = Session()
         ti = session.query(TI).filter(
             TI.dag_id == dag_id, TI.task_id == task_id,
             TI.execution_date == dttm).first()
-        dttm = dateutil.parser.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-
-        if ti:
-            host = ti.hostname
-            log_loaded = False
 
-            if os.path.exists(loc):
-                try:
-                    f = open(loc)
-                    log += "".join(f.readlines())
-                    f.close()
-                    log_loaded = True
-                except:
-                    log = "*** Failed to load local log file: 
{0}.\n".format(loc)
-            else:
-                WORKER_LOG_SERVER_PORT = \
-                    conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-                url = os.path.join(
-                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
-                    ).format(**locals())
-                log += "*** Log file isn't local.\n"
-                log += "*** Fetching here: {url}\n".format(**locals())
-                try:
-                    import requests
-                    timeout = None  # No timeout
-                    try:
-                        timeout = conf.getint('webserver', 
'log_fetch_timeout_sec')
-                    except (AirflowConfigException, ValueError):
-                        pass
-
-                    response = requests.get(url, timeout=timeout)
-                    response.raise_for_status()
-                    log += '\n' + response.text
-                    log_loaded = True
-                except:
-                    log += "*** Failed to fetch log file from 
worker.\n".format(
-                        **locals())
-
-            if not log_loaded:
-                # load remote logs
-                remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-                remote_log = os.path.join(remote_log_base, log_relative)
-                log += '\n*** Reading remote logs...\n'
+        if ti is None:
+            log = "*** Task instance did not exist in the DB\n"
+        else:
+            # load remote logs
+            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+            remote_log_loaded = False
+            if remote_log_base:
+                remote_log_path = os.path.join(remote_log_base, log_relative)
+                remote_log = ""
+
+                # Only display errors reading the log if the task completed or 
ran at least
+                # once before (otherwise there won't be any remote log stored).
+                ti_execution_completed = ti.state in {State.SUCCESS, 
State.FAILED}
+                ti_ran_more_than_once = ti.try_number > 1
+                surface_log_retrieval_errors = (
+                    ti_execution_completed or ti_ran_more_than_once)
 
                 # S3
-                if remote_log.startswith('s3:/'):
-                    log += log_utils.S3Log().read(remote_log, 
return_error=True)
-
+                if remote_log_path.startswith('s3:/'):
+                    remote_log += log_utils.S3Log().read(
+                        remote_log_path, 
return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
                 # GCS
-                elif remote_log.startswith('gs:/'):
-                    log += log_utils.GCSLog().read(remote_log, 
return_error=True)
-
+                elif remote_log_path.startswith('gs:/'):
+                    remote_log += log_utils.GCSLog().read(
+                        remote_log_path, 
return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
                 # unsupported
-                elif remote_log:
-                    log += '*** Unsupported remote log location.'
-
-            session.commit()
-            session.close()
+                else:
+                    remote_log += '*** Unsupported remote log location.'
+
+                if remote_log:
+                    log += ('*** Reading remote log from {}.\n{}\n'.format(
+                        remote_log_path, remote_log))
+
+            # We only want to display the
+            # local logs while the task is running if a remote log 
configuration is set up
+            # since the logs will be transfered there after the run completes.
+            # TODO(aoen): One problem here is that if a task is running on a 
worker it
+            # already ran on, then duplicate logs will be printed for all of 
the previous
+            # runs of the task that already completed since they will have 
been printed as
+            # part of the remote log section above. This can be fixed either 
by streaming
+            # logs to the log servers as tasks are running, or by creating a 
proper
+            # abstraction for multiple task instance runs).
+            if not remote_log_loaded or ti.state == State.RUNNING:
+                if os.path.exists(loc):
+                    try:
+                        f = open(loc)
+                        log += "*** Reading local log.\n" + 
"".join(f.readlines())
+                        f.close()
+                    except:
+                        log = "*** Failed to load local log file: 
{0}.\n".format(loc)
+                else:
+                    WORKER_LOG_SERVER_PORT = \
+                        conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+                    url = os.path.join(
+                        "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative
+                    ).format(**locals())
+                    log += "*** Log file isn't local.\n"
+                    log += "*** Fetching here: {url}\n".format(**locals())
+                    try:
+                        import requests
+                        timeout = None  # No timeout
+                        try:
+                            timeout = conf.getint('webserver', 
'log_fetch_timeout_sec')
+                        except (AirflowConfigException, ValueError):
+                            pass
+
+                        response = requests.get(url, timeout=timeout)
+                        response.raise_for_status()
+                        log += '\n' + response.text
+                    except:
+                        log += "*** Failed to fetch log file from 
worker.\n".format(
+                            **locals())
 
         if PY2 and not isinstance(log, unicode):
             log = log.decode('utf-8')
 
-        title = "Log"
-
         return self.render(
             'airflow/ti_code.html',
-            code=log, dag=dag, title=title, task_id=task_id,
+            code=log, dag=dag, title="Log", task_id=task_id,
             execution_date=execution_date, form=form)
 
     @expose('/task')
@@ -824,7 +839,7 @@ class Airflow(BaseView):
             if not attr_name.startswith('_'):
                 attr = getattr(task, attr_name)
                 if type(attr) != type(self.task) and \
-                                attr_name not in attr_renderer:
+                        attr_name not in attr_renderer:
                     task_attrs.append((attr_name, str(attr)))
 
         # Color coding the special attributes that are code
@@ -1172,7 +1187,7 @@ class Airflow(BaseView):
         max_date = max(dates) if dates else None
 
         tis = dag.get_task_instances(
-                session, start_date=min_date, end_date=base_date)
+            session, start_date=min_date, end_date=base_date)
         task_instances = {}
         for ti in tis:
             tid = alchemy_to_dict(ti)
@@ -1213,10 +1228,10 @@ class Airflow(BaseView):
             return {
                 'name': task.task_id,
                 'instances': [
-                        set_duration(task_instances.get((task.task_id, d))) or 
{
-                            'execution_date': d.isoformat(),
-                            'task_id': task.task_id
-                        }
+                    set_duration(task_instances.get((task.task_id, d))) or {
+                        'execution_date': d.isoformat(),
+                        'task_id': task.task_id
+                    }
                     for d in dates],
                 children_key: children,
                 'num_dep': len(task.upstream_list),
@@ -1445,9 +1460,9 @@ class Airflow(BaseView):
         chart.buildcontent()
         cum_chart.buildcontent()
         s_index = cum_chart.htmlcontent.rfind('});')
-        cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index]
-                                 + "$( document ).trigger('chartload')"
-                                 + cum_chart.htmlcontent[s_index:])
+        cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] +
+                                 "$( document ).trigger('chartload')" +
+                                 cum_chart.htmlcontent[s_index:])
 
         return self.render(
             'airflow/duration_chart.html',
@@ -1577,7 +1592,7 @@ class Airflow(BaseView):
                                 y=scale_time_units(y[task.task_id], y_unit))
 
         tis = dag.get_task_instances(
-                session, start_date=min_date, end_date=base_date)
+            session, start_date=min_date, end_date=base_date)
         dates = sorted(list({ti.execution_date for ti in tis}))
         max_date = max([ti.execution_date for ti in tis]) if dates else None
 
@@ -1687,7 +1702,7 @@ class Airflow(BaseView):
                 'status': ti.state,
                 'executionDate': ti.execution_date.isoformat(),
             })
-        states = {ti.state:ti.state for ti in tis}
+        states = {ti.state: ti.state for ti in tis}
         data = {
             'taskNames': [ti.task_id for ti in tis],
             'tasks': tasks,
@@ -1765,6 +1780,7 @@ class Airflow(BaseView):
             flash("{} variable(s) successfully updated.".format(len(d)))
         return redirect('/admin/variable')
 
+
 class HomeView(AdminIndexView):
     @expose("/")
     @login_required
@@ -2058,9 +2074,9 @@ class ChartModelView(wwwutils.DataProfilingMixin, 
AirflowModelView):
             (c.conn_id, c.conn_id)
             for c in (
                 Session().query(models.Connection.conn_id)
-                    .group_by(models.Connection.conn_id)
+                .group_by(models.Connection.conn_id)
             )
-            ]
+        ]
     }
 
     def on_model_change(self, form, model, is_created=True):
@@ -2243,8 +2259,8 @@ class DagRunModelView(ModelViewOnly):
     def action_new_delete(self, ids):
         session = settings.Session()
         deleted = set(session.query(models.DagRun)
-            .filter(models.DagRun.id.in_(ids))
-            .all())
+                      .filter(models.DagRun.id.in_(ids))
+                      .all())
         session.query(models.DagRun)\
             .filter(models.DagRun.id.in_(ids))\
             .delete(synchronize_session='fetch')
@@ -2465,7 +2481,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, 
AirflowModelView):
         formdata = form.data
         if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']:
             extra = {
-                key:formdata[key]
+                key: formdata[key]
                 for key in self.form_extra_fields.keys() if key in formdata}
             model.extra = json.dumps(extra)
 
@@ -2496,7 +2512,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, 
AirflowModelView):
     def on_form_prefill(self, form, id):
         try:
             d = json.loads(form.data.get('extra', '{}'))
-        except Exception as e:
+        except Exception:
             d = {}
 
         for field in list(self.form_extra_fields.keys()):
@@ -2609,9 +2625,9 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
         """
         return (
             super(DagModelView, self)
-                .get_query()
-                .filter(or_(models.DagModel.is_active, 
models.DagModel.is_paused))
-                .filter(~models.DagModel.is_subdag)
+            .get_query()
+            .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
+            .filter(~models.DagModel.is_subdag)
         )
 
     def get_count_query(self):
@@ -2620,7 +2636,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
         """
         return (
             super(DagModelView, self)
-                .get_count_query()
-                .filter(models.DagModel.is_active)
-                .filter(~models.DagModel.is_subdag)
+            .get_count_query()
+            .filter(models.DagModel.is_active)
+            .filter(~models.DagModel.is_subdag)
         )

Reply via email to