This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fea69dd4ee6 Remove AIP-44 from airflow/dag_processing/processor.py (#44532)
fea69dd4ee6 is described below
commit fea69dd4ee6422ec1909c5a6f6c58d8d93aa27ee
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Dec 1 15:17:31 2024 +0100
Remove AIP-44 from airflow/dag_processing/processor.py (#44532)
---
airflow/dag_processing/processor.py | 22 +++-------------------
1 file changed, 3 insertions(+), 19 deletions(-)
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index c44da8c8570..4c451adf00c 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -32,7 +32,6 @@ from setproctitle import setproctitle
from sqlalchemy import delete, event, select
from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import (
DagCallbackRequest,
TaskCallbackRequest,
@@ -44,6 +43,7 @@ from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
+from airflow.models.pool import Pool
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, _run_finished_callback
from airflow.stats import Stats
@@ -430,7 +430,6 @@ class DagFileProcessor(LoggingMixin):
self._last_num_of_db_queries = 0
@staticmethod
- @internal_api_call
@provide_session
def update_import_errors(
file_last_changed: dict[str, datetime],
@@ -494,7 +493,8 @@ class DagFileProcessor(LoggingMixin):
session.flush()
@classmethod
- def update_dag_warnings(cla, *, dagbag: DagBag) -> None:
+ @provide_session
+ def update_dag_warnings(cla, *, dagbag: DagBag, session: Session = NEW_SESSION) -> None:
"""Validate and raise exception if any task in a dag is using a non-existent pool."""
def get_pools(dag) -> dict[str, set[str]]:
@@ -504,15 +504,6 @@ class DagFileProcessor(LoggingMixin):
for dag in dagbag.dags.values():
pool_dict.update(get_pools(dag))
dag_ids = {dag.dag_id for dag in dagbag.dags.values()}
- return DagFileProcessor._validate_task_pools_and_update_dag_warnings(pool_dict, dag_ids)
-
- @classmethod
- @internal_api_call
- @provide_session
- def _validate_task_pools_and_update_dag_warnings(
- cls, pool_dict: dict[str, set[str]], dag_ids: set[str], session: Session = NEW_SESSION
- ) -> None:
- from airflow.models.pool import Pool
all_pools = {p.pool for p in Pool.get_pools(session)}
warnings: set[DagWarning] = set()
@@ -541,11 +532,8 @@ class DagFileProcessor(LoggingMixin):
for warning_to_add in warnings:
session.merge(warning_to_add)
- session.flush()
- session.commit()
@classmethod
- @internal_api_call
@provide_session
def execute_callbacks(
cls,
@@ -582,7 +570,6 @@ class DagFileProcessor(LoggingMixin):
session.commit()
@classmethod
- @internal_api_call
@provide_session
def execute_callbacks_without_dag(
cls, callback_requests: list[CallbackRequest], unit_test_mode: bool, session: Session = NEW_SESSION
@@ -621,8 +608,6 @@ class DagFileProcessor(LoggingMixin):
DAG.execute_callback(callbacks, context, dag.dag_id)
@classmethod
- @internal_api_call
- @provide_session
def _execute_task_callbacks(
cls, dagbag: DagBag | None, request: TaskCallbackRequest, unit_test_mode: bool, session: Session
) -> None:
@@ -779,7 +764,6 @@ class DagFileProcessor(LoggingMixin):
return self._last_num_of_db_queries
@staticmethod
- @internal_api_call
@provide_session
def save_dag_to_db(
dags: dict[str, DAG],