This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a9b36d0d33 Increased test coverage of the cli standalone command 
(#62081)
9a9b36d0d33 is described below

commit 9a9b36d0d33d35feb572effd7e84fffbda20e12b
Author: Dominik <[email protected]>
AuthorDate: Mon Apr 6 23:08:45 2026 +0200

    Increased test coverage of the cli standalone command (#62081)
    
    * Increased test coverage of the cli standalone_command
    
    * Added logic to cover missed cases in code
---
 .../unit/cli/commands/test_standalone_command.py   | 273 ++++++++++++++++++++-
 1 file changed, 272 insertions(+), 1 deletion(-)

diff --git a/airflow-core/tests/unit/cli/commands/test_standalone_command.py 
b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
index b233b20d512..993f51ca2d7 100644
--- a/airflow-core/tests/unit/cli/commands/test_standalone_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
@@ -17,12 +17,14 @@
 # under the License.
 from __future__ import annotations
 
+import os
+from collections import deque
 from importlib import reload
 from unittest import mock
 
 import pytest
 
-from airflow.cli.commands.standalone_command import StandaloneCommand
+from airflow.cli.commands.standalone_command import StandaloneCommand, 
SubCommand
 from airflow.executors import executor_loader
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
@@ -48,3 +50,272 @@ class TestStandaloneCommand:
             env = StandaloneCommand().calculate_env()
             # all non local executors will fall back to localesecutor
             assert env["AIRFLOW__CORE__EXECUTOR"] == LOCAL_EXECUTOR
+
+    
@mock.patch("airflow.cli.commands.standalone_command.ExecutorLoader.import_default_executor_cls")
+    @mock.patch("airflow.cli.commands.standalone_command.conf.get")
+    @mock.patch.dict(os.environ, {}, clear=True)
+    def test_calculate_env_force_executor_and_auth(self, mock_conf_get, 
mock_import):
+        class FakeExecutor:
+            is_local = False
+
+        mock_import.return_value = (FakeExecutor, None)
+        mock_conf_get.return_value = "wrong.auth.manager"
+        cmd = StandaloneCommand()
+        env = cmd.calculate_env()
+
+        assert env["AIRFLOW__CORE__EXECUTOR"] == "LocalExecutor"
+        assert "AIRFLOW__CORE__AUTH_MANAGER" in env
+
+    
@mock.patch("airflow.cli.commands.standalone_command.ExecutorLoader.import_default_executor_cls")
+    @mock.patch("airflow.cli.commands.standalone_command.conf.get")
+    @mock.patch.dict(os.environ, {}, clear=True)
+    def test_calculate_env_does_not_override_auth_if_already_set(self, 
mock_conf_get, mock_import):
+        class FakeExecutor:
+            is_local = True
+
+        mock_import.return_value = (FakeExecutor, None)
+        mock_conf_get.return_value = (
+            
"airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager"
+        )
+        cmd = StandaloneCommand()
+        env = cmd.calculate_env()
+
+        assert "AIRFLOW__CORE__AUTH_MANAGER" not in env
+
+    @mock.patch("airflow.cli.commands.standalone_command.os.path.exists", 
return_value=False)
+    @mock.patch("airflow.cli.commands.standalone_command.create_auth_manager")
+    @mock.patch("airflow.cli.commands.standalone_command.conf.get")
+    def test_find_user_info_generates_password(self, mock_conf, mock_auth, 
mock_exists):
+        def conf_side(section, key):
+            if key == "simple_auth_manager_all_admins":
+                return "false"
+            if key == "simple_auth_manager_users":
+                return "admin:admin"
+            return ""
+
+        mock_conf.side_effect = conf_side
+        am = mock.Mock()
+        am.get_generated_password_file.return_value = "/tmp/fake"
+        mock_auth.return_value = am
+        cmd = StandaloneCommand()
+        cmd.find_user_info()
+
+        am.init.assert_called_once()
+
+    @mock.patch("airflow.cli.commands.standalone_command.create_auth_manager")
+    @mock.patch("airflow.cli.commands.standalone_command.conf.get")
+    def test_find_user_info_skips_when_all_admins_true(self, mock_conf_get, 
mock_create_auth):
+        def conf_side(section, key):
+            if key == "simple_auth_manager_all_admins":
+                return "true"
+            return ""
+
+        mock_conf_get.side_effect = conf_side
+        cmd = StandaloneCommand()
+        cmd.find_user_info()
+
+        mock_create_auth.assert_not_called()
+
+    @mock.patch("airflow.cli.commands.standalone_command.create_auth_manager")
+    @mock.patch("airflow.cli.commands.standalone_command.conf.get")
+    def test_find_user_info_skips_when_users_already_configured(self, 
mock_conf_get, mock_create_auth):
+        def conf_side(section, key):
+            if key == "simple_auth_manager_all_admins":
+                return "false"
+            if key == "simple_auth_manager_users":
+                return "custom:password"
+            return ""
+
+        mock_conf_get.side_effect = conf_side
+        cmd = StandaloneCommand()
+        cmd.find_user_info()
+
+        mock_create_auth.assert_not_called()
+
+    @mock.patch("airflow.cli.commands.standalone_command.os.path.exists", 
return_value=True)
+    @mock.patch("airflow.cli.commands.standalone_command.create_auth_manager")
+    @mock.patch("airflow.cli.commands.standalone_command.conf.get")
+    def test_find_user_info_skips_when_password_file_exists(
+        self, mock_conf_get, mock_create_auth, mock_exists
+    ):
+        def conf_side(section, key):
+            if key == "simple_auth_manager_all_admins":
+                return "false"
+            if key == "simple_auth_manager_users":
+                return "admin:admin"
+            return ""
+
+        mock_conf_get.side_effect = conf_side
+        am = mock.Mock()
+        am.get_generated_password_file.return_value = "/tmp/fake"
+        mock_create_auth.return_value = am
+        cmd = StandaloneCommand()
+        cmd.find_user_info()
+
+        am.init.assert_not_called()
+
+    @mock.patch("airflow.cli.commands.standalone_command.db.initdb")
+    def test_initialize_database(self, mock_initdb, monkeypatch):
+        cmd = StandaloneCommand()
+        monkeypatch.setattr(cmd, "print_output", lambda *a: None)
+        cmd.initialize_database()
+
+        mock_initdb.assert_called_once()
+
+    @mock.patch("builtins.print")
+    def test_print_output(self, mock_print):
+        cmd = StandaloneCommand()
+        cmd.print_output("scheduler", "hello\nworld")
+
+        assert mock_print.call_count == 2
+
+    @mock.patch(
+        "airflow.cli.commands.standalone_command.most_recent_job",
+        return_value=None,
+    )
+    def test_job_running_returns_false_if_no_recent_job(self, 
mock_most_recent_job):
+        result = 
StandaloneCommand().job_running(mock.Mock(job_type="scheduler"))
+        assert result is False
+
+    @mock.patch("airflow.cli.commands.standalone_command.most_recent_job")
+    def test_job_running_returns_false_if_job_not_alive(self, 
mock_most_recent_job):
+        fake_job = mock.Mock()
+        fake_job.is_alive.return_value = False
+        mock_most_recent_job.return_value = fake_job
+
+        result = 
StandaloneCommand().job_running(mock.Mock(job_type="scheduler"))
+        assert result is False
+
+    @mock.patch("airflow.cli.commands.standalone_command.most_recent_job")
+    def test_job_running_returns_true_if_alive(self, mock_most_recent_job):
+        fake_job = mock.Mock()
+        fake_job.is_alive.return_value = True
+        mock_most_recent_job.return_value = fake_job
+
+        result = 
StandaloneCommand().job_running(mock.Mock(job_type="scheduler"))
+        assert result is True
+
+    def test_is_ready_true_when_all_components_running(self, monkeypatch):
+        cmd = StandaloneCommand()
+        monkeypatch.setattr(cmd, "job_running", lambda *_: True)
+
+        assert cmd.is_ready() is True
+
+    def test_is_ready_false_when_any_component_missing(self, monkeypatch):
+        cmd = StandaloneCommand()
+        calls = iter([True, False, True])
+        monkeypatch.setattr(cmd, "job_running", lambda *_: next(calls))
+
+        assert cmd.is_ready() is False
+
+    @pytest.mark.parametrize("exc", [OSError, ValueError])
+    def test_port_open_returns_false_on_errors(self, monkeypatch, exc):
+        cmd = StandaloneCommand()
+
+        class FakeSocket:
+            def settimeout(self, *_):
+                pass
+
+            def connect(self, *_):
+                raise exc()
+
+            def close(self):
+                pass
+
+        monkeypatch.setattr(
+            "airflow.cli.commands.standalone_command.socket.socket",
+            lambda *a, **k: FakeSocket(),
+        )
+        assert cmd.port_open(1234) is False
+
+    def test_port_open_returns_true_when_connect_succeeds(self, monkeypatch):
+        cmd = StandaloneCommand()
+
+        class FakeSocket:
+            def settimeout(self, *_):
+                pass
+
+            def connect(self, *_):
+                pass
+
+            def close(self):
+                pass
+
+        monkeypatch.setattr(
+            "airflow.cli.commands.standalone_command.socket.socket",
+            lambda *a, **k: FakeSocket(),
+        )
+
+        assert cmd.port_open(1234) is True
+
+    def test_update_output_drains_queue_and_prints(self, monkeypatch):
+        cmd = StandaloneCommand()
+        printed = deque()
+        monkeypatch.setattr(cmd, "print_output", lambda name, msg: 
printed.append((name, msg)))
+        cmd.output_queue.append(("scheduler", b"hello\n"))
+        cmd.output_queue.append(("api-server", b"world\n"))
+        cmd.update_output()
+
+        assert printed == deque(
+            [
+                ("scheduler", "hello"),
+                ("api-server", "world"),
+            ]
+        )
+        assert len(cmd.output_queue) == 0
+
+    def test_print_error_calls_print_output(self, monkeypatch):
+        cmd = StandaloneCommand()
+        called = []
+        monkeypatch.setattr(cmd, "print_output", lambda name, msg: 
called.append((name, msg)))
+        cmd.print_error("scheduler", "boom")
+
+        assert called
+
+    def test_subcommand_run_appends_output(self, monkeypatch):
+        parent = mock.Mock()
+        parent.output_queue = deque()
+        fake_process = mock.Mock()
+        fake_process.stdout = [b"line1\n", b"line2\n"]
+        monkeypatch.setattr(
+            "airflow.cli.commands.standalone_command.subprocess.Popen",
+            lambda *a, **k: fake_process,
+        )
+        cmd = SubCommand(parent, "scheduler", ["scheduler"], {})
+        cmd.run()
+
+        assert parent.output_queue == deque(
+            [
+                ("scheduler", b"line1\n"),
+                ("scheduler", b"line2\n"),
+            ]
+        )
+
+    def test_subcommand_stop_calls_terminate(self, monkeypatch):
+        parent = mock.Mock()
+        fake_process = mock.Mock()
+        monkeypatch.setattr(
+            "airflow.cli.commands.standalone_command.subprocess.Popen",
+            lambda *a, **k: fake_process,
+        )
+        cmd = SubCommand(parent, "scheduler", ["scheduler"], {})
+        cmd.process = fake_process
+        cmd.stop()
+
+        fake_process.terminate.assert_called_once()
+
+    
@mock.patch("airflow.cli.commands.standalone_command.ExecutorLoader.import_default_executor_cls")
+    @mock.patch("airflow.cli.commands.standalone_command.conf.get")
+    @mock.patch.dict(os.environ, {}, clear=True)
+    def test_calculate_env_does_not_force_executor_when_already_local(self, 
mock_conf_get, mock_import):
+        class LocalExecutor:
+            is_local = True
+
+        mock_import.return_value = (LocalExecutor, None)
+        mock_conf_get.return_value = (
+            
"airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager"
+        )
+        cmd = StandaloneCommand()
+        env = cmd.calculate_env()
+
+        assert "AIRFLOW__CORE__EXECUTOR" not in env

Reply via email to