This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9319a31 Fixing failing quarantined test cases in test_task_command (#19864)
9319a31 is described below
commit 9319a31ab11e83fd281b8ed5d8469b038ddad172
Author: Khalid Mammadov <[email protected]>
AuthorDate: Sun Dec 5 21:30:52 2021 +0000
Fixing failing quarantined test cases in test_task_command (#19864)
---
tests/cli/commands/test_task_command.py | 70 +++++++++++----------------------
1 file changed, 23 insertions(+), 47 deletions(-)
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index a840eea..0cc548e 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -379,8 +379,6 @@ class TestCliTasks(unittest.TestCase):
task_command.task_clear(args)
-# For this test memory spins out of control on Python 3.6. TODO(potiuk): FIXME")
-@pytest.mark.quarantined
class TestLogsfromTaskRunCommand(unittest.TestCase):
def setUp(self) -> None:
self.dag_id = "test_logging_dag"
@@ -444,23 +442,10 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
@mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
def test_external_executor_id_present_for_fork_run_task(self,
mock_local_job):
- naive_date = datetime(2016, 1, 1)
- dag_id = 'test_run_fork_has_external_executor_id'
- task0_id = 'test_run_fork_task'
-
- dag = self.dagbag.get_dag(dag_id)
- args_list = [
- 'tasks',
- 'run',
- '--local',
- dag_id,
- task0_id,
- naive_date.isoformat(),
- ]
- args = self.parser.parse_args(args_list)
+ args = self.parser.parse_args(self.task_args)
args.external_executor_id = "ABCD12345"
- task_command.task_run(args, dag=dag)
+ task_command.task_run(args)
mock_local_job.assert_called_once_with(
task_instance=mock.ANY,
mark_success=False,
@@ -475,22 +460,11 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
@mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
def test_external_executor_id_present_for_process_run_task(self,
mock_local_job):
- naive_date = datetime(2016, 1, 1)
- dag_id = 'test_run_process_has_external_executor_id'
- task0_id = 'test_run_process_task'
+ args = self.parser.parse_args(self.task_args)
+ args.external_executor_id = "ABCD12345"
- dag = self.dagbag.get_dag(dag_id)
- args_list = [
- 'tasks',
- 'run',
- '--local',
- dag_id,
- task0_id,
- naive_date.isoformat(),
- ]
- args = self.parser.parse_args(args_list)
with mock.patch.dict(os.environ, {"external_executor_id":
"12345FEDCBA"}):
- task_command.task_run(args, dag=dag)
+ task_command.task_run(args)
mock_local_job.assert_called_once_with(
task_instance=mock.ANY,
mark_success=False,
@@ -522,7 +496,7 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
assert "standard_task_runner.py" in logs
assert (
f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', "
- f"'{self.task_id}', '{self.execution_date_str}'," in logs
+ f"'{self.task_id}', '{self.run_id}'," in logs
)
self.assert_log_line("Log from DAG Logger", logs_list)
@@ -534,6 +508,8 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
f"task_id={self.task_id}, execution_date=20170101T000000" in logs
)
+ # For this test memory spins out of control on Python 3.6. TODO(potiuk): FIXME")
+ @pytest.mark.quarantined
@mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK",
False)
def test_logging_with_run_task_subprocess(self):
# We are not using self.assertLogs as we want to verify what actually
is stored in the Log file
@@ -565,24 +541,23 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
def test_log_file_template_with_run_task(self):
"""Verify that the taskinstance has the right context for
log_filename_template"""
- with mock.patch.object(task_command, "_run_task_by_selected_method"):
- with conf_vars({('core', 'dags_folder'): self.dag_path}):
- # increment the try_number of the task to be run
- with create_session() as session:
- ti =
session.query(TaskInstance).filter_by(run_id=self.run_id)
- ti.try_number = 1
+ with conf_vars({('core', 'dags_folder'): self.dag_path}):
+ # increment the try_number of the task to be run
+ with create_session() as session:
+ ti =
session.query(TaskInstance).filter_by(run_id=self.run_id).first()
+ ti.try_number = 1
- log_file_path =
os.path.join(os.path.dirname(self.ti_log_file_path), "2.log")
+ log_file_path =
os.path.join(os.path.dirname(self.ti_log_file_path), "2.log")
- try:
- task_command.task_run(self.parser.parse_args(self.task_args))
+ try:
+ task_command.task_run(self.parser.parse_args(self.task_args))
- assert os.path.exists(log_file_path)
- finally:
- try:
- os.remove(log_file_path)
- except OSError:
- pass
+ assert os.path.exists(log_file_path)
+ finally:
+ try:
+ os.remove(log_file_path)
+ except OSError:
+ pass
@mock.patch.object(task_command, "_run_task_by_selected_method")
def test_root_logger_restored(self, run_task_mock):
@@ -610,6 +585,7 @@ class TestLogsfromTaskRunCommand(unittest.TestCase):
assert self.root_logger.handlers == self.root_handlers
+ @pytest.mark.quarantined
@mock.patch.object(task_command, "_run_task_by_selected_method")
def test_disable_handler_modifying(self, run_task_mock):
"""If [core] donot_modify_handlers is set to True, the root logger is
untouched"""