This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b8ce5f0 Revert "Fix intermittent orphan test (#18530)" (#18631)
b8ce5f0 is described below
commit b8ce5f0f8726d25a16ec5d5c95823c7f1489a5af
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Sep 30 11:24:45 2021 +0200
Revert "Fix intermittent orphan test (#18530)" (#18631)
This reverts commit 387c43f625e379a0de8e3527a98833eb5f62d3bf.
---
tests/jobs/test_scheduler_job.py | 43 ++++++++++------------------------------
1 file changed, 10 insertions(+), 33 deletions(-)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 7a75bdd..29da58e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -18,7 +18,6 @@
#
import datetime
-import logging
import os
import shutil
from datetime import timedelta
@@ -34,7 +33,6 @@ from sqlalchemy import func
import airflow.example_dags
import airflow.smart_sensor_dags
from airflow import settings
-from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
@@ -78,8 +76,6 @@ TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
TRY_NUMBER = 1
-log = logging.getLogger(__name__)
-
@pytest.fixture(scope="class")
def disable_load_example():
@@ -169,38 +165,19 @@ class TestSchedulerJob:
self.scheduler_job.heartrate = 0
self.scheduler_job.run()
- @pytest.mark.skipif(
- conf.get('core', 'sql_alchemy_conn').lower().startswith("mssql"),
- reason="MSSQL does not like os._exit()",
- )
- @pytest.mark.skipif(not hasattr(os, 'fork'), reason="Forking not available")
def test_no_orphan_process_will_be_left(self):
empty_dir = mkdtemp()
current_process = psutil.Process()
- pid = os.fork()
- # Running the test in a fork to avoid side effects from other tests - those side-effects migh
- # Cause some processes to be running as children
- if pid == 0:
- old_children = current_process.children(recursive=True)
- self.scheduler_job = SchedulerJob(
- subdir=empty_dir, num_runs=1, executor=MockExecutor(do_update=False)
- )
- self.scheduler_job.run()
- shutil.rmtree(empty_dir)
-
- # Remove potential noise created by previous tests.
- current_children = set(current_process.children(recursive=True)) - set(old_children)
- if current_children:
- log.error(f"Current children: {current_children}")
- # Exit immediately from the fork without cleanup (avoid Pytest atexit)
- os._exit(1)
- # Exit immediately from the fork without cleanup (avoid Pytest atexit)
- os._exit(0)
- else:
- pid, ret_val = os.wait()
- assert (
- not ret_val
- ), "The return value entered from process was non-zero. See error log above for details."
+ old_children = current_process.children(recursive=True)
+ self.scheduler_job = SchedulerJob(
+ subdir=empty_dir, num_runs=1, executor=MockExecutor(do_update=False)
+ )
+ self.scheduler_job.run()
+ shutil.rmtree(empty_dir)
+
+ # Remove potential noise created by previous tests.
+ current_children = set(current_process.children(recursive=True)) - set(old_children)
+ assert not current_children
@mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
@mock.patch('airflow.jobs.scheduler_job.Stats.incr')