This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 281e54b442 fix TestLocalTaskJob tests (#24432)
281e54b442 is described below

commit 281e54b442f0a02bda53ae847aae9f371306f246
Author: Andrey Anshin <[email protected]>
AuthorDate: Wed Jun 22 10:15:49 2022 +0400

    fix TestLocalTaskJob tests (#24432)
---
 tests/jobs/test_local_task_job.py | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py 
b/tests/jobs/test_local_task_job.py
index 72246086a9..362bba1828 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -18,6 +18,7 @@
 #
 import datetime
 import os
+import re
 import signal
 import threading
 import time
@@ -401,8 +402,9 @@ class TestLocalTaskJob:
 
         ti.refresh_from_db()
         assert ti.state == State.FAILED
-        assert "State of this instance has been externally set to failed. "
-        "Terminating instance." in caplog.text
+        assert (
+            "State of this instance has been externally set to failed. 
Terminating instance."
+        ) in caplog.text
 
     def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
         """
@@ -461,8 +463,9 @@ class TestLocalTaskJob:
             lines = f.readlines()
         assert len(lines) == 1  # invoke once
         assert lines[0].startswith(ti.key.primary)
-        this_pid = str(os.getpid())
-        assert f"pid: {this_pid}" not in lines[0]
+        m = re.match(r"^.+pid: (\d+)$", lines[0])
+        assert m, "pid expected in output."
+        assert os.getpid() != int(m.group(1))
 
     def test_mark_success_on_success_callback(self, caplog, get_test_dag):
         """
@@ -487,8 +490,9 @@ class TestLocalTaskJob:
         with timeout(30):
             job.run()  # This should run fast because of the return_code=None
         ti.refresh_from_db()
-        assert "State of this instance has been externally set to success. "
-        "Terminating instance." in caplog.text
+        assert (
+            "State of this instance has been externally set to success. 
Terminating instance." in caplog.text
+        )
 
     @pytest.mark.parametrize("signal_type", [signal.SIGTERM, signal.SIGKILL])
     def test_process_os_signal_calls_on_failure_callback(
@@ -567,11 +571,11 @@ class TestLocalTaskJob:
             assert len(lines) == 1
             assert lines[0].startswith(ti.key.primary)
 
-            this_pid = str(os.getpid())
-            assert f"pid: {this_pid}" not in lines[0]  # ensures callback is 
NOT run by LocalTaskJob
-            assert (
-                str(ti.pid) in lines[0]
-            )  # ensures callback is run by airflow run --raw 
(TaskInstance#_run_raw_task)
+            m = re.match(r"^.+pid: (\d+)$", lines[0])
+            assert m, "pid expected in output."
+            pid = int(m.group(1))
+            assert os.getpid() != pid  # ensures callback is NOT run by 
LocalTaskJob
+            assert ti.pid == pid  # ensures callback is run by airflow run 
--raw (TaskInstance#_run_raw_task)
         elif signal_type == signal.SIGKILL:
             assert (
                 ti.state == State.RUNNING

Reply via email to