This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8adb12651de6955b1a702f04af5578c88e950603 Author: Andrew Godwin <[email protected]> AuthorDate: Tue Apr 6 13:02:11 2021 -0600 Fixed #14270: Add error message in OOM situations (#15207) In the case where a child process is reaped early (before we get to it), the presumption in the code is that it is due to an OOM error, and we set the return code to -9. This adds an error message alongside that return code to make it more obvious. (cherry picked from commit 18e2c1de776c8c3bc42c984ea0d31515788b6572) --- airflow/task/task_runner/standard_task_runner.py | 8 +++ .../task/task_runner/test_standard_task_runner.py | 59 ++++++++++++++++++---- 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 505b225..bb566b2 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -121,3 +121,11 @@ class StandardTaskRunner(BaseTaskRunner): if self._rc is None: # Something else reaped it before we had a chance, so let's just "guess" at an error code. 
self._rc = -9 + + if self._rc == -9: + # If either we or psutil gives out a -9 return code, it likely means + # an OOM happened + self.log.error( + 'Job %s was killed before it finished (likely due to running out of memory)', + self._task_instance.job_id, + ) diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py index fcd4948..6a3ab5d 100644 --- a/tests/task/task_runner/test_standard_task_runner.py +++ b/tests/task/task_runner/test_standard_task_runner.py @@ -19,11 +19,11 @@ import getpass import logging import os import time -import unittest from logging.config import dictConfig from unittest import mock import psutil +import pytest from airflow import models, settings from airflow.jobs.local_task_job import LocalTaskJob @@ -48,22 +48,24 @@ LOGGING_CONFIG = { 'class': 'logging.StreamHandler', 'formatter': 'airflow.task', 'stream': 'ext://sys.stdout', - } + }, }, - 'loggers': {'airflow': {'handlers': ['console'], 'level': 'INFO', 'propagate': False}}, + 'loggers': {'airflow': {'handlers': ['console'], 'level': 'INFO', 'propagate': True}}, } -class TestStandardTaskRunner(unittest.TestCase): - @classmethod - def setUpClass(cls): +class TestStandardTaskRunner: + @pytest.fixture(autouse=True, scope="class") + def logging_and_db(self): + """ + This fixture sets up logging to have a different setup on the way in + (as the test environment does not have enough context for the normal + way to run) and ensures they reset back to normal on the way out. 
+ """ dictConfig(LOGGING_CONFIG) - - @classmethod - def tearDownClass(cls): + yield airflow_logger = logging.getLogger('airflow') airflow_logger.handlers = [] - airflow_logger.propagate = True try: clear_db_runs() except Exception: # noqa pylint: disable=broad-except @@ -131,6 +133,43 @@ class TestStandardTaskRunner(unittest.TestCase): assert runner.return_code() is not None + def test_early_reap_exit(self, caplog): + """ + Tests that when a child process running a task is killed externally + (e.g. by an OOM error, which we fake here), then we get return code + -9 and a log message. + """ + # Set up mock task + local_task_job = mock.Mock() + local_task_job.task_instance = mock.MagicMock() + local_task_job.task_instance.run_as_user = getpass.getuser() + local_task_job.task_instance.command_as_list.return_value = [ + 'airflow', + 'tasks', + 'test', + 'test_on_kill', + 'task1', + '2016-01-01', + ] + + # Kick off the runner + runner = StandardTaskRunner(local_task_job) + runner.start() + time.sleep(0.2) + + # Kill the child process externally from the runner + # Note that we have to do this from ANOTHER process, as if we just + # call os.kill here we're doing it from the parent process and it + # won't be the same as an external kill in terms of OS tracking. + pgid = os.getpgid(runner.process.pid) + os.system(f"kill -s KILL {pgid}") + time.sleep(0.2) + + runner.terminate() + + assert runner.return_code() == -9 + assert "running out of memory" in caplog.text + def test_on_kill(self): """ Test that ensures that clearing in the UI SIGTERMS
