This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e7dedbe95a Fix class instance vs. class type in validate_database_executor_compatibility() call (#40626)
e7dedbe95a is described below

commit e7dedbe95af6cb0fd408ce6cea2707dbef1d2682
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Jul 7 13:23:43 2024 +0200

    Fix class instance vs. class type in validate_database_executor_compatibility() call (#40626)
    
    * Fix class instance vs. class type in check call
    
    * Fix class instance vs. class type in check call, adjust pytests
---
 airflow/cli/commands/scheduler_command.py    |  2 +-
 tests/cli/commands/test_scheduler_command.py | 48 +++++++++++++++++++++++++---
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 2b7c77fda9..2f97d3cae3 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -43,7 +43,7 @@ def _run_scheduler_job(args) -> None:
     job_runner = SchedulerJobRunner(
         job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, 
do_pickle=args.do_pickle
     )
-    
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
+    
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
     InternalApiConfig.force_database_direct_access()
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
     with _serve_logs(args.skip_serve_logs), 
_serve_health_check(enable_health_check):
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index b6d6a9d921..5d3dd54e33 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -49,12 +49,17 @@ class TestSchedulerCommand:
             ("LocalKubernetesExecutor", True),
         ],
     )
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
     def test_serve_logs_on_scheduler(
         self,
         mock_process,
         mock_scheduler_job,
+        mock_validate,
         executor,
         expect_serve_logs,
     ):
@@ -70,10 +75,14 @@ class TestSchedulerCommand:
                 with pytest.raises(AssertionError):
                     
mock_process.assert_has_calls([mock.call(target=serve_logs)])
 
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
     @pytest.mark.parametrize("executor", ["LocalExecutor", 
"SequentialExecutor"])
-    def test_skip_serve_logs(self, mock_process, mock_scheduler_job, executor):
+    def test_skip_serve_logs(self, mock_process, mock_scheduler_job, 
mock_validate, executor):
         mock_scheduler_job.return_value.job_type = "SchedulerJob"
         args = self.parser.parse_args(["scheduler", "--skip-serve-logs"])
         with conf_vars({("core", "executor"): executor}):
@@ -82,11 +91,17 @@ class TestSchedulerCommand:
             with pytest.raises(AssertionError):
                 mock_process.assert_has_calls([mock.call(target=serve_logs)])
 
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.utils.db.check_and_run_migrations")
     @mock.patch("airflow.utils.db.synchronize_log_template")
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
-    def test_check_migrations_is_false(self, mock_process, mock_scheduler_job, 
mock_log, mock_run_migration):
+    def test_check_migrations_is_false(
+        self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, 
mock_validate
+    ):
         mock_scheduler_job.return_value.job_type = "SchedulerJob"
         args = self.parser.parse_args(["scheduler"])
         with conf_vars({("database", "check_migrations"): "False"}):
@@ -94,11 +109,17 @@ class TestSchedulerCommand:
             mock_run_migration.assert_not_called()
             mock_log.assert_called_once()
 
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.utils.db.check_and_run_migrations")
     @mock.patch("airflow.utils.db.synchronize_log_template")
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
-    def test_check_migrations_is_true(self, mock_process, mock_scheduler_job, 
mock_log, mock_run_migration):
+    def test_check_migrations_is_true(
+        self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, 
mock_validate
+    ):
         mock_scheduler_job.return_value.job_type = "SchedulerJob"
         args = self.parser.parse_args(["scheduler"])
         with conf_vars({("database", "check_migrations"): "True"}):
@@ -106,10 +127,14 @@ class TestSchedulerCommand:
             mock_run_migration.assert_called_once()
             mock_log.assert_called_once()
 
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
     @pytest.mark.parametrize("executor", ["LocalExecutor", 
"SequentialExecutor"])
-    def test_graceful_shutdown(self, mock_process, mock_scheduler_job, 
executor):
+    def test_graceful_shutdown(self, mock_process, mock_scheduler_job, 
mock_validate, executor):
         mock_scheduler_job.return_value.job_type = "SchedulerJob"
         args = self.parser.parse_args(["scheduler"])
         with conf_vars({("core", "executor"): executor}):
@@ -120,12 +145,17 @@ class TestSchedulerCommand:
             finally:
                 mock_process().terminate.assert_called()
 
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
     def test_enable_scheduler_health(
         self,
         mock_process,
         mock_scheduler_job,
+        mock_validate,
     ):
         with conf_vars({("scheduler", "enable_health_check"): "True"}):
             mock_scheduler_job.return_value.job_type = "SchedulerJob"
@@ -133,12 +163,17 @@ class TestSchedulerCommand:
             scheduler_command.scheduler(args)
             
mock_process.assert_has_calls([mock.call(target=serve_health_check)])
 
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
     def test_disable_scheduler_health(
         self,
         mock_process,
         mock_scheduler_job,
+        mock_validate,
     ):
         mock_scheduler_job.return_value.job_type = "SchedulerJob"
         args = self.parser.parse_args(["scheduler"])
@@ -162,6 +197,10 @@ class TestSchedulerCommand:
             serve_health_check()
         assert http_server_mock.call_args.args[0] == (health_check_host, 
health_check_port)
 
+    @mock.patch(
+        
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
+        side_effect=None,
+    )
     @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
     @mock.patch("airflow.cli.commands.scheduler_command.Process")
     @mock.patch("airflow.cli.commands.scheduler_command.run_job", 
side_effect=Exception("run_job failed"))
@@ -170,6 +209,7 @@ class TestSchedulerCommand:
         mock_run_job,
         mock_process,
         mock_scheduler_job,
+        mock_validate,
     ):
         args = self.parser.parse_args(["scheduler"])
         with pytest.raises(Exception, match="run_job failed"):

Reply via email to