Repository: incubator-airflow Updated Branches: refs/heads/master 06b41fbe1 -> 301ce6b4f
[AIRFLOW-1879] Handle ti log entirely within ti Previously logging was setup outside a TaskInstance, this puts everything inside. Also propery closes the logging. Closes #2837 from bolkedebruin/AIRFLOW-1879 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/301ce6b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/301ce6b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/301ce6b4 Branch: refs/heads/master Commit: 301ce6b4f0da9d01734dd3a0360bff535a8acad5 Parents: 06b41fb Author: Bolke de Bruin <[email protected]> Authored: Wed Dec 6 09:46:53 2017 +0100 Committer: Fokko Driesprong <[email protected]> Committed: Wed Dec 6 09:46:53 2017 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 17 +++-------------- airflow/models.py | 6 ++++++ airflow/utils/log/logging_mixin.py | 7 +++++-- 3 files changed, 14 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/301ce6b4/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index c23b272..782e58d 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -362,15 +362,10 @@ def run(args, dag=None): task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, args.execution_date) ti.refresh_from_db() - - log = logging.getLogger('airflow.task') - if args.raw: - log = logging.getLogger('airflow.task.raw') - - set_context(log, ti) + ti.init_run_context() hostname = socket.getfqdn() - log.info("Running on host %s", hostname) + log.info("Running %s on host %s", ti, hostname) with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN): if args.local: @@ -428,13 +423,7 @@ def run(args, dag=None): if args.raw: return - # Force the log to flush. The flush is important because we - # might subsequently read from the log to insert into S3 or - # Google cloud storage. Explicitly close the handler is - # needed in order to upload to remote storage services. - for handler in log.handlers: - handler.flush() - handler.close() + logging.shutdown() def task_failed_deps(args): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/301ce6b4/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 22cf9f0..5837363 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1855,6 +1855,12 @@ class TaskInstance(Base, LoggingMixin): TI.state == State.RUNNING ).count() + def init_run_context(self): + """ + Sets the log context. + """ + self._set_context(self) + class TaskFail(Base): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/301ce6b4/airflow/utils/log/logging_mixin.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 892fae5..03437bf 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -30,8 +30,7 @@ class LoggingMixin(object): Convenience super-class to have a logger configured with the class name """ def __init__(self, context=None): - if context is not None: - set_context(self.log, context) + self._set_context(context) # We want to deprecate the logger property in Airflow 2.0 # The log property is the de facto standard in most programming languages @@ -56,6 +55,10 @@ class LoggingMixin(object): ) return self._log + def _set_context(self, context): + if context is not None: + set_context(self.log, context) + class StreamLogWriter(object): encoding = False
