This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 301b772d573 Remove findings from positional session check in Core CLI (#67771)
301b772d573 is described below
commit 301b772d573e68d13a9c692d856fd4fd43c49519
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat May 30 17:03:57 2026 +0200
Remove findings from positional session check in Core CLI (#67771)
* Fix exceptions of positional session use in airflow-core cli
* Clean pytests as well
---
airflow-core/src/airflow/cli/commands/dag_command.py | 16 ++++++++--------
airflow-core/src/airflow/cli/commands/jobs_command.py | 2 +-
airflow-core/src/airflow/cli/commands/task_command.py | 2 +-
airflow-core/src/airflow/cli/commands/team_command.py | 8 ++++----
.../src/airflow/cli/commands/variable_command.py | 8 ++++++--
.../unit/cli/commands/test_rotate_fernet_key_command.py | 15 ++++++++++++---
scripts/ci/prek/known_provide_session_positional.txt | 6 ------
7 files changed, 32 insertions(+), 25 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 267fcad6a22..c0aa6e206b5 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -362,7 +362,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def dag_state(args, session: Session = NEW_SESSION) -> None:
+def dag_state(args, *, session: Session = NEW_SESSION) -> None:
"""
Return the state (and conf if exists) of a DagRun at the command line.
@@ -473,7 +473,7 @@ def dag_next_execution(args) -> None:
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
-def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
+def dag_list_dags(args, *, session: Session = NEW_SESSION) -> None:
"""Display dags with or without stats at the command line."""
cols = args.columns if args.columns else []
@@ -561,7 +561,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) ->
None:
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
-def dag_details(args, session: Session = NEW_SESSION):
+def dag_details(args, *, session: Session = NEW_SESSION):
"""Get DAG details given a DAG id."""
dag = DagModel.get_dagmodel(args.dag_id, session=session)
if not dag:
@@ -583,7 +583,7 @@ def dag_details(args, session: Session = NEW_SESSION):
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
-def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:
+def dag_list_import_errors(args, *, session: Session = NEW_SESSION) -> None:
"""Display dags with import errors on the command line."""
data = []
@@ -672,7 +672,7 @@ def dag_report(args) -> None:
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
-def dag_list_jobs(args, dag: DAG | None = None, session: Session =
NEW_SESSION) -> None:
+def dag_list_jobs(args, dag: DAG | None = None, *, session: Session =
NEW_SESSION) -> None:
"""List latest n jobs."""
queries = []
if dag:
@@ -703,7 +703,7 @@ def dag_list_jobs(args, dag: DAG | None = None, session:
Session = NEW_SESSION)
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
-def dag_list_dag_runs(args, dag: DAG | None = None, session: Session =
NEW_SESSION) -> None:
+def dag_list_dag_runs(args, dag: DAG | None = None, *, session: Session =
NEW_SESSION) -> None:
"""List dag runs for a given DAG."""
if dag:
args.dag_id = dag.dag_id
@@ -741,7 +741,7 @@ def dag_list_dag_runs(args, dag: DAG | None = None,
session: Session = NEW_SESSI
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) ->
None:
+def dag_test(args, dag: DAG | None = None, *, session: Session = NEW_SESSION)
-> None:
"""Execute one single DagRun for a given DAG and logical date."""
run_conf = None
if args.conf:
@@ -799,7 +799,7 @@ def dag_test(args, dag: DAG | None = None, session: Session
= NEW_SESSION) -> No
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
+def dag_reserialize(args, *, session: Session = NEW_SESSION) -> None:
"""Serialize a DAG instance."""
manager = DagBundlesManager()
manager.sync_bundles_to_db(session=session)
diff --git a/airflow-core/src/airflow/cli/commands/jobs_command.py
b/airflow-core/src/airflow/cli/commands/jobs_command.py
index 33bd97b6c31..194d8720db2 100644
--- a/airflow-core/src/airflow/cli/commands/jobs_command.py
+++ b/airflow-core/src/airflow/cli/commands/jobs_command.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
@providers_configuration_loaded
@provide_session
-def check(args, session: Session = NEW_SESSION) -> None:
+def check(args, *, session: Session = NEW_SESSION) -> None:
"""Check if job(s) are still alive."""
if args.allow_multiple and args.limit <= 1:
raise SystemExit("To use option --allow-multiple, you must set the
limit to a value greater than 1.")
diff --git a/airflow-core/src/airflow/cli/commands/task_command.py
b/airflow-core/src/airflow/cli/commands/task_command.py
index df9c44910fa..af944526184 100644
--- a/airflow-core/src/airflow/cli/commands/task_command.py
+++ b/airflow-core/src/airflow/cli/commands/task_command.py
@@ -358,7 +358,7 @@ def _guess_debugger() -> _SupportedDebugger:
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
-def task_states_for_dag_run(args, session: Session = NEW_SESSION) -> None:
+def task_states_for_dag_run(args, *, session: Session = NEW_SESSION) -> None:
"""Get the status of all task instances in a DagRun."""
dag_run, _ = fetch_dag_run_from_run_id_or_logical_date_string(
dag_id=args.dag_id,
diff --git a/airflow-core/src/airflow/cli/commands/team_command.py
b/airflow-core/src/airflow/cli/commands/team_command.py
index 7928b3d716b..6106fcddb2e 100644
--- a/airflow-core/src/airflow/cli/commands/team_command.py
+++ b/airflow-core/src/airflow/cli/commands/team_command.py
@@ -61,7 +61,7 @@ def _extract_team_name(args):
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def team_create(args, session=NEW_SESSION):
+def team_create(args, *, session=NEW_SESSION):
"""Create a new team. Team names must be 3-50 characters long and contain
only alphanumeric characters, hyphens, and underscores."""
team_name = _extract_team_name(args)
@@ -84,7 +84,7 @@ def team_create(args, session=NEW_SESSION):
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def team_delete(args, session=NEW_SESSION):
+def team_delete(args, *, session=NEW_SESSION):
"""Delete a team after checking for associations."""
team_name = _extract_team_name(args)
@@ -149,7 +149,7 @@ def team_delete(args, session=NEW_SESSION):
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def team_list(args, session=NEW_SESSION):
+def team_list(args, *, session=NEW_SESSION):
"""List all teams."""
teams = session.scalars(select(Team).order_by(Team.name)).all()
if not teams:
@@ -161,7 +161,7 @@ def team_list(args, session=NEW_SESSION):
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def team_sync(args, session=NEW_SESSION):
+def team_sync(args, *, session=NEW_SESSION):
"""Sync missing teams from the dag bundle config."""
dag_bundle_teams = {
bundle.team_name
diff --git a/airflow-core/src/airflow/cli/commands/variable_command.py
b/airflow-core/src/airflow/cli/commands/variable_command.py
index 85166cb6e79..8701b3c8808 100644
--- a/airflow-core/src/airflow/cli/commands/variable_command.py
+++ b/airflow-core/src/airflow/cli/commands/variable_command.py
@@ -21,6 +21,7 @@ from __future__ import annotations
import json
import os
+from typing import TYPE_CHECKING
from sqlalchemy import select
@@ -36,7 +37,10 @@ from airflow.secrets.local_filesystem import load_variables
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
-from airflow.utils.session import create_session, provide_session
+from airflow.utils.session import NEW_SESSION, create_session, provide_session
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm.session import Session
class VariableDisplayMapper:
@@ -122,7 +126,7 @@ def variables_delete(args):
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
-def variables_import(args, session):
+def variables_import(args, *, session: Session = NEW_SESSION):
"""Import variables from a given file."""
if not os.path.exists(args.file):
raise SystemExit("Missing variables file.")
diff --git
a/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py
b/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py
index 8f29b29a495..b2c5ad1b140 100644
--- a/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py
@@ -16,6 +16,9 @@
# under the License.
from __future__ import annotations
+from argparse import ArgumentParser
+from typing import TYPE_CHECKING
+
import pytest
from cryptography.fernet import Fernet
from sqlalchemy import select
@@ -30,10 +33,15 @@ from airflow.utils.session import provide_session
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_connections, clear_db_variables
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
pytestmark = pytest.mark.db_test
class TestRotateFernetKeyCommand:
+ parser: ArgumentParser
+
@classmethod
def setup_class(cls):
cls.parser = cli_parser.get_parser()
@@ -47,7 +55,7 @@ class TestRotateFernetKeyCommand:
clear_db_variables()
@provide_session
- def test_should_rotate_variable(self, session):
+ def test_should_rotate_variable(self, *, session: Session):
fernet_key1 = Fernet.generate_key()
fernet_key2 = Fernet.generate_key()
var1_key = f"{__file__}_var1"
@@ -73,13 +81,14 @@ class TestRotateFernetKeyCommand:
with conf_vars({("core", "fernet_key"): fernet_key2.decode()}):
get_fernet.cache_clear() # Clear cached fernet
var1 = session.scalar(select(Variable).where(Variable.key ==
var1_key))
+ assert var1
# Unencrypted variable should be unchanged
assert Variable.get(key=var1_key) == "value"
assert var1._val == "value"
assert Variable.get(key=var2_key) == "value"
@provide_session
- def test_should_rotate_connection(self, session, mock_supervisor_comms):
+ def test_should_rotate_connection(self, mock_supervisor_comms, *, session:
Session):
fernet_key1 = Fernet.generate_key()
fernet_key2 = Fernet.generate_key()
var1_key = f"{__file__}_var1"
@@ -128,7 +137,7 @@ class TestRotateFernetKeyCommand:
get_fernet.cache_clear() # Clear cached fernet
# Unencrypted variable should be unchanged
- conn1: Connection = BaseHook.get_connection(var1_key)
+ conn1 = BaseHook.get_connection(var1_key)
assert conn1.password == "pass"
# Mock for the second connection
diff --git a/scripts/ci/prek/known_provide_session_positional.txt
b/scripts/ci/prek/known_provide_session_positional.txt
index ab197c350d0..73b1e8ddc79 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -1,8 +1,3 @@
-airflow-core/src/airflow/cli/commands/dag_command.py::8
-airflow-core/src/airflow/cli/commands/jobs_command.py::1
-airflow-core/src/airflow/cli/commands/task_command.py::1
-airflow-core/src/airflow/cli/commands/team_command.py::4
-airflow-core/src/airflow/cli/commands/variable_command.py::1
airflow-core/src/airflow/jobs/base_job_runner.py::2
airflow-core/src/airflow/jobs/job.py::7
airflow-core/src/airflow/jobs/scheduler_job_runner.py::11
@@ -42,7 +37,6 @@ airflow-core/src/airflow/utils/cli_action_loggers.py::1
airflow-core/src/airflow/utils/db.py::7
airflow-core/src/airflow/utils/db_cleanup.py::2
airflow-core/src/airflow/utils/log/file_task_handler.py::1
-airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py::2
airflow-core/tests/unit/jobs/test_scheduler_job.py::1
airflow-core/tests/unit/listeners/test_listeners.py::7
airflow-core/tests/unit/models/test_taskinstance.py::4