This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 20ac7c196e9e8ba420fd89ce23c02c6ab6bdb491
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Dec 5 20:06:26 2021 +0100

    Fix flaky on_kill (#20054)
    
    The previous fix in #20040 improved forked tests but also caused
    instability in the "on_kill" test for standard task runner.
    
    This PR fixes the instability by signalling when the task started
    rather than waiting for a fixed amount of time, and it adds better
    diagnostics for the test.
    
    (cherry picked from commit e2345ffca9013de8dedaa6c75dbecb48c073353f)
---
 tests/dags/test_on_kill.py                         | 10 ++++-
 .../task/task_runner/test_standard_task_runner.py  | 44 ++++++++++++++--------
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/tests/dags/test_on_kill.py b/tests/dags/test_on_kill.py
index 3e53df6..5b6a4e3 100644
--- a/tests/dags/test_on_kill.py
+++ b/tests/dags/test_on_kill.py
@@ -26,6 +26,12 @@ class DummyWithOnKill(DummyOperator):
     def execute(self, context):
         import os
 
+        self.log.info("Signalling that I am running")
+        # signal to the test that we've started
+        with open("/tmp/airflow_on_kill_running", "w") as f:
+            f.write("ON_KILL_RUNNING")
+        self.log.info("Signalled")
+
         # This runs extra processes, so that we can be sure that we correctly
         # tidy up all processes launched by a task when killing
         if not os.fork():
@@ -34,11 +40,13 @@ class DummyWithOnKill(DummyOperator):
 
     def on_kill(self):
         self.log.info("Executing on_kill")
-        with open("/tmp/airflow_on_kill", "w") as f:
+        with open("/tmp/airflow_on_kill_killed", "w") as f:
             f.write("ON_KILL_TEST")
+        self.log.info("Executed on_kill")
 
 
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
 dag1 = DAG(dag_id='test_on_kill', start_date=datetime(2015, 1, 1))
+
 dag1_task1 = DummyWithOnKill(task_id='task1', dag=dag1, owner='airflow')
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index 4be1d66..abd9f71 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -177,9 +177,14 @@ class TestStandardTaskRunner:
         Test that ensures that clearing in the UI SIGTERMS
         the task
         """
-        path = "/tmp/airflow_on_kill"
+        path_on_kill_running = "/tmp/airflow_on_kill_running"
+        path_on_kill_killed = "/tmp/airflow_on_kill_killed"
         try:
-            os.unlink(path)
+            os.unlink(path_on_kill_running)
+        except OSError:
+            pass
+        try:
+            os.unlink(path_on_kill_killed)
         except OSError:
             pass
 
@@ -205,27 +210,36 @@ class TestStandardTaskRunner:
             runner = StandardTaskRunner(job1)
             runner.start()
 
-            # give the task some time to startup
+            with timeout(seconds=3):
+                while True:
+                    runner_pgid = os.getpgid(runner.process.pid)
+                    if runner_pgid == runner.process.pid:
+                        break
+                    time.sleep(0.01)
+
+            processes = list(self._procs_in_pgroup(runner_pgid))
+
+            logging.info("Waiting for the task to start")
+            with timeout(seconds=4):
+                while True:
+                    if os.path.exists(path_on_kill_running):
+                        break
+                    time.sleep(0.01)
+            logging.info("Task started. Give the task some time to settle")
             time.sleep(3)
-
-            pgid = os.getpgid(runner.process.pid)
-            assert pgid > 0
-            assert pgid != os.getpgid(0), "Task should be in a different process group to us"
-
-            processes = list(self._procs_in_pgroup(pgid))
-
+            logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group")
             runner.terminate()
-
             session.close()  # explicitly close as `create_session`s commit will blow up otherwise
 
-        # Wait some time for the result
-        with timeout(seconds=40):
+        logging.info("Waiting for the on kill killed file to appear")
+        with timeout(seconds=4):
             while True:
-                if os.path.exists(path):
+                if os.path.exists(path_on_kill_killed):
                     break
                 time.sleep(0.01)
+        logging.info("The file appeared")
 
-        with open(path) as f:
+        with open(path_on_kill_killed) as f:
             assert "ON_KILL_TEST" == f.readline()
 
         for process in processes:

Reply via email to