This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cd27389b12f Make Airflow CTL tests more like pytests (#60020)
cd27389b12f is described below
commit cd27389b12fb78b83fa7a45ec8580d8f42c95918
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Jan 2 10:07:18 2026 +0100
Make Airflow CTL tests more like pytests (#60020)
* Make Airflow CTL tests more like pytests
* Turn off console printing in e2e and sdk tests as well
* Fix static checks check
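In short: the single test function that looped over every CLI command is split
into one parametrized pytest case per command. A minimal sketch of the new
shape (simplified from the diff below, not the literal code; assumes airflowctl
is installed and the docker-compose API server is up):

    import subprocess

    import pytest

    COMMANDS = ["assets list", "dags list", "version --remote"]  # excerpt of TEST_COMMANDS

    # One pytest case per command; ids are the first two words of each
    # command, mirroring the ids= expression in the diff below.
    @pytest.mark.parametrize(
        "command", COMMANDS, ids=[" ".join(c.split(" ", 2)[:2]) for c in COMMANDS]
    )
    def test_airflowctl_command(command: str) -> None:
        # A failing command now fails only its own case instead of
        # aborting a shared for-loop over all commands.
        result = subprocess.run(
            f"airflowctl {command}", shell=True, capture_output=True, timeout=60
        )
        assert result.returncode == 0, result.stdout.decode()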
---
.../tests/airflowctl_tests/conftest.py | 110 ----------
.../airflowctl_tests/test_airflowctl_commands.py | 221 +++++++++++++++------
dev/breeze/src/airflow_breeze/utils/run_tests.py | 5 +-
pyproject.toml | 1 +
.../ci/prek/check_airflowctl_command_coverage.py | 10 +-
5 files changed, 168 insertions(+), 179 deletions(-)
diff --git a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
index 5ace8c57adc..3ac692722f2 100644
--- a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
+++ b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py
@@ -20,7 +20,6 @@ import os
import subprocess
import sys
-import pytest
from python_on_whales import DockerClient, docker
from airflowctl_tests import console
@@ -201,112 +200,3 @@ def pytest_sessionfinish(session, exitstatus):
"""Tear down test environment at the end of the pytest session."""
if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
docker_compose_down()
-
-
-# Fixtures for tests
[email protected]
-def login_command():
- # Passing password via command line is insecure but acceptable for testing purposes
- # Please do not do this in production, it enables possibility of exposing your credentials
- return "auth login --username airflow --password airflow"
-
-
[email protected]
-def login_output():
- return "Login successful! Welcome to airflowctl!"
-
-
[email protected]
-def date_param():
- import random
- from datetime import datetime, timedelta
-
- from dateutil.relativedelta import relativedelta
-
- # original datetime string
- dt_str = "2025-10-25T00:02:00+00:00"
-
- # parse to datetime object
- dt = datetime.fromisoformat(dt_str)
-
- # boundaries
- start = dt - relativedelta(months=1)
- end = dt + relativedelta(months=1)
-
- # pick random time between start and end
- delta = end - start
- random_seconds = random.randint(0, int(delta.total_seconds()))
- random_dt = start + timedelta(seconds=random_seconds)
- return random_dt.isoformat()
-
-
[email protected]
-def test_commands(login_command, date_param):
- # Define test commands to run with actual running API server
- return [
- login_command,
- # Assets commands
- "assets list",
- "assets get --asset-id=1",
- "assets create-event --asset-id=1",
- # Backfill commands
- "backfill list",
- # Config commands
- "config get --section core --option executor",
- "config list",
- "config lint",
- # Connections commands
- "connections create --connection-id=test_con --conn-type=mysql
--password=TEST_PASS -o json",
- "connections list",
- "connections list -o yaml",
- "connections list -o table",
- "connections get --conn-id=test_con",
- "connections get --conn-id=test_con -o json",
- "connections update --connection-id=test_con --conn-type=postgres",
- "connections import
tests/airflowctl_tests/fixtures/test_connections.json",
- "connections delete --conn-id=test_con",
- "connections delete --conn-id=test_import_conn",
- # DAGs commands
- "dags list",
- "dags get --dag-id=example_bash_operator",
- "dags get-details --dag-id=example_bash_operator",
- "dags get-stats --dag-ids=example_bash_operator",
- "dags get-version --dag-id=example_bash_operator --version-number=1",
- "dags list-import-errors",
- "dags list-version --dag-id=example_bash_operator",
- "dags list-warning",
- # Order of trigger and pause/unpause is important for test stability because state checked
- f"dags trigger --dag-id=example_bash_operator
--logical-date={date_param} --run-after={date_param}",
- "dags pause example_bash_operator",
- "dags unpause example_bash_operator",
- # DAG Run commands
- f'dagrun get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}"',
- "dags update --dag-id=example_bash_operator --no-is-paused",
- # DAG Run commands
- "dagrun list --dag-id example_bash_operator --state success --limit=1",
- # Jobs commands
- "jobs list",
- # Pools commands
- "pools create --name=test_pool --slots=5",
- "pools list",
- "pools get --pool-name=test_pool",
- "pools get --pool-name=test_pool -o yaml",
- "pools update --pool=test_pool --slots=10",
- "pools import tests/airflowctl_tests/fixtures/test_pools.json",
- "pools delete --pool=test_pool",
- "pools delete --pool=test_import_pool",
- # Providers commands
- "providers list",
- # Variables commands
- "variables create --key=test_key --value=test_value",
- "variables list",
- "variables get --variable-key=test_key",
- "variables get --variable-key=test_key -o table",
- "variables update --key=test_key --value=updated_value",
- "variables import tests/airflowctl_tests/fixtures/test_variables.json",
- "variables delete --variable-key=test_key",
- "variables delete --variable-key=test_import_var",
- "variables delete --variable-key=test_import_var_with_desc",
- # Version command
- "version --remote",
- ]
diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py
index 7e72be0eb6c..2dc11f0a7e8 100644
--- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py
+++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py
@@ -19,72 +19,167 @@ from __future__ import annotations
import os
from subprocess import PIPE, STDOUT, Popen
+import pytest
+
from airflowctl_tests import console
-def test_airflowctl_commands(login_command, login_output, test_commands):
+def date_param():
+ import random
+ from datetime import datetime, timedelta
+
+ from dateutil.relativedelta import relativedelta
+
+ # original datetime string
+ dt_str = "2025-10-25T00:02:00+00:00"
+
+ # parse to datetime object
+ dt = datetime.fromisoformat(dt_str)
+
+ # boundaries
+ start = dt - relativedelta(months=1)
+ end = dt + relativedelta(months=1)
+
+ # pick random time between start and end
+ delta = end - start
+ random_seconds = random.randint(0, int(delta.total_seconds()))
+ random_dt = start + timedelta(seconds=random_seconds)
+ return random_dt.isoformat()
+
+
+LOGIN_COMMAND = "auth login --username airflow --password airflow"
+LOGIN_OUTPUT = "Login successful! Welcome to airflowctl!"
+ONE_DATE_PARAM = date_param()
+TEST_COMMANDS = [
+ # Passing password via command line is insecure but acceptable for testing purposes
+ # Please do not do this in production, it enables possibility of exposing your credentials
+ LOGIN_COMMAND,
+ # Assets commands
+ "assets list",
+ "assets get --asset-id=1",
+ "assets create-event --asset-id=1",
+ # Backfill commands
+ "backfill list",
+ # Config commands
+ "config get --section core --option executor",
+ "config list",
+ "config lint",
+ # Connections commands
+ "connections create --connection-id=test_con --conn-type=mysql
--password=TEST_PASS -o json",
+ "connections list",
+ "connections list -o yaml",
+ "connections list -o table",
+ "connections get --conn-id=test_con",
+ "connections get --conn-id=test_con -o json",
+ "connections update --connection-id=test_con --conn-type=postgres",
+ "connections import tests/airflowctl_tests/fixtures/test_connections.json",
+ "connections delete --conn-id=test_con",
+ "connections delete --conn-id=test_import_conn",
+ # DAGs commands
+ "dags list",
+ "dags get --dag-id=example_bash_operator",
+ "dags get-details --dag-id=example_bash_operator",
+ "dags get-stats --dag-ids=example_bash_operator",
+ "dags get-version --dag-id=example_bash_operator --version-number=1",
+ "dags list-import-errors",
+ "dags list-version --dag-id=example_bash_operator",
+ "dags list-warning",
+ # Order of trigger and pause/unpause is important for test stability because state checked
+ f"dags trigger --dag-id=example_bash_operator
--logical-date={ONE_DATE_PARAM} --run-after={ONE_DATE_PARAM}",
+ "dags pause example_bash_operator",
+ "dags unpause example_bash_operator",
+ # DAG Run commands
+ f'dagrun get --dag-id=example_bash_operator --dag-run-id="manual__{ONE_DATE_PARAM}"',
+ "dags update --dag-id=example_bash_operator --no-is-paused",
+ # DAG Run commands
+ "dagrun list --dag-id example_bash_operator --state success --limit=1",
+ # Jobs commands
+ "jobs list",
+ # Pools commands
+ "pools create --name=test_pool --slots=5",
+ "pools list",
+ "pools get --pool-name=test_pool",
+ "pools get --pool-name=test_pool -o yaml",
+ "pools update --pool=test_pool --slots=10",
+ "pools import tests/airflowctl_tests/fixtures/test_pools.json",
+ "pools delete --pool=test_pool",
+ "pools delete --pool=test_import_pool",
+ # Providers commands
+ "providers list",
+ # Variables commands
+ "variables create --key=test_key --value=test_value",
+ "variables list",
+ "variables get --variable-key=test_key",
+ "variables get --variable-key=test_key -o table",
+ "variables update --key=test_key --value=updated_value",
+ "variables import tests/airflowctl_tests/fixtures/test_variables.json",
+ "variables delete --variable-key=test_key",
+ "variables delete --variable-key=test_import_var",
+ "variables delete --variable-key=test_import_var_with_desc",
+ # Version command
+ "version --remote",
+]
+
+
[email protected](
+ "command", TEST_COMMANDS, ids=[" ".join(id.split(" ", 2)[:2]) for id in
TEST_COMMANDS]
+)
+def test_airflowctl_commands(command: str):
"""Test airflowctl commands using docker-compose environment."""
host_envs = os.environ.copy()
host_envs["AIRFLOW_CLI_DEBUG_MODE"] = "true"
- # Testing commands of airflowctl
- for command in test_commands:
- command_from_config = f"airflowctl {command}"
- # We need to run auth login first for all commands except login itself
- if command != login_command:
- run_command = f"airflowctl {login_command} && {command_from_config}"
- else:
- run_command = command_from_config
- console.print(f"[yellow]Running command: {command}")
-
- # Give some time for the command to execute and output to be ready
- proc = Popen(run_command.encode(), stdout=PIPE, stderr=STDOUT, stdin=PIPE, shell=True, env=host_envs)
- stdout_result, stderr_result = proc.communicate(timeout=60)
-
- # CLI command gave errors
- if stderr_result:
- console.print(
- f"[red]Errors while executing command
'{command_from_config}':\n{stderr_result.decode()}"
- )
-
- # Decode the output
- stdout_result = stdout_result.decode()
- # We need to trim auth login output if the command is not login itself and clean backspaces
- if command != login_command:
- if login_output not in stdout_result:
- console.print(
- f"[red]❌ Login output not found before command output for
'{command_from_config}'"
- )
- console.print(f"[red]Full output:\n{stdout_result}\n")
- raise AssertionError("Login output not found before command output")
- stdout_result = stdout_result.split(f"{login_output}\n")[1].strip()
- else:
- stdout_result = stdout_result.strip()
-
- # Check for non-zero exit code
- if proc.returncode != 0:
- console.print(f"[red]❌ Command '{command_from_config}' exited with
code {proc.returncode}")
- console.print(f"[red]Output:\n{stdout_result}\n")
- raise AssertionError(
- f"Command exited with non-zero code
{proc.returncode}\nOutput:\n{stdout_result}"
- )
-
- # Error patterns to detect failures that might otherwise slip through
- # Please ensure it is aligning with airflowctl.api.client.get_json_error
- error_patterns = [
- "Server error",
- "command error",
- "unrecognized arguments",
- "invalid choice",
- "Traceback (most recent call last):",
- ]
- matched_error = next((error for error in error_patterns if error in stdout_result), None)
- if matched_error:
- console.print(f"[red]❌ Output contained unexpected text for
command '{command_from_config}'")
- console.print(f"[red]Matched error pattern: {matched_error}\n")
- console.print(f"[red]Output:\n{stdout_result}\n")
- raise AssertionError(
- f"Output contained error pattern
'{matched_error}'\nOutput:\n{stdout_result}"
- )
- console.print(f"[green]✅ Output did not contain unexpected text for
command '{command_from_config}'")
- console.print(f"[cyan]Result:\n{stdout_result}\n")
- proc.kill()
+
+ command_from_config = f"airflowctl {command}"
+ # We need to run auth login first for all commands except login itself
+ if command != LOGIN_COMMAND:
+ run_command = f"airflowctl {LOGIN_COMMAND} && {command_from_config}"
+ else:
+ run_command = command_from_config
+ console.print(f"[yellow]Running command: {command}")
+
+ # Give some time for the command to execute and output to be ready
+ proc = Popen(run_command.encode(), stdout=PIPE, stderr=STDOUT, stdin=PIPE, shell=True, env=host_envs)
+ stdout_bytes, stderr_result = proc.communicate(timeout=60)
+
+ # CLI command gave errors
+ assert not stderr_result, (
+ f"Errors while executing command
'{command_from_config}':\n{stderr_result.decode()}"
+ )
+
+ # Decode the output
+ stdout_result = stdout_bytes.decode()
+ # We need to trim auth login output if the command is not login itself and clean backspaces
+ if command != LOGIN_COMMAND:
+ assert LOGIN_OUTPUT in stdout_result, (
+ f"❌ Login output not found before command output for
'{command_from_config}'",
+ f"\nFull output:\n{stdout_result}",
+ )
+ stdout_result = stdout_result.split(f"{LOGIN_OUTPUT}\n")[1].strip()
+ else:
+ stdout_result = stdout_result.strip()
+
+ # Check for non-zero exit code
+ assert proc.returncode == 0, (
+ f"❌ Command '{command_from_config}' exited with code
{proc.returncode}",
+ f"\nOutput:\n{stdout_result}",
+ )
+
+ # Error patterns to detect failures that might otherwise slip through
+ # Please ensure it is aligning with airflowctl.api.client.get_json_error
+ error_patterns = [
+ "Server error",
+ "command error",
+ "unrecognized arguments",
+ "invalid choice",
+ "Traceback (most recent call last):",
+ ]
+ matched_error = next((error for error in error_patterns if error in stdout_result), None)
+ assert not matched_error, (
+ f"❌ Output contained unexpected text for command
'{command_from_config}'",
+ f"\nMatched error pattern: {matched_error}",
+ f"\nOutput:\n{stdout_result}",
+ )
+
+ console.print(f"[green]✅ Output did not contain unexpected text for
command '{command_from_config}'")
+ console.print(f"[cyan]Result:\n{stdout_result}\n")
+ proc.kill()
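For reference, the ids= expression above trims each command to its first two
words so the generated test ids stay readable; a quick illustration of what it
evaluates to:

    >>> cmd = "connections create --connection-id=test_con --conn-type=mysql"
    >>> " ".join(cmd.split(" ", 2)[:2])
    'connections create'

pytest disambiguates duplicate ids (e.g. the three "connections list"
variants) with numeric suffixes, and a single case can now be run in isolation
by node id, e.g. test_airflowctl_commands[assets list].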
diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py b/dev/breeze/src/airflow_breeze/utils/run_tests.py
index eb3529a5746..9dff5d3935b 100644
--- a/dev/breeze/src/airflow_breeze/utils/run_tests.py
+++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py
@@ -143,6 +143,8 @@ def run_docker_compose_tests(
)
sys.exit(1)
+ # Always with color
+ pytest_args = ["--color=yes"]
if test_type == "task-sdk-integration":
test_path = Path("tests") / "task_sdk_tests"
cwd = TASK_SDK_INTEGRATION_TESTS_ROOT_PATH.as_posix()
@@ -155,10 +157,9 @@ def run_docker_compose_tests(
else:
test_path = Path("tests") / "docker_tests" / "test_docker_compose_quick_start.py"
cwd = DOCKER_TESTS_ROOT_PATH.as_posix()
+ pytest_args.append("-s") # -s to see print outputs as they come
all_tests = [test_path.as_posix()]
- # Always with color and -s to see print outputs as they come
- pytest_args = ["--color=yes", "-s"]
if not any(pytest_arg.startswith("tests/") for pytest_arg in extra_pytest_args):
# Only add all tests when no tests were specified on the command line
pytest_args.extend(all_tests)
diff --git a/pyproject.toml b/pyproject.toml
index 39a90b593a8..1102d57e3b7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -771,6 +771,7 @@ testing = ["dev", "providers.tests", "tests_common", "tests", "system", "unit",
"dev/breeze/tests/*" = ["TID253", "S101", "TRY002"]
"devel-common/tests/*" = ["S101"]
"airflow-core/tests/*" = ["D", "TID253", "S101", "TRY002"]
+"airflow-ctl-tests/tests/*" = ["D", "TID253", "S101", "TRY002"]
"airflow-e2e-tests/tests/*" = ["D", "TID253", "S101", "TRY002"]
"docker-tests/*" = ["D", "TID253", "S101", "TRY002"]
"task-sdk-integration-tests/*" = ["D", "TID253", "S101", "TRY002"]
diff --git a/scripts/ci/prek/check_airflowctl_command_coverage.py b/scripts/ci/prek/check_airflowctl_command_coverage.py
index 70c9523e07d..8a81dd098c6 100755
--- a/scripts/ci/prek/check_airflowctl_command_coverage.py
+++ b/scripts/ci/prek/check_airflowctl_command_coverage.py
@@ -37,7 +37,9 @@ sys.path.insert(0, str(Path(__file__).parent.resolve()))
from common_prek_utils import AIRFLOW_ROOT_PATH, console
OPERATIONS_FILE = AIRFLOW_ROOT_PATH / "airflow-ctl" / "src" / "airflowctl" / "api" / "operations.py"
-CONFTEST_FILE = AIRFLOW_ROOT_PATH / "airflow-ctl-tests" / "tests" / "airflowctl_tests" / "conftest.py"
+CTL_TESTS_FILE = (
+ AIRFLOW_ROOT_PATH / "airflow-ctl-tests" / "tests" / "airflowctl_tests" / "test_airflowctl_commands.py"
+)
# Operations excluded from CLI (see cli_config.py)
EXCLUDED_OPERATION_CLASSES = {"BaseOperations", "LoginOperations", "VersionOperations"}
@@ -102,7 +104,7 @@ def parse_operations() -> dict[str, list[str]]:
def parse_tested_commands() -> set[str]:
tested: set[str] = set()
- with open(CONFTEST_FILE) as f:
+ with open(CTL_TESTS_FILE) as f:
content = f.read()
# Match command patterns like "assets list", "dags list-import-errors", etc.
@@ -133,8 +135,8 @@ def main():
console.print(f" [red]- {cmd}[/]")
console.print()
console.print("[yellow]Fix by either:[/]")
- console.print("1. Add test to
airflow-ctl-tests/tests/airflowctl_tests/conftest.py")
- console.print("2. Add to EXCLUDED_COMMANDS in
scripts/ci/prek/check_airflowctl_command_coverage.py")
+ console.print(f"1. Add test to {CTL_TESTS_FILE}")
+ console.print(f"2. Add to EXCLUDED_COMMANDS in {__file__}")
sys.exit(1)
total = sum(len(cmds) for cmds in available.values())
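Note on the coverage-check change above: only the scanned file changes (the
deleted conftest.py is replaced by the test module); the command-matching
regex itself is unchanged context and not shown in these hunks. For
orientation only, a hypothetical sketch of that kind of scan (not the
script's literal code):

    import re

    # Hypothetical: collect the first two words of every quoted CLI command
    # string, e.g. "assets list" or "dags list-import-errors".
    with open("test_airflowctl_commands.py") as f:
        content = f.read()
    tested = {m.group(1) for m in re.finditer(r'"([a-z]+ [a-z-]+)', content)}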