This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 20ac7c196e9e8ba420fd89ce23c02c6ab6bdb491
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Dec 5 20:06:26 2021 +0100

    Fix flaky on_kill (#20054)

    The previous fix in #20040 improved forked tests but also caused
    instability in the "on_kill" test for standard task runner.

    This PR fixes the instability by signalling when the task started
    rather than waiting for fixed amount of time and it adds better
    diagnostics for the test.

    (cherry picked from commit e2345ffca9013de8dedaa6c75dbecb48c073353f)
---
 tests/dags/test_on_kill.py                         | 10 ++++-
 .../task/task_runner/test_standard_task_runner.py  | 44 ++++++++++++++--------
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/tests/dags/test_on_kill.py b/tests/dags/test_on_kill.py
index 3e53df6..5b6a4e3 100644
--- a/tests/dags/test_on_kill.py
+++ b/tests/dags/test_on_kill.py
@@ -26,6 +26,12 @@ class DummyWithOnKill(DummyOperator):
     def execute(self, context):
         import os
 
+        self.log.info("Signalling that I am running")
+        # signal to the test that we've started
+        with open("/tmp/airflow_on_kill_running", "w") as f:
+            f.write("ON_KILL_RUNNING")
+        self.log.info("Signalled")
+
         # This runs extra processes, so that we can be sure that we correctly
         # tidy up all processes launched by a task when killing
         if not os.fork():
@@ -34,11 +40,13 @@ class DummyWithOnKill(DummyOperator):
 
     def on_kill(self):
         self.log.info("Executing on_kill")
-        with open("/tmp/airflow_on_kill", "w") as f:
+        with open("/tmp/airflow_on_kill_killed", "w") as f:
             f.write("ON_KILL_TEST")
+        self.log.info("Executed on_kill")
 
 
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
 dag1 = DAG(dag_id='test_on_kill', start_date=datetime(2015, 1, 1))
+
 dag1_task1 = DummyWithOnKill(task_id='task1', dag=dag1, owner='airflow')
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index 4be1d66..abd9f71 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -177,9 +177,14 @@ class TestStandardTaskRunner:
         Test that ensures that clearing in the UI SIGTERMS
         the task
         """
-        path = "/tmp/airflow_on_kill"
+        path_on_kill_running = "/tmp/airflow_on_kill_running"
+        path_on_kill_killed = "/tmp/airflow_on_kill_killed"
         try:
-            os.unlink(path)
+            os.unlink(path_on_kill_running)
+        except OSError:
+            pass
+        try:
+            os.unlink(path_on_kill_killed)
         except OSError:
             pass
 
@@ -205,27 +210,36 @@ class TestStandardTaskRunner:
         runner = StandardTaskRunner(job1)
         runner.start()
 
-        # give the task some time to startup
+        with timeout(seconds=3):
+            while True:
+                runner_pgid = os.getpgid(runner.process.pid)
+                if runner_pgid == runner.process.pid:
+                    break
+                time.sleep(0.01)
+
+        processes = list(self._procs_in_pgroup(runner_pgid))
+
+        logging.info("Waiting for the task to start")
+        with timeout(seconds=4):
+            while True:
+                if os.path.exists(path_on_kill_running):
+                    break
+                time.sleep(0.01)
+        logging.info("Task started. Give the task some time to settle")
         time.sleep(3)
-
-        pgid = os.getpgid(runner.process.pid)
-        assert pgid > 0
-        assert pgid != os.getpgid(0), "Task should be in a different process group to us"
-
-        processes = list(self._procs_in_pgroup(pgid))
-
+        logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group")
         runner.terminate()
-
         session.close()  # explicitly close as `create_session`s commit will blow up otherwise
 
-        # Wait some time for the result
-        with timeout(seconds=40):
+        logging.info("Waiting for the on kill killed file to appear")
+        with timeout(seconds=4):
             while True:
-                if os.path.exists(path):
+                if os.path.exists(path_on_kill_killed):
                     break
                 time.sleep(0.01)
+        logging.info("The file appeared")
 
-        with open(path) as f:
+        with open(path_on_kill_killed) as f:
             assert "ON_KILL_TEST" == f.readline()
 
         for process in processes:
