This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e7dedbe95a Fix class instance vs. class type in validate_database_executor_compatibility() call (#40626)
e7dedbe95a is described below
commit e7dedbe95af6cb0fd408ce6cea2707dbef1d2682
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Jul 7 13:23:43 2024 +0200
Fix class instance vs. class type in validate_database_executor_compatibility() call (#40626)
* Fix class instance vs. class type in check call
* Fix class instance vs. class type in check call, adjust pytests
---
airflow/cli/commands/scheduler_command.py | 2 +-
tests/cli/commands/test_scheduler_command.py | 48 +++++++++++++++++++++++++---
2 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 2b7c77fda9..2f97d3cae3 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -43,7 +43,7 @@ def _run_scheduler_job(args) -> None:
job_runner = SchedulerJobRunner(
job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
)
- ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
+ ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
InternalApiConfig.force_database_direct_access()
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index b6d6a9d921..5d3dd54e33 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -49,12 +49,17 @@ class TestSchedulerCommand:
("LocalKubernetesExecutor", True),
],
)
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_serve_logs_on_scheduler(
self,
mock_process,
mock_scheduler_job,
+ mock_validate,
executor,
expect_serve_logs,
):
@@ -70,10 +75,14 @@ class TestSchedulerCommand:
with pytest.raises(AssertionError):
mock_process.assert_has_calls([mock.call(target=serve_logs)])
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"])
- def test_skip_serve_logs(self, mock_process, mock_scheduler_job, executor):
+ def test_skip_serve_logs(self, mock_process, mock_scheduler_job, mock_validate, executor):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler", "--skip-serve-logs"])
with conf_vars({("core", "executor"): executor}):
@@ -82,11 +91,17 @@ class TestSchedulerCommand:
with pytest.raises(AssertionError):
mock_process.assert_has_calls([mock.call(target=serve_logs)])
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.utils.db.check_and_run_migrations")
@mock.patch("airflow.utils.db.synchronize_log_template")
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
- def test_check_migrations_is_false(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration):
+ def test_check_migrations_is_false(
+ self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate
+ ):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
with conf_vars({("database", "check_migrations"): "False"}):
@@ -94,11 +109,17 @@ class TestSchedulerCommand:
mock_run_migration.assert_not_called()
mock_log.assert_called_once()
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.utils.db.check_and_run_migrations")
@mock.patch("airflow.utils.db.synchronize_log_template")
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
- def test_check_migrations_is_true(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration):
+ def test_check_migrations_is_true(
+ self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate
+ ):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
with conf_vars({("database", "check_migrations"): "True"}):
@@ -106,10 +127,14 @@ class TestSchedulerCommand:
mock_run_migration.assert_called_once()
mock_log.assert_called_once()
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"])
- def test_graceful_shutdown(self, mock_process, mock_scheduler_job, executor):
+ def test_graceful_shutdown(self, mock_process, mock_scheduler_job, mock_validate, executor):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
with conf_vars({("core", "executor"): executor}):
@@ -120,12 +145,17 @@ class TestSchedulerCommand:
finally:
mock_process().terminate.assert_called()
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_enable_scheduler_health(
self,
mock_process,
mock_scheduler_job,
+ mock_validate,
):
with conf_vars({("scheduler", "enable_health_check"): "True"}):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
@@ -133,12 +163,17 @@ class TestSchedulerCommand:
scheduler_command.scheduler(args)
mock_process.assert_has_calls([mock.call(target=serve_health_check)])
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_disable_scheduler_health(
self,
mock_process,
mock_scheduler_job,
+ mock_validate,
):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
@@ -162,6 +197,10 @@ class TestSchedulerCommand:
serve_health_check()
assert http_server_mock.call_args.args[0] == (health_check_host, health_check_port)
+ @mock.patch(
+ "airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+ side_effect=None,
+ )
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@mock.patch("airflow.cli.commands.scheduler_command.run_job", side_effect=Exception("run_job failed"))
@@ -170,6 +209,7 @@ class TestSchedulerCommand:
mock_run_job,
mock_process,
mock_scheduler_job,
+ mock_validate,
):
args = self.parser.parse_args(["scheduler"])
with pytest.raises(Exception, match="run_job failed"):