This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0b61bef13a8 Allow writing Task SDK DAG in `serialized_dag` table (#48748)
0b61bef13a8 is described below
commit 0b61bef13a8c80426e33298a90d79867371ffeca
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Apr 3 21:03:53 2025 +0530
Allow writing Task SDK DAG in `serialized_dag` table (#48748)
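This pulls part of https://github.com/apache/airflow/pull/48014 out into a standalone change: `SerializedDagModel` now imports `DAG` from the Task SDK (`airflow.sdk`) instead of `airflow.models.dag`, and the `SerializedDagModel.bulk_sync_to_db` helper is removed together with its `test_bulk_sync_to_db` test.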
---
airflow-core/src/airflow/models/serialized_dag.py | 32 ++--------------------
.../tests/unit/models/test_serialized_dag.py | 19 ++-----------
2 files changed, 6 insertions(+), 45 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index a00a5c424f2..2cb8d7205aa 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -45,7 +45,7 @@ from airflow.models.dagrun import DagRun
from airflow.sdk.definitions.asset import AssetUniqueKey
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.serialized_objects import SerializedDAG
-from airflow.settings import COMPRESS_SERIALIZED_DAGS,
MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
+from airflow.settings import COMPRESS_SERIALIZED_DAGS, json
from airflow.utils import timezone
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import NEW_SESSION, provide_session
@@ -57,7 +57,7 @@ if TYPE_CHECKING:
from sqlalchemy.orm import Session
from airflow.models import Operator
- from airflow.models.dag import DAG
+ from airflow.sdk import DAG
from airflow.serialization.serialized_objects import LazyDeserializedDAG
log = logging.getLogger(__name__)
@@ -296,7 +296,7 @@ class SerializedDagModel(Base):
load_op_links = True
def __init__(self, dag: DAG | LazyDeserializedDAG) -> None:
- from airflow.models.dag import DAG
+ from airflow.sdk import DAG
self.dag_id = dag.dag_id
dag_data = {}
@@ -558,32 +558,6 @@ class SerializedDagModel(Base):
"""
return session.scalar(cls.latest_item_select_object(dag_id))
- @staticmethod
- @provide_session
- def bulk_sync_to_db(
- dags: list[DAG] | list[LazyDeserializedDAG],
- bundle_name: str,
- bundle_version: str | None = None,
- session: Session = NEW_SESSION,
- ) -> None:
- """
- Save DAGs as Serialized DAG objects in the database.
-
- Each DAG is saved in a separate database query.
-
- :param dags: the DAG objects to save to the DB
- :param session: ORM Session
- :return: None
- """
- for dag in dags:
- SerializedDagModel.write_dag(
- dag=dag,
- bundle_name=bundle_name,
- bundle_version=bundle_version,
- min_update_interval=MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
- session=session,
- )
-
@classmethod
@provide_session
def get_last_updated_datetime(cls, dag_id: str, session: Session =
NEW_SESSION) -> datetime | None:
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index 57060a34db3..f20e3cc1375 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -28,14 +28,14 @@ from sqlalchemy import func, select, update
import airflow.example_dags as example_dags_module
from airflow.decorators import task as task_decorator
from airflow.models.asset import AssetModel
-from airflow.models.dag import DAG, DagModel
+from airflow.models.dag import DAG as SchedulerDAG, DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel as SDM
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk import DAG, Asset
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.settings import json
from airflow.utils.hashlib_wrapper import md5
@@ -44,7 +44,6 @@ from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils import db
-from tests_common.test_utils.asserts import assert_queries_count
pytestmark = pytest.mark.db_test
@@ -61,7 +60,7 @@ def make_example_dags(module):
session.add(testing)
dagbag = DagBag(module.__path__[0])
- DAG.bulk_write_to_db("testing", None, dagbag.dags.values())
+ SchedulerDAG.bulk_write_to_db("testing", None, dagbag.dags.values())
return dagbag.dags
@@ -206,18 +205,6 @@ class TestSerializedDagModel:
# assert only the latest SDM is returned
assert len(sdags) != len(serialized_dags2)
- def test_bulk_sync_to_db(self, testing_dag_bundle):
- dags = [
- DAG("dag_1", schedule=None),
- DAG("dag_2", schedule=None),
- DAG("dag_3", schedule=None),
- ]
- DAG.bulk_write_to_db("testing", None, dags)
- # we also write to dag_version and dag_code tables
- # in dag_version.
- with assert_queries_count(24):
- SDM.bulk_sync_to_db(dags, bundle_name="testing")
-
def test_order_of_dag_params_is_stable(self):
"""
This asserts that we have logic in place which guarantees the order