This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 845bcbefa57b6ed12be337128073e1d766910030
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jun 25 13:32:12 2020 +0100

    Fix logging issue when running tasks (#9363)

    (cherry picked from commit 61f4e9e82a0ac0680971eb6298f089a16c32ceb9)
---
 airflow/bin/cli.py                               | 31 +++++++-
 airflow/settings.py                              |  7 ++
 airflow/task/task_runner/standard_task_runner.py |  6 +-
 tests/cli/test_cli.py                            | 93 ++++++++++++++++++++++++
 tests/dags/test_logging_in_dag.py                | 44 +++++++++++
 5 files changed, 178 insertions(+), 3 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e70ccdd..b3ffea7 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -555,8 +555,35 @@ def run(args, dag=None):
     if args.interactive:
         _run(args, dag, ti)
     else:
-        with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
-            _run(args, dag, ti)
+        if settings.DONOT_MODIFY_HANDLERS:
+            with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
+                _run(args, dag, ti)
+        else:
+            # Get all the handlers from the 'airflow.task' logger and add them to the
+            # root logger, so that we can get logs from any custom loggers
+            # defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            root_logger_handlers = root_logger.handlers[:]  # copy: removeHandler mutates the live list
+
+            # Remove all handlers from the root logger to avoid duplicate logs
+            for handler in root_logger_handlers:
+                root_logger.removeHandler(handler)
+
+            for handler in airflow_logger_handlers:
+                root_logger.addHandler(handler)
+            root_logger.setLevel(logging.getLogger('airflow.task').level)
+
+            with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
+                _run(args, dag, ti)
+
+            # We need to restore the handlers to the loggers, as the celery worker process
+            # can call this command multiple times; if we don't reset them,
+            # logs from the next task would go to the wrong place
+            for handler in airflow_logger_handlers:
+                root_logger.removeHandler(handler)
+            for handler in root_logger_handlers:
+                root_logger.addHandler(handler)

     logging.shutdown()

diff --git a/airflow/settings.py b/airflow/settings.py
index bf2fb1d..513f192 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -400,3 +400,10 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
 # write rate.
 MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
     'core', 'min_serialized_dag_update_interval', fallback=30)
+
+# If donot_modify_handlers=True, we do not modify logging handlers in the task_run command.
+# If the flag is set to False, we remove all handlers from the root logger
+# and add all handlers from the 'airflow.task' logger to the root logger. This is done
+# to capture all the logs from print and log statements in DAG files before a task is run.
+# The handlers are restored after the task completes execution.
+DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)

diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 4557af2..8138cfa 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
+"""Standard task runner""" import os import psutil @@ -25,7 +26,7 @@ from setproctitle import setproctitle from airflow.task.task_runner.base_task_runner import BaseTaskRunner from airflow.utils.helpers import reap_process_group -CAN_FORK = hasattr(os, 'fork') +CAN_FORK = hasattr(os, "fork") class StandardTaskRunner(BaseTaskRunner): @@ -73,6 +74,9 @@ class StandardTaskRunner(BaseTaskRunner): # [1:] - remove "airflow" from the start of the command args = parser.parse_args(self._command[1:]) + self.log.info('Running: %s', self._command) + self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id) + proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}" if hasattr(args, "job_id"): proc_title += " {0.job_id}" diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index e7523dc..a2a81ac 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -22,6 +22,7 @@ import io import logging import os +from airflow.configuration import conf from parameterized import parameterized from six import StringIO, PY2 import sys @@ -530,6 +531,98 @@ class TestCLI(unittest.TestCase): ) +class TestLogsfromTaskRunCommand(unittest.TestCase): + + def setUp(self): + self.dag_id = "test_logging_dag" + self.task_id = "test_task" + reset(self.dag_id) + self.execution_date_str = timezone.make_aware(datetime(2017, 1, 1)).isoformat() + self.log_dir = conf.get('core', 'base_log_folder') + self.log_filename = "{}/{}/{}/1.log".format(self.dag_id, self.task_id, self.execution_date_str) + self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename) + self.parser = cli.CLIFactory.get_parser() + try: + os.remove(self.ti_log_file_path) + except OSError: + pass + + def tearDown(self): + reset(self.dag_id) + try: + os.remove(self.ti_log_file_path) + except OSError: + pass + + def assert_log_line(self, text, logs_list, expect_from_logging_mixin=False): + """ + Get Log Line and assert only 1 Entry exists with the given text. 
+        Also check that the "logging_mixin" line does not appear in that log line,
+        to avoid duplicate logging as below:
+        [2020-06-24 16:47:23,537] {logging_mixin.py:91} INFO - [2020-06-24 16:47:23,536] {python.py:135}
+        """
+        log_lines = [log for log in logs_list if text in log]
+        self.assertEqual(len(log_lines), 1)
+        log_line = log_lines[0]
+        if not expect_from_logging_mixin:
+            # Logs from print statements still show with logging_mixin as the filename
+            # Example: [2020-06-24 17:07:00,482] {logging_mixin.py:91} INFO - Log from Print statement
+            self.assertNotIn("logging_mixin.py", log_line)
+        return log_line
+
+    @unittest.skipIf(not hasattr(os, 'fork'), "Forking not available")
+    def test_logging_with_run_task(self):
+        # We are not using self.assertLogs as we want to verify what is actually stored
+        # in the log file, as that is what gets displayed
+
+        with conf_vars({('core', 'dags_folder'): os.path.join(TEST_DAG_FOLDER, self.dag_id)}):
+            cli.run(self.parser.parse_args([
+                'run', self.dag_id, self.task_id, '--local', self.execution_date_str]))
+
+        with open(self.ti_log_file_path) as l_file:
+            logs = l_file.read()
+
+        print(logs)  # In case of a test failure, this line shows the detailed log
+        logs_list = logs.splitlines()
+
+        self.assertIn("INFO - Started process", logs)
+        self.assertIn("Subtask {}".format(self.task_id), logs)
+        self.assertIn("standard_task_runner.py", logs)
+        self.assertIn("INFO - Running: ['airflow', 'run', '{}', "
+                      "'{}', '{}',".format(self.dag_id, self.task_id, self.execution_date_str), logs)
+
+        self.assert_log_line("Log from DAG Logger", logs_list)
+        self.assert_log_line("Log from TI Logger", logs_list)
+        self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
+
+        self.assertIn("INFO - Marking task as SUCCESS.dag_id={}, task_id={}, "
+                      "execution_date=20170101T000000".format(self.dag_id, self.task_id), logs)
+
+    @mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False)
+    def test_logging_with_run_task_subprocess(self):
+        # We are not using self.assertLogs as we want to verify what is actually stored
+        # in the log file, as that is what gets displayed
+        with conf_vars({('core', 'dags_folder'): os.path.join(TEST_DAG_FOLDER, self.dag_id)}):
+            cli.run(self.parser.parse_args([
+                'run', self.dag_id, self.task_id, '--local', self.execution_date_str]))
+
+        with open(self.ti_log_file_path) as l_file:
+            logs = l_file.read()
+
+        print(logs)  # In case of a test failure, this line shows the detailed log
+        logs_list = logs.splitlines()
+
+        self.assertIn("Subtask {}".format(self.task_id), logs)
+        self.assertIn("base_task_runner.py", logs)
+        self.assert_log_line("Log from DAG Logger", logs_list)
+        self.assert_log_line("Log from TI Logger", logs_list)
+        self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
+
+        self.assertIn("INFO - Running: ['airflow', 'run', '{}', "
+                      "'{}', '{}',".format(self.dag_id, self.task_id, self.execution_date_str), logs)
+        self.assertIn("INFO - Marking task as SUCCESS.dag_id={}, task_id={}, "
+                      "execution_date=20170101T000000".format(self.dag_id, self.task_id), logs)
+
+
 @pytest.mark.integration("redis")
 @pytest.mark.integration("rabbitmq")
 class TestWorkerServeLogs(unittest.TestCase):

diff --git a/tests/dags/test_logging_in_dag.py b/tests/dags/test_logging_in_dag.py
new file mode 100644
index 0000000..0659562
--- /dev/null
+++ b/tests/dags/test_logging_in_dag.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from airflow.models import DAG
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.timezone import datetime
+
+logger = logging.getLogger(__name__)
+
+
+def test_logging_fn(**kwargs):
+    logger.info("Log from DAG Logger")
+    kwargs["ti"].log.info("Log from TI Logger")
+    print("Log from Print statement")
+
+
+dag = DAG(
+    dag_id='test_logging_dag',
+    schedule_interval=None,
+    start_date=datetime(2016, 1, 1)
+)
+
+PythonOperator(
+    task_id='test_task',
+    python_callable=test_logging_fn,
+    provide_context=True,
+    dag=dag,
+)
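
Side note for anyone tracing the handler juggling in cli.run() above: the same swap-and-restore
pattern can be exercised outside Airflow with nothing but the standard library. The sketch below
is illustrative, not part of the patch -- run_with_task_handlers and the task_fn callback are
made-up names for this example -- and it wraps the restore in try/finally, a slight hardening
over the patch, which restores the handlers only on the success path.

    import logging
    import sys


    def run_with_task_handlers(task_fn):
        """Route root-logger records through the 'airflow.task' handlers while task_fn runs."""
        task_logger = logging.getLogger('airflow.task')
        root_logger = logging.getLogger()

        # Copy both handler lists up front: addHandler/removeHandler mutate the live
        # logger.handlers lists, so iterating over them directly would skip entries.
        saved_handlers = root_logger.handlers[:]
        saved_level = root_logger.level
        task_handlers = task_logger.handlers[:]

        # Swap in the task handlers so records from any logger reach the task log.
        for handler in saved_handlers:
            root_logger.removeHandler(handler)
        for handler in task_handlers:
            root_logger.addHandler(handler)
        root_logger.setLevel(task_logger.level)

        try:
            task_fn()
        finally:
            # Restore the original root configuration so a long-lived worker
            # process does not route the next task's logs to the wrong place.
            for handler in task_handlers:
                root_logger.removeHandler(handler)
            for handler in saved_handlers:
                root_logger.addHandler(handler)
            root_logger.setLevel(saved_level)


    if __name__ == '__main__':
        # Demo: a handler attached to 'airflow.task' receives a record emitted via an
        # unrelated custom logger, because the swap placed it on the root logger.
        logging.getLogger('airflow.task').addHandler(logging.StreamHandler(sys.stdout))
        logging.getLogger('airflow.task').setLevel(logging.INFO)
        run_with_task_handlers(lambda: logging.getLogger('my.custom.dag.logger').info('hello'))

The list copies matter for the same reason the patch benefits from one in cli.py: removeHandler
mutates root_logger.handlers while the loop iterates over it. Deployments that do not want the
task_run command to touch handlers at all can opt out via the new setting the patch reads from
airflow.cfg, i.e. donot_modify_handlers = True under the [logging] section (default False).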
