Repository: incubator-airflow Updated Branches: refs/heads/v1-9-stable 337c1c323 -> f18e25505
[AIRFLOW-1872] Set context for all handlers including parents Previously setting the context was not propagated to the parent loggers. Unfortunately, in case of a non explicitly defined logger the returned logger is shallow, i.e. it does not have handlers defined. So to set the context it is required to walk the tree. Closes #2831 from bolkedebruin/fix_logging (cherry picked from commit 406d738b1cf657b5ee6163bc26ab6fdea891576d) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f18e2550 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f18e2550 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f18e2550 Branch: refs/heads/v1-9-stable Commit: f18e2550543e455c9701af0995bc393ee6a97b47 Parents: 337c1c3 Author: Bolke de Bruin <[email protected]> Authored: Sat Dec 2 09:56:13 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Sat Dec 2 09:56:42 2017 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 11 +++------- airflow/jobs.py | 10 ++------- airflow/task_runner/base_task_runner.py | 2 +- airflow/utils/log/logging_mixin.py | 32 +++++++++++++++++++--------- tests/utils/test_logging_mixin.py | 19 ++++++++++++++++- 5 files changed, 46 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f18e2550/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 6d01293..1001e05 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -54,7 +54,8 @@ from airflow.models import (DagModel, DagBag, TaskInstance, from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils -from 
airflow.utils.log.logging_mixin import LoggingMixin, redirect_stderr, redirect_stdout +from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr, + redirect_stdout, set_context) from airflow.www.app import cached_app from sqlalchemy import func @@ -367,13 +368,7 @@ def run(args, dag=None): if args.raw: log = logging.getLogger('airflow.task.raw') - for handler in log.handlers: - try: - handler.set_context(ti) - except AttributeError: - # Not all handlers need to have context passed in so we ignore - # the error when handlers do not have set_context defined. - pass + set_context(log, ti) hostname = socket.getfqdn() log.info("Running on host %s", hostname) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f18e2550/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index aef28f1..e7fff31 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -54,7 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor, list_py_file_paths) from airflow.utils.db import provide_session, pessimistic_connection_handling from airflow.utils.email import send_email -from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter +from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter from airflow.utils.state import State Base = models.Base @@ -348,13 +348,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): stdout = StreamLogWriter(log, logging.INFO) stderr = StreamLogWriter(log, logging.WARN) - for handler in log.handlers: - try: - handler.set_context(file_path) - except AttributeError: - # Not all handlers need to have context passed in so we ignore - # the error when handlers do not have set_context defined. 
- pass + set_context(log, file_path) try: # redirect stdout/stderr to log http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f18e2550/airflow/task_runner/base_task_runner.py ---------------------------------------------------------------------- diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py index 6a07db2..ef4112d 100644 --- a/airflow/task_runner/base_task_runner.py +++ b/airflow/task_runner/base_task_runner.py @@ -38,8 +38,8 @@ class BaseTaskRunner(LoggingMixin): :type local_task_job: airflow.jobs.LocalTaskJob """ # Pass task instance context into log handlers to setup the logger. + super(BaseTaskRunner, self).__init__(local_task_job.task_instance) self._task_instance = local_task_job.task_instance - self.set_log_contexts(self._task_instance) popen_prepend = [] cfg_path = None http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f18e2550/airflow/utils/log/logging_mixin.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index c12bb8b..892fae5 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -29,6 +29,9 @@ class LoggingMixin(object): """ Convenience super-class to have a logger configured with the class name """ + def __init__(self, context=None): + if context is not None: + set_context(self.log, context) # We want to deprecate the logger property in Airflow 2.0 # The log property is the de facto standard in most programming languages @@ -53,16 +56,6 @@ class LoggingMixin(object): ) return self._log - def set_log_contexts(self, task_instance): - """ - Set the context for all handlers of current logger. 
- """ - for handler in self.log.handlers: - try: - handler.set_context(task_instance) - except AttributeError: - pass - class StreamLogWriter(object): encoding = False @@ -127,3 +120,22 @@ def redirect_stderr(logger, level): sys.stderr = sys.__stderr__ +def set_context(logger, value): + """ + Walks the tree of loggers and tries to set the context for each handler + :param logger: logger + :param value: value to set + """ + _logger = logger + while _logger: + for handler in _logger.handlers: + try: + handler.set_context(value) + except AttributeError: + # Not all handlers need to have context passed in so we ignore + # the error when handlers do not have set_context defined. + pass + if _logger.propagate is True: + _logger = _logger.parent + else: + _logger = None http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f18e2550/tests/utils/test_logging_mixin.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_logging_mixin.py b/tests/utils/test_logging_mixin.py index 52d8b45..a7c260d 100644 --- a/tests/utils/test_logging_mixin.py +++ b/tests/utils/test_logging_mixin.py @@ -17,7 +17,7 @@ import unittest import warnings from airflow.operators.bash_operator import BashOperator -from airflow.utils.log.logging_mixin import StreamLogWriter +from airflow.utils.log.logging_mixin import set_context, StreamLogWriter from tests.test_utils.reset_warning_registry import reset_warning_registry @@ -48,6 +48,23 @@ class TestLoggingMixin(unittest.TestCase): str(warning.message) ) + def test_set_context(self): + handler1 = mock.MagicMock() + handler2 = mock.MagicMock() + parent = mock.MagicMock() + parent.propagate = False + parent.handlers = [handler1, ] + log = mock.MagicMock() + log.handlers = [handler2, ] + log.parent = parent + log.propagate = True + + value = "test" + set_context(log, value) + + handler1.set_context.assert_called_with(value) + handler2.set_context.assert_called_with(value) + def tearDown(self): 
warnings.resetwarnings()
