This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 281e54b442 fix TestLocalTaskJob tests (#24432)
281e54b442 is described below
commit 281e54b442f0a02bda53ae847aae9f371306f246
Author: Andrey Anshin <[email protected]>
AuthorDate: Wed Jun 22 10:15:49 2022 +0400
fix TestLocalTaskJob tests (#24432)
---
tests/jobs/test_local_task_job.py | 26 +++++++++++++++-----------
1 file changed, 15 insertions(+), 11 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py
b/tests/jobs/test_local_task_job.py
index 72246086a9..362bba1828 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -18,6 +18,7 @@
#
import datetime
import os
+import re
import signal
import threading
import time
@@ -401,8 +402,9 @@ class TestLocalTaskJob:
ti.refresh_from_db()
assert ti.state == State.FAILED
- assert "State of this instance has been externally set to failed. "
- "Terminating instance." in caplog.text
+ assert (
+ "State of this instance has been externally set to failed.
Terminating instance."
+ ) in caplog.text
def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
"""
@@ -461,8 +463,9 @@ class TestLocalTaskJob:
lines = f.readlines()
assert len(lines) == 1 # invoke once
assert lines[0].startswith(ti.key.primary)
- this_pid = str(os.getpid())
- assert f"pid: {this_pid}" not in lines[0]
+ m = re.match(r"^.+pid: (\d+)$", lines[0])
+ assert m, "pid expected in output."
+ assert os.getpid() != int(m.group(1))
def test_mark_success_on_success_callback(self, caplog, get_test_dag):
"""
@@ -487,8 +490,9 @@ class TestLocalTaskJob:
with timeout(30):
job.run() # This should run fast because of the return_code=None
ti.refresh_from_db()
- assert "State of this instance has been externally set to success. "
- "Terminating instance." in caplog.text
+ assert (
+ "State of this instance has been externally set to success.
Terminating instance." in caplog.text
+ )
@pytest.mark.parametrize("signal_type", [signal.SIGTERM, signal.SIGKILL])
def test_process_os_signal_calls_on_failure_callback(
@@ -567,11 +571,11 @@ class TestLocalTaskJob:
assert len(lines) == 1
assert lines[0].startswith(ti.key.primary)
- this_pid = str(os.getpid())
- assert f"pid: {this_pid}" not in lines[0] # ensures callback is
NOT run by LocalTaskJob
- assert (
- str(ti.pid) in lines[0]
- ) # ensures callback is run by airflow run --raw
(TaskInstance#_run_raw_task)
+ m = re.match(r"^.+pid: (\d+)$", lines[0])
+ assert m, "pid expected in output."
+ pid = int(m.group(1))
+ assert os.getpid() != pid # ensures callback is NOT run by
LocalTaskJob
+ assert ti.pid == pid # ensures callback is run by airflow run
--raw (TaskInstance#_run_raw_task)
elif signal_type == signal.SIGKILL:
assert (
ti.state == State.RUNNING