This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b61bef13a8 Allow writing Task SDK DAG in `serialized_dag` table (#48748)
0b61bef13a8 is described below

commit 0b61bef13a8c80426e33298a90d79867371ffeca
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Apr 3 21:03:53 2025 +0530

    Allow writing Task SDK DAG in `serialized_dag` table (#48748)
    
    Pulling part of https://github.com/apache/airflow/pull/48014 out
---
 airflow-core/src/airflow/models/serialized_dag.py  | 32 ++--------------------
 .../tests/unit/models/test_serialized_dag.py       | 19 ++-----------
 2 files changed, 6 insertions(+), 45 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index a00a5c424f2..2cb8d7205aa 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -45,7 +45,7 @@ from airflow.models.dagrun import DagRun
 from airflow.sdk.definitions.asset import AssetUniqueKey
 from airflow.serialization.dag_dependency import DagDependency
 from airflow.serialization.serialized_objects import SerializedDAG
-from airflow.settings import COMPRESS_SERIALIZED_DAGS, MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
+from airflow.settings import COMPRESS_SERIALIZED_DAGS, json
 from airflow.utils import timezone
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -57,7 +57,7 @@ if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
     from airflow.models import Operator
-    from airflow.models.dag import DAG
+    from airflow.sdk import DAG
     from airflow.serialization.serialized_objects import LazyDeserializedDAG
 
 log = logging.getLogger(__name__)
@@ -296,7 +296,7 @@ class SerializedDagModel(Base):
     load_op_links = True
 
     def __init__(self, dag: DAG | LazyDeserializedDAG) -> None:
-        from airflow.models.dag import DAG
+        from airflow.sdk import DAG
 
         self.dag_id = dag.dag_id
         dag_data = {}
@@ -558,32 +558,6 @@ class SerializedDagModel(Base):
         """
         return session.scalar(cls.latest_item_select_object(dag_id))
 
-    @staticmethod
-    @provide_session
-    def bulk_sync_to_db(
-        dags: list[DAG] | list[LazyDeserializedDAG],
-        bundle_name: str,
-        bundle_version: str | None = None,
-        session: Session = NEW_SESSION,
-    ) -> None:
-        """
-        Save DAGs as Serialized DAG objects in the database.
-
-        Each DAG is saved in a separate database query.
-
-        :param dags: the DAG objects to save to the DB
-        :param session: ORM Session
-        :return: None
-        """
-        for dag in dags:
-            SerializedDagModel.write_dag(
-                dag=dag,
-                bundle_name=bundle_name,
-                bundle_version=bundle_version,
-                min_update_interval=MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
-                session=session,
-            )
-
     @classmethod
     @provide_session
     def get_last_updated_datetime(cls, dag_id: str, session: Session = NEW_SESSION) -> datetime | None:
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index 57060a34db3..f20e3cc1375 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -28,14 +28,14 @@ from sqlalchemy import func, select, update
 import airflow.example_dags as example_dags_module
 from airflow.decorators import task as task_decorator
 from airflow.models.asset import AssetModel
-from airflow.models.dag import DAG, DagModel
+from airflow.models.dag import DAG as SchedulerDAG, DagModel
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagbag import DagBag
 from airflow.models.serialized_dag import SerializedDagModel as SDM
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk import DAG, Asset
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.settings import json
 from airflow.utils.hashlib_wrapper import md5
@@ -44,7 +44,6 @@ from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils import db
-from tests_common.test_utils.asserts import assert_queries_count
 
 pytestmark = pytest.mark.db_test
 
@@ -61,7 +60,7 @@ def make_example_dags(module):
             session.add(testing)
 
     dagbag = DagBag(module.__path__[0])
-    DAG.bulk_write_to_db("testing", None, dagbag.dags.values())
+    SchedulerDAG.bulk_write_to_db("testing", None, dagbag.dags.values())
     return dagbag.dags
 
 
@@ -206,18 +205,6 @@ class TestSerializedDagModel:
         # assert only the latest SDM is returned
         assert len(sdags) != len(serialized_dags2)
 
-    def test_bulk_sync_to_db(self, testing_dag_bundle):
-        dags = [
-            DAG("dag_1", schedule=None),
-            DAG("dag_2", schedule=None),
-            DAG("dag_3", schedule=None),
-        ]
-        DAG.bulk_write_to_db("testing", None, dags)
-        # we also write to dag_version and dag_code tables
-        # in dag_version.
-        with assert_queries_count(24):
-            SDM.bulk_sync_to_db(dags, bundle_name="testing")
-
     def test_order_of_dag_params_is_stable(self):
         """
         This asserts that we have logic in place which guarantees the order

Reply via email to