This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b4177cbfb67 Fix exceptions of positional session use in airflow-core
dag_processing (#67772)
b4177cbfb67 is described below
commit b4177cbfb672314263a9d726eb1ca36a97e7cc82
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat May 30 15:28:52 2026 +0200
Fix exceptions of positional session use in airflow-core dag_processing
(#67772)
---
airflow-core/src/airflow/dag_processing/dagbag.py | 2 +-
airflow-core/src/airflow/dag_processing/manager.py | 6 ++++--
scripts/ci/prek/known_provide_session_positional.txt | 2 --
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py
b/airflow-core/src/airflow/dag_processing/dagbag.py
index bd0a6dacb86..dcaab93f1ab 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -245,7 +245,7 @@ class DagBag(LoggingMixin):
return list(self.dags)
@provide_session
- def get_dag(self, dag_id, session: Session = NEW_SESSION):
+ def get_dag(self, dag_id, *, session: Session = NEW_SESSION):
"""
Get the DAG out of the dictionary, and refreshes it if expired.
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index ed5c61c604c..691237cc46e 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -411,6 +411,7 @@ class DagFileProcessorManager(LoggingMixin):
def deactivate_stale_dags(
self,
last_parsed: dict[DagFileInfo, datetime | None],
+ *,
session: Session = NEW_SESSION,
):
"""Detect and deactivate DAGs which are no longer present in files."""
@@ -588,7 +589,7 @@ class DagFileProcessorManager(LoggingMixin):
)
@provide_session
- def _claim_priority_files(self, session: Session = NEW_SESSION) ->
list[DagFileInfo]:
+ def _claim_priority_files(self, *, session: Session = NEW_SESSION) ->
list[DagFileInfo]:
"""Fetch priority parsing requests from the metadata database."""
files: list[DagFileInfo] = []
bundles = {b.name: b for b in self._dag_bundles}
@@ -617,6 +618,7 @@ class DagFileProcessorManager(LoggingMixin):
@retry_db_transaction
def _fetch_callbacks_from_db(
self,
+ *,
session: Session = NEW_SESSION,
) -> list[CallbackRequest]:
"""Fetch callbacks from database and add them to the internal queue
for execution."""
@@ -921,7 +923,7 @@ class DagFileProcessorManager(LoggingMixin):
@provide_session
def clear_orphaned_import_errors(
- self, bundle_name: str, observed_filelocs: set[str], session: Session
= NEW_SESSION
+ self, bundle_name: str, observed_filelocs: set[str], *, session:
Session = NEW_SESSION
):
"""
Clear import errors for files that no longer exist.
diff --git a/scripts/ci/prek/known_provide_session_positional.txt
b/scripts/ci/prek/known_provide_session_positional.txt
index 6fa731efdb0..ab197c350d0 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -3,8 +3,6 @@ airflow-core/src/airflow/cli/commands/jobs_command.py::1
airflow-core/src/airflow/cli/commands/task_command.py::1
airflow-core/src/airflow/cli/commands/team_command.py::4
airflow-core/src/airflow/cli/commands/variable_command.py::1
-airflow-core/src/airflow/dag_processing/dagbag.py::1
-airflow-core/src/airflow/dag_processing/manager.py::4
airflow-core/src/airflow/jobs/base_job_runner.py::2
airflow-core/src/airflow/jobs/job.py::7
airflow-core/src/airflow/jobs/scheduler_job_runner.py::11