This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fea69dd4ee6 Remove AIP-44 from airflow/dag_processing/processor.py (#44532)
fea69dd4ee6 is described below

commit fea69dd4ee6422ec1909c5a6f6c58d8d93aa27ee
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Dec 1 15:17:31 2024 +0100

    Remove AIP-44 from airflow/dag_processing/processor.py (#44532)
---
 airflow/dag_processing/processor.py | 22 +++-------------------
 1 file changed, 3 insertions(+), 19 deletions(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index c44da8c8570..4c451adf00c 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -32,7 +32,6 @@ from setproctitle import setproctitle
 from sqlalchemy import delete, event, select
 
 from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import (
     DagCallbackRequest,
     TaskCallbackRequest,
@@ -44,6 +43,7 @@ from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagwarning import DagWarning, DagWarningType
 from airflow.models.errors import ParseImportError
+from airflow.models.pool import Pool
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance, _run_finished_callback
 from airflow.stats import Stats
@@ -430,7 +430,6 @@ class DagFileProcessor(LoggingMixin):
         self._last_num_of_db_queries = 0
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def update_import_errors(
         file_last_changed: dict[str, datetime],
@@ -494,7 +493,8 @@ class DagFileProcessor(LoggingMixin):
         session.flush()
 
     @classmethod
-    def update_dag_warnings(cla, *, dagbag: DagBag) -> None:
+    @provide_session
+    def update_dag_warnings(cla, *, dagbag: DagBag, session: Session = NEW_SESSION) -> None:
         """Validate and raise exception if any task in a dag is using a non-existent pool."""
 
         def get_pools(dag) -> dict[str, set[str]]:
@@ -504,15 +504,6 @@ class DagFileProcessor(LoggingMixin):
         for dag in dagbag.dags.values():
             pool_dict.update(get_pools(dag))
         dag_ids = {dag.dag_id for dag in dagbag.dags.values()}
-        return DagFileProcessor._validate_task_pools_and_update_dag_warnings(pool_dict, dag_ids)
-
-    @classmethod
-    @internal_api_call
-    @provide_session
-    def _validate_task_pools_and_update_dag_warnings(
-        cls, pool_dict: dict[str, set[str]], dag_ids: set[str], session: Session = NEW_SESSION
-    ) -> None:
-        from airflow.models.pool import Pool
 
         all_pools = {p.pool for p in Pool.get_pools(session)}
         warnings: set[DagWarning] = set()
@@ -541,11 +532,8 @@ class DagFileProcessor(LoggingMixin):
 
         for warning_to_add in warnings:
             session.merge(warning_to_add)
-        session.flush()
-        session.commit()
 
     @classmethod
-    @internal_api_call
     @provide_session
     def execute_callbacks(
         cls,
@@ -582,7 +570,6 @@ class DagFileProcessor(LoggingMixin):
         session.commit()
 
     @classmethod
-    @internal_api_call
     @provide_session
     def execute_callbacks_without_dag(
         cls, callback_requests: list[CallbackRequest], unit_test_mode: bool, session: Session = NEW_SESSION
@@ -621,8 +608,6 @@ class DagFileProcessor(LoggingMixin):
             DAG.execute_callback(callbacks, context, dag.dag_id)
 
     @classmethod
-    @internal_api_call
-    @provide_session
     def _execute_task_callbacks(
         cls, dagbag: DagBag | None, request: TaskCallbackRequest, unit_test_mode: bool, session: Session
     ) -> None:
@@ -779,7 +764,6 @@ class DagFileProcessor(LoggingMixin):
         return self._last_num_of_db_queries
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def save_dag_to_db(
         dags: dict[str, DAG],

Reply via email to