Repository: incubator-airflow Updated Branches: refs/heads/master 50702d061 -> 974b75e93
[AIRFLOW-880] Make webserver serve logs in a sane way for remote logs There are two major problems with remote logs in Airflow right now: 1. Lack of Complete Logs: The remote log should be read by default, rather than only as a fallback when the local log is not present, because the remote log has the logs for all of the tries of a task, whereas the local log is only guaranteed to have the most recent one. 2. Lack of Consistency: The logs returned should always be the same from all the webservers (right now different logs can be returned depending on whether or not a webserver has a local copy of the log, and the logs can differ even between webservers that do have it). Right now log functionality is not consistent when it comes to remote logs. This PR addresses these issues by ALWAYS reading from remote logs and then also reading logs from worker hosts if the task is currently running (to get in-flight logs). The one issue with this PR is that if a task is running on a worker it already ran on, then you will get duplicate logs for all of the previous runs of the task that already completed (delimited by something like "*** Getting remote logs" and "*** Getting logs on local worker"). This can be fixed later (either by streaming logs to the log server or by creating a proper abstraction for multiple task instance runs), and is still better than the current behavior (duplicate info is better than omitting previous task instance logs from the webserver log). 
Testing Done: Tested on staging cluster: - Task instance doesn't exist - Task instance is running and has previous remote log - Task instance is running for first time - Task instance has completed and has remote log Closes #2086 from aoen/ddavydov/fix_s3_logging Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/974b75e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/974b75e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/974b75e9 Branch: refs/heads/master Commit: 974b75e93ec827b5f45c273f141fee9e188d46ee Parents: 50702d0 Author: Dan Davydov <[email protected]> Authored: Tue Feb 21 13:49:40 2017 -0800 Committer: Dan Davydov <[email protected]> Committed: Tue Feb 21 13:49:42 2017 -0800 ---------------------------------------------------------------------- airflow/www/views.py | 178 +++++++++++++++++++++++++--------------------- 1 file changed, 97 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/974b75e9/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 8c9b2df..0b1db61 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -711,79 +711,94 @@ class Airflow(BaseView): loc = loc.format(**locals()) log = "" TI = models.TaskInstance - session = Session() dttm = dateutil.parser.parse(execution_date) + form = DateTimeForm(data={'execution_date': dttm}) + session = Session() ti = session.query(TI).filter( TI.dag_id == dag_id, TI.task_id == task_id, TI.execution_date == dttm).first() - dttm = dateutil.parser.parse(execution_date) - form = DateTimeForm(data={'execution_date': dttm}) - - if ti: - host = ti.hostname - log_loaded = False - if os.path.exists(loc): - try: - f = open(loc) - log += "".join(f.readlines()) - 
f.close() - log_loaded = True - except: - log = "*** Failed to load local log file: {0}.\n".format(loc) - else: - WORKER_LOG_SERVER_PORT = \ - conf.get('celery', 'WORKER_LOG_SERVER_PORT') - url = os.path.join( - "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative - ).format(**locals()) - log += "*** Log file isn't local.\n" - log += "*** Fetching here: {url}\n".format(**locals()) - try: - import requests - timeout = None # No timeout - try: - timeout = conf.getint('webserver', 'log_fetch_timeout_sec') - except (AirflowConfigException, ValueError): - pass - - response = requests.get(url, timeout=timeout) - response.raise_for_status() - log += '\n' + response.text - log_loaded = True - except: - log += "*** Failed to fetch log file from worker.\n".format( - **locals()) - - if not log_loaded: - # load remote logs - remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') - remote_log = os.path.join(remote_log_base, log_relative) - log += '\n*** Reading remote logs...\n' + if ti is None: + log = "*** Task instance did not exist in the DB\n" + else: + # load remote logs + remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') + remote_log_loaded = False + if remote_log_base: + remote_log_path = os.path.join(remote_log_base, log_relative) + remote_log = "" + + # Only display errors reading the log if the task completed or ran at least + # once before (otherwise there won't be any remote log stored). 
+ ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED} + ti_ran_more_than_once = ti.try_number > 1 + surface_log_retrieval_errors = ( + ti_execution_completed or ti_ran_more_than_once) # S3 - if remote_log.startswith('s3:/'): - log += log_utils.S3Log().read(remote_log, return_error=True) - + if remote_log_path.startswith('s3:/'): + remote_log += log_utils.S3Log().read( + remote_log_path, return_error=surface_log_retrieval_errors) + remote_log_loaded = True # GCS - elif remote_log.startswith('gs:/'): - log += log_utils.GCSLog().read(remote_log, return_error=True) - + elif remote_log_path.startswith('gs:/'): + remote_log += log_utils.GCSLog().read( + remote_log_path, return_error=surface_log_retrieval_errors) + remote_log_loaded = True # unsupported - elif remote_log: - log += '*** Unsupported remote log location.' - - session.commit() - session.close() + else: + remote_log += '*** Unsupported remote log location.' + + if remote_log: + log += ('*** Reading remote log from {}.\n{}\n'.format( + remote_log_path, remote_log)) + + # We only want to display the + # local logs while the task is running if a remote log configuration is set up + # since the logs will be transfered there after the run completes. + # TODO(aoen): One problem here is that if a task is running on a worker it + # already ran on, then duplicate logs will be printed for all of the previous + # runs of the task that already completed since they will have been printed as + # part of the remote log section above. This can be fixed either by streaming + # logs to the log servers as tasks are running, or by creating a proper + # abstraction for multiple task instance runs). 
+ if not remote_log_loaded or ti.state == State.RUNNING: + if os.path.exists(loc): + try: + f = open(loc) + log += "*** Reading local log.\n" + "".join(f.readlines()) + f.close() + except: + log = "*** Failed to load local log file: {0}.\n".format(loc) + else: + WORKER_LOG_SERVER_PORT = \ + conf.get('celery', 'WORKER_LOG_SERVER_PORT') + url = os.path.join( + "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative + ).format(**locals()) + log += "*** Log file isn't local.\n" + log += "*** Fetching here: {url}\n".format(**locals()) + try: + import requests + timeout = None # No timeout + try: + timeout = conf.getint('webserver', 'log_fetch_timeout_sec') + except (AirflowConfigException, ValueError): + pass + + response = requests.get(url, timeout=timeout) + response.raise_for_status() + log += '\n' + response.text + except: + log += "*** Failed to fetch log file from worker.\n".format( + **locals()) if PY2 and not isinstance(log, unicode): log = log.decode('utf-8') - title = "Log" - return self.render( 'airflow/ti_code.html', - code=log, dag=dag, title=title, task_id=task_id, + code=log, dag=dag, title="Log", task_id=task_id, execution_date=execution_date, form=form) @expose('/task') @@ -824,7 +839,7 @@ class Airflow(BaseView): if not attr_name.startswith('_'): attr = getattr(task, attr_name) if type(attr) != type(self.task) and \ - attr_name not in attr_renderer: + attr_name not in attr_renderer: task_attrs.append((attr_name, str(attr))) # Color coding the special attributes that are code @@ -1172,7 +1187,7 @@ class Airflow(BaseView): max_date = max(dates) if dates else None tis = dag.get_task_instances( - session, start_date=min_date, end_date=base_date) + session, start_date=min_date, end_date=base_date) task_instances = {} for ti in tis: tid = alchemy_to_dict(ti) @@ -1213,10 +1228,10 @@ class Airflow(BaseView): return { 'name': task.task_id, 'instances': [ - set_duration(task_instances.get((task.task_id, d))) or { - 'execution_date': d.isoformat(), - 
'task_id': task.task_id - } + set_duration(task_instances.get((task.task_id, d))) or { + 'execution_date': d.isoformat(), + 'task_id': task.task_id + } for d in dates], children_key: children, 'num_dep': len(task.upstream_list), @@ -1445,9 +1460,9 @@ class Airflow(BaseView): chart.buildcontent() cum_chart.buildcontent() s_index = cum_chart.htmlcontent.rfind('});') - cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] - + "$( document ).trigger('chartload')" - + cum_chart.htmlcontent[s_index:]) + cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] + + "$( document ).trigger('chartload')" + + cum_chart.htmlcontent[s_index:]) return self.render( 'airflow/duration_chart.html', @@ -1577,7 +1592,7 @@ class Airflow(BaseView): y=scale_time_units(y[task.task_id], y_unit)) tis = dag.get_task_instances( - session, start_date=min_date, end_date=base_date) + session, start_date=min_date, end_date=base_date) dates = sorted(list({ti.execution_date for ti in tis})) max_date = max([ti.execution_date for ti in tis]) if dates else None @@ -1687,7 +1702,7 @@ class Airflow(BaseView): 'status': ti.state, 'executionDate': ti.execution_date.isoformat(), }) - states = {ti.state:ti.state for ti in tis} + states = {ti.state: ti.state for ti in tis} data = { 'taskNames': [ti.task_id for ti in tis], 'tasks': tasks, @@ -1765,6 +1780,7 @@ class Airflow(BaseView): flash("{} variable(s) successfully updated.".format(len(d))) return redirect('/admin/variable') + class HomeView(AdminIndexView): @expose("/") @login_required @@ -2058,9 +2074,9 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView): (c.conn_id, c.conn_id) for c in ( Session().query(models.Connection.conn_id) - .group_by(models.Connection.conn_id) + .group_by(models.Connection.conn_id) ) - ] + ] } def on_model_change(self, form, model, is_created=True): @@ -2243,8 +2259,8 @@ class DagRunModelView(ModelViewOnly): def action_new_delete(self, ids): session = settings.Session() deleted = 
set(session.query(models.DagRun) - .filter(models.DagRun.id.in_(ids)) - .all()) + .filter(models.DagRun.id.in_(ids)) + .all()) session.query(models.DagRun)\ .filter(models.DagRun.id.in_(ids))\ .delete(synchronize_session='fetch') @@ -2465,7 +2481,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): formdata = form.data if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']: extra = { - key:formdata[key] + key: formdata[key] for key in self.form_extra_fields.keys() if key in formdata} model.extra = json.dumps(extra) @@ -2496,7 +2512,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): def on_form_prefill(self, form, id): try: d = json.loads(form.data.get('extra', '{}')) - except Exception as e: + except Exception: d = {} for field in list(self.form_extra_fields.keys()): @@ -2609,9 +2625,9 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView): """ return ( super(DagModelView, self) - .get_query() - .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) - .filter(~models.DagModel.is_subdag) + .get_query() + .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) + .filter(~models.DagModel.is_subdag) ) def get_count_query(self): @@ -2620,7 +2636,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView): """ return ( super(DagModelView, self) - .get_count_query() - .filter(models.DagModel.is_active) - .filter(~models.DagModel.is_subdag) + .get_count_query() + .filter(models.DagModel.is_active) + .filter(~models.DagModel.is_subdag) )
