This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 485d78f8c6e Update example dags to import `DAG`, `Asset` etc from Task 
SDK (#48014)
485d78f8c6e is described below

commit 485d78f8c6e05a05547f9eb822f122e309bef4f8
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Apr 4 04:18:21 2025 +0530

    Update example dags to import `DAG`, `Asset` etc from Task SDK (#48014)
    
    Users of Airflow 3 should import things from Task SDK.
    
    This PR updates imports for DAG, Asset, Param to use SDK.
    
    related #47851
    
    closes #48433
---
 .../src/airflow/cli/commands/dag_command.py        |  3 +-
 .../src/airflow/cli/commands/task_command.py       |  4 +-
 .../airflow/example_dags/example_asset_alias.py    |  3 +-
 .../example_asset_alias_with_no_taskflow.py        |  3 +-
 .../example_dags/example_asset_decorator.py        |  3 +-
 .../example_dags/example_asset_with_watchers.py    |  3 +-
 .../src/airflow/example_dags/example_assets.py     |  3 +-
 .../airflow/example_dags/example_bash_operator.py  |  5 +-
 .../example_branch_datetime_operator.py            |  2 +-
 .../example_branch_day_of_week_operator.py         |  2 +-
 .../airflow/example_dags/example_branch_labels.py  |  2 +-
 .../example_dags/example_branch_operator.py        |  2 +-
 .../example_branch_operator_decorator.py           |  2 +-
 .../example_branch_python_dop_operator_3.py        |  2 +-
 .../src/airflow/example_dags/example_complex.py    |  3 +-
 .../airflow/example_dags/example_custom_weight.py  |  6 +-
 .../airflow/example_dags/example_dag_decorator.py  |  2 +-
 .../example_dags/example_dynamic_task_mapping.py   |  2 +-
 ...amic_task_mapping_with_no_taskflow_operators.py |  2 +-
 .../example_external_task_marker_dag.py            |  2 +-
 .../example_dags/example_inlet_event_extra.py      |  3 +-
 .../example_dags/example_kubernetes_executor.py    |  2 +-
 .../airflow/example_dags/example_latest_only.py    |  2 +-
 .../example_latest_only_with_trigger.py            |  2 +-
 .../example_local_kubernetes_executor.py           |  2 +-
 .../example_dags/example_nested_branch_dag.py      |  2 +-
 .../example_dags/example_outlet_event_extra.py     |  4 +-
 .../example_dags/example_params_trigger_ui.py      |  3 +-
 .../example_dags/example_params_ui_tutorial.py     |  3 +-
 .../example_passing_params_via_test_command.py     |  2 +-
 .../example_dags/example_python_operator.py        |  2 +-
 .../src/airflow/example_dags/example_sensors.py    |  2 +-
 .../airflow/example_dags/example_setup_teardown.py |  2 +-
 .../example_setup_teardown_taskflow.py             |  2 +-
 .../example_dags/example_short_circuit_operator.py |  3 +-
 .../src/airflow/example_dags/example_skip_dag.py   |  4 +-
 .../src/airflow/example_dags/example_task_group.py |  2 +-
 .../example_dags/example_task_group_decorator.py   |  2 +-
 .../example_time_delta_sensor_async.py             |  2 +-
 .../example_dags/example_trigger_controller_dag.py |  2 +-
 .../example_dags/example_trigger_target_dag.py     |  2 +-
 .../example_dags/example_workday_timetable.py      |  2 +-
 .../src/airflow/example_dags/example_xcom.py       |  2 +-
 .../src/airflow/example_dags/example_xcomargs.py   |  2 +-
 airflow-core/src/airflow/example_dags/tutorial.py  |  6 +-
 .../src/airflow/example_dags/tutorial_dag.py       |  6 +-
 airflow-core/src/airflow/models/dag.py             | 74 ++++++++++++++++++++++
 airflow-core/src/airflow/utils/cli.py              |  5 +-
 .../tests/unit/cli/commands/test_dag_command.py    |  2 +-
 .../tests/unit/cli/commands/test_task_command.py   |  4 ++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 22 +++++--
 airflow-core/tests/unit/models/test_dagbag.py      | 10 +--
 airflow-core/tests/unit/models/test_dagcode.py     |  8 ++-
 .../tests/unit/models/test_serialized_dag.py       | 16 +++--
 54 files changed, 177 insertions(+), 88 deletions(-)

diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py 
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 9bff688a41b..44e14352ad1 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -44,6 +44,7 @@ from airflow.dag_processing.bundles.manager import 
DagBundlesManager
 from airflow.exceptions import AirflowException
 from airflow.jobs.job import Job
 from airflow.models import DagBag, DagModel, DagRun, DagTag, TaskInstance
+from airflow.models.dag import DAG
 from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.sdk.definitions._internal.dag_parsing_context import 
_airflow_parsing_context_manager
@@ -59,7 +60,6 @@ if TYPE_CHECKING:
     from graphviz.dot import Dot
     from sqlalchemy.orm import Session
 
-    from airflow.models.dag import DAG
     from airflow.timetables.base import DataInterval
 log = logging.getLogger(__name__)
 
@@ -656,6 +656,7 @@ def dag_test(args, dag: DAG | None = None, session: Session 
= NEW_SESSION) -> No
             f"Dag {args.dag_id!r} could not be found; either it does not exist 
or it failed to parse."
         )
 
+    dag = DAG.from_sdk_dag(dag)
     dr: DagRun = dag.test(
         logical_date=logical_date,
         run_conf=run_conf,
diff --git a/airflow-core/src/airflow/cli/commands/task_command.py 
b/airflow-core/src/airflow/cli/commands/task_command.py
index 1d64b084664..e6fb5f68ac9 100644
--- a/airflow-core/src/airflow/cli/commands/task_command.py
+++ b/airflow-core/src/airflow/cli/commands/task_command.py
@@ -255,7 +255,7 @@ def task_state(args) -> None:
     >>> airflow tasks state tutorial sleep 2015-01-01
     success
     """
-    dag = get_dag(args.subdir, args.dag_id)
+    dag = get_dag(args.subdir, args.dag_id, from_db=True)
     task = dag.get_task(task_id=args.task_id)
     ti, _ = _get_ti(task, args.map_index, 
logical_date_or_run_id=args.logical_date_or_run_id)
     print(ti.current_state())
@@ -367,6 +367,8 @@ def task_test(args, dag: DAG | None = None, session: 
Session = NEW_SESSION) -> N
 
     dag = dag or get_dag(args.subdir, args.dag_id)
 
+    dag = DAG.from_sdk_dag(dag)
+
     task = dag.get_task(task_id=args.task_id)
     # Add CLI provided task_params to task.params
     if args.task_params:
diff --git a/airflow-core/src/airflow/example_dags/example_asset_alias.py 
b/airflow-core/src/airflow/example_dags/example_asset_alias.py
index 9c98b9756a8..dfbad070bb3 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_alias.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_alias.py
@@ -34,9 +34,8 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow import DAG
 from airflow.decorators import task
-from airflow.sdk.definitions.asset import Asset, AssetAlias
+from airflow.sdk import DAG, Asset, AssetAlias
 
 with DAG(
     dag_id="asset_s3_bucket_producer",
diff --git 
a/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py 
b/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py
index c3d1ac0b8d1..d9de5e06d9f 100644
--- 
a/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py
+++ 
b/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py
@@ -35,9 +35,8 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow import DAG
 from airflow.providers.standard.operators.python import PythonOperator
-from airflow.sdk.definitions.asset import Asset, AssetAlias
+from airflow.sdk import DAG, Asset, AssetAlias
 
 with DAG(
     dag_id="asset_s3_bucket_producer_with_no_taskflow",
diff --git a/airflow-core/src/airflow/example_dags/example_asset_decorator.py 
b/airflow-core/src/airflow/example_dags/example_asset_decorator.py
index b7560f21342..1e3ea4b9fcb 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_decorator.py
@@ -19,8 +19,7 @@ from __future__ import annotations
 import pendulum
 
 from airflow.decorators import dag, task
-from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.asset.decorators import asset
+from airflow.sdk import Asset, asset
 
 
 @asset(uri="s3://bucket/asset1_producer", schedule=None)
diff --git 
a/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py 
b/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py
index aa430d57b06..8468806b137 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py
@@ -21,9 +21,8 @@ Example DAG for demonstrating the usage of event driven 
scheduling using assets
 from __future__ import annotations
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileDeleteTrigger
-from airflow.sdk import Asset, AssetWatcher, chain
+from airflow.sdk import DAG, Asset, AssetWatcher, chain
 
 file_path = "/tmp/test"
 
diff --git a/airflow-core/src/airflow/example_dags/example_assets.py 
b/airflow-core/src/airflow/example_dags/example_assets.py
index b81cdad9453..2bb3cffc527 100644
--- a/airflow-core/src/airflow/example_dags/example_assets.py
+++ b/airflow-core/src/airflow/example_dags/example_assets.py
@@ -54,9 +54,8 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk import DAG, Asset
 from airflow.timetables.assets import AssetOrTimeSchedule
 from airflow.timetables.trigger import CronTriggerTimetable
 
diff --git a/airflow-core/src/airflow/example_dags/example_bash_operator.py 
b/airflow-core/src/airflow/example_dags/example_bash_operator.py
index ab9d09f5cbb..a4fb3161ab6 100644
--- a/airflow-core/src/airflow/example_dags/example_bash_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_bash_operator.py
@@ -23,9 +23,9 @@ import datetime
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 
 with DAG(
     dag_id="example_bash_operator",
@@ -72,6 +72,3 @@ this_will_skip = BashOperator(
 )
 # [END howto_operator_bash_skip]
 this_will_skip >> run_this_last
-
-if __name__ == "__main__":
-    dag.test()
diff --git 
a/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py 
b/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py
index 39589b46f3f..710bdb1de89 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py
@@ -24,9 +24,9 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.datetime import 
BranchDateTimeOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 
 dag1 = DAG(
     dag_id="example_branch_datetime_operator",
diff --git 
a/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py 
b/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py
index 9182f9a7898..43400522468 100644
--- 
a/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py
+++ 
b/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py
@@ -23,10 +23,10 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.weekday import 
BranchDayOfWeekOperator
 from airflow.providers.standard.utils.weekday import WeekDay
+from airflow.sdk import DAG
 
 with DAG(
     dag_id="example_weekday_branch_operator",
diff --git a/airflow-core/src/airflow/example_dags/example_branch_labels.py 
b/airflow-core/src/airflow/example_dags/example_branch_labels.py
index fadb5f23cb5..edc1b059738 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_labels.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_labels.py
@@ -23,8 +23,8 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 from airflow.utils.edgemodifier import Label
 
 with DAG(
diff --git a/airflow-core/src/airflow/example_dags/example_branch_operator.py 
b/airflow-core/src/airflow/example_dags/example_branch_operator.py
index aed92576e3a..d824de4aa08 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_operator.py
@@ -29,7 +29,6 @@ from pathlib import Path
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import (
     BranchExternalPythonOperator,
@@ -39,6 +38,7 @@ from airflow.providers.standard.operators.python import (
     PythonOperator,
     PythonVirtualenvOperator,
 )
+from airflow.sdk import DAG
 from airflow.utils.edgemodifier import Label
 from airflow.utils.trigger_rule import TriggerRule
 
diff --git 
a/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py 
b/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py
index f30fd7394b0..5c76080024f 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py
@@ -31,8 +31,8 @@ import tempfile
 import pendulum
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 from airflow.utils.edgemodifier import Label
 from airflow.utils.trigger_rule import TriggerRule
 
diff --git 
a/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py 
b/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py
index 8bca82645f8..cc2358da7ce 100644
--- 
a/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ 
b/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -25,8 +25,8 @@ from __future__ import annotations
 import pendulum
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 
 
 @task.branch()
diff --git a/airflow-core/src/airflow/example_dags/example_complex.py 
b/airflow-core/src/airflow/example_dags/example_complex.py
index 601e5b4c790..1a414aa4d6c 100644
--- a/airflow-core/src/airflow/example_dags/example_complex.py
+++ b/airflow-core/src/airflow/example_dags/example_complex.py
@@ -23,9 +23,8 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk import chain
+from airflow.sdk import DAG, chain
 
 with DAG(
     dag_id="example_complex",
diff --git a/airflow-core/src/airflow/example_dags/example_custom_weight.py 
b/airflow-core/src/airflow/example_dags/example_custom_weight.py
index 1e1f9c31e94..4bbc261c553 100644
--- a/airflow-core/src/airflow/example_dags/example_custom_weight.py
+++ b/airflow-core/src/airflow/example_dags/example_custom_weight.py
@@ -24,11 +24,11 @@ import datetime
 import pendulum
 
 from airflow.example_dags.plugins.decreasing_priority_weight_strategy import 
DecreasingPriorityStrategy
-
-# [START example_custom_weight_dag]
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
+
+# [START example_custom_weight_dag]
 
 with DAG(
     dag_id="example_custom_weight",
diff --git a/airflow-core/src/airflow/example_dags/example_dag_decorator.py 
b/airflow-core/src/airflow/example_dags/example_dag_decorator.py
index 995bdcb3687..9d1aec9b1e2 100644
--- a/airflow-core/src/airflow/example_dags/example_dag_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_dag_decorator.py
@@ -27,7 +27,7 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.providers.standard.operators.bash import BashOperator
 
 if TYPE_CHECKING:
-    from airflow.sdk.definitions.context import Context
+    from airflow.sdk import Context
 
 
 class GetRequestOperator(BaseOperator):
diff --git 
a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py 
b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
index 61c32bf3832..54967e97dbe 100644
--- a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
+++ b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 from datetime import datetime
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
 
 with DAG(dag_id="example_dynamic_task_mapping", schedule=None, 
start_date=datetime(2022, 3, 4)) as dag:
 
diff --git 
a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
 
b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
index 3d42ac47b56..c762eee74f9 100644
--- 
a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
+++ 
b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 from datetime import datetime
 
 from airflow.models.baseoperator import BaseOperator
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
 
 
 class AddOneOperator(BaseOperator):
diff --git 
a/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py 
b/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py
index a4c630a1e9f..b8fad182549 100644
--- a/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py
@@ -42,9 +42,9 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.sensors.external_task import 
ExternalTaskMarker, ExternalTaskSensor
+from airflow.sdk import DAG
 
 start_date = pendulum.datetime(2021, 1, 1, tz="UTC")
 
diff --git a/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py 
b/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py
index c503e832a83..9919564f3ee 100644
--- a/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py
+++ b/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py
@@ -26,9 +26,8 @@ from __future__ import annotations
 import datetime
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk import DAG, Asset
 
 asset = Asset("s3://output/1.txt")
 
diff --git 
a/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py 
b/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py
index 27ffb4cdad6..c7082acad05 100644
--- a/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py
@@ -29,7 +29,7 @@ import pendulum
 from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
 
 log = logging.getLogger(__name__)
 
diff --git a/airflow-core/src/airflow/example_dags/example_latest_only.py 
b/airflow-core/src/airflow/example_dags/example_latest_only.py
index 1bda2751312..8881d7d1f80 100644
--- a/airflow-core/src/airflow/example_dags/example_latest_only.py
+++ b/airflow-core/src/airflow/example_dags/example_latest_only.py
@@ -21,9 +21,9 @@ from __future__ import annotations
 
 import datetime
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
+from airflow.sdk import DAG
 
 with DAG(
     dag_id="latest_only",
diff --git 
a/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py 
b/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py
index 63dba3a5d39..7e8a2b1b475 100644
--- a/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py
@@ -26,9 +26,9 @@ import datetime
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
+from airflow.sdk import DAG
 from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
diff --git 
a/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py 
b/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py
index 07be5ba1d38..8563ae05435 100644
--- a/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py
+++ b/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py
@@ -27,7 +27,7 @@ from datetime import datetime
 from airflow.configuration import conf
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
 
 log = logging.getLogger(__name__)
 
diff --git a/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py 
b/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py
index d0f3eea02d7..ac2907645c2 100644
--- a/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py
@@ -26,8 +26,8 @@ from __future__ import annotations
 import pendulum
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
diff --git 
a/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py 
b/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
index 8b08bb5fc94..18fd49263e1 100644
--- a/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
+++ b/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
@@ -26,10 +26,8 @@ from __future__ import annotations
 import datetime
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.asset.metadata import Metadata
+from airflow.sdk import DAG, Asset, Metadata
 
 asset = Asset(uri="s3://output/1.txt", name="test-asset")
 
diff --git a/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py 
b/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py
index 9acabdda980..b1f331d3faa 100644
--- a/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py
+++ b/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py
@@ -26,8 +26,7 @@ import datetime
 from pathlib import Path
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
-from airflow.sdk import Param
+from airflow.sdk import DAG, Param
 from airflow.utils.trigger_rule import TriggerRule
 
 # [START params_trigger]
diff --git 
a/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py 
b/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py
index 25aac4e1374..6634bb9a6a7 100644
--- a/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py
+++ b/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py
@@ -28,8 +28,7 @@ import json
 from pathlib import Path
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
-from airflow.sdk import Param
+from airflow.sdk import DAG, Param
 
 with DAG(
     dag_id=Path(__file__).stem,
diff --git 
a/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
 
b/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
index 7dcd963c096..2b390756b9f 100644
--- 
a/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
+++ 
b/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
@@ -26,8 +26,8 @@ import textwrap
 import pendulum
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
 
 
 @task(task_id="run_this")
diff --git a/airflow-core/src/airflow/example_dags/example_python_operator.py 
b/airflow-core/src/airflow/example_dags/example_python_operator.py
index 976813d53fd..d48bcae483c 100644
--- a/airflow-core/src/airflow/example_dags/example_python_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_python_operator.py
@@ -29,12 +29,12 @@ from pprint import pprint
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.python import (
     ExternalPythonOperator,
     PythonOperator,
     PythonVirtualenvOperator,
 )
+from airflow.sdk import DAG
 
 log = logging.getLogger(__name__)
 
diff --git a/airflow-core/src/airflow/example_dags/example_sensors.py 
b/airflow-core/src/airflow/example_dags/example_sensors.py
index 7481271edbc..5ec33bad51b 100644
--- a/airflow-core/src/airflow/example_dags/example_sensors.py
+++ b/airflow-core/src/airflow/example_dags/example_sensors.py
@@ -21,7 +21,6 @@ import datetime
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.sensors.bash import BashSensor
 from airflow.providers.standard.sensors.filesystem import FileSensor
@@ -30,6 +29,7 @@ from airflow.providers.standard.sensors.time import 
TimeSensor, TimeSensorAsync
 from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, 
TimeDeltaSensorAsync
 from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
 from airflow.providers.standard.utils.weekday import WeekDay
+from airflow.sdk import DAG
 from airflow.utils.trigger_rule import TriggerRule
 
 
diff --git a/airflow-core/src/airflow/example_dags/example_setup_teardown.py 
b/airflow-core/src/airflow/example_dags/example_setup_teardown.py
index 81994fabc20..a36e79a55e5 100644
--- a/airflow-core/src/airflow/example_dags/example_setup_teardown.py
+++ b/airflow-core/src/airflow/example_dags/example_setup_teardown.py
@@ -21,8 +21,8 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
 from airflow.utils.task_group import TaskGroup
 
 with DAG(
diff --git 
a/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py 
b/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
index 6fec9f9a478..6eeee2b4235 100644
--- a/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 import pendulum
 
 from airflow.decorators import setup, task, task_group, teardown
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
 
 with DAG(
     dag_id="example_setup_teardown_taskflow",
diff --git 
a/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py 
b/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py
index b20f8acb8d4..494bd55a869 100644
--- a/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py
@@ -21,10 +21,9 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import ShortCircuitOperator
-from airflow.sdk import chain
+from airflow.sdk import DAG, chain
 from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
diff --git a/airflow-core/src/airflow/example_dags/example_skip_dag.py 
b/airflow-core/src/airflow/example_dags/example_skip_dag.py
index 99f439cc3f7..7575494d0d9 100644
--- a/airflow-core/src/airflow/example_dags/example_skip_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_skip_dag.py
@@ -26,12 +26,12 @@ import pendulum
 
 from airflow.exceptions import AirflowSkipException
 from airflow.models.baseoperator import BaseOperator
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 from airflow.utils.trigger_rule import TriggerRule
 
 if TYPE_CHECKING:
-    from airflow.sdk.definitions.context import Context
+    from airflow.sdk import Context
 
 
 # Create some placeholder operators
diff --git a/airflow-core/src/airflow/example_dags/example_task_group.py 
b/airflow-core/src/airflow/example_dags/example_task_group.py
index 2f1931808b5..e83ac2e9989 100644
--- a/airflow-core/src/airflow/example_dags/example_task_group.py
+++ b/airflow-core/src/airflow/example_dags/example_task_group.py
@@ -21,9 +21,9 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 from airflow.utils.task_group import TaskGroup
 
 # [START howto_task_group]
diff --git 
a/airflow-core/src/airflow/example_dags/example_task_group_decorator.py 
b/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
index ce4a0e33b8c..29b270a763a 100644
--- a/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 import pendulum
 
 from airflow.decorators import task, task_group
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
 
 
 # [START howto_task_group_decorator]
diff --git 
a/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py 
b/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py
index 140dd866b9e..7b847e6871e 100644
--- a/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py
+++ b/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py
@@ -26,9 +26,9 @@ import datetime
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.sensors.time_delta import TimeDeltaSensorAsync
+from airflow.sdk import DAG
 
 with DAG(
     dag_id="example_time_delta_sensor_async",
diff --git 
a/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py 
b/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py
index e546bd0a7e8..6adac540957 100644
--- a/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py
@@ -25,8 +25,8 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.trigger_dagrun import 
TriggerDagRunOperator
+from airflow.sdk import DAG
 
 with DAG(
     dag_id="example_trigger_controller_dag",
diff --git 
a/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py 
b/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py
index 3af68a25607..6eae90499d5 100644
--- a/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py
@@ -26,8 +26,8 @@ from __future__ import annotations
 import pendulum
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
 
 
 @task(task_id="run_this")
diff --git a/airflow-core/src/airflow/example_dags/example_workday_timetable.py 
b/airflow-core/src/airflow/example_dags/example_workday_timetable.py
index db569c8e627..c5e942ec654 100644
--- a/airflow-core/src/airflow/example_dags/example_workday_timetable.py
+++ b/airflow-core/src/airflow/example_dags/example_workday_timetable.py
@@ -19,8 +19,8 @@ from __future__ import annotations
 import pendulum
 
 from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
 
 with DAG(
     dag_id="example_workday_timetable",
diff --git a/airflow-core/src/airflow/example_dags/example_xcom.py 
b/airflow-core/src/airflow/example_dags/example_xcom.py
index 2563eda77ee..00c88073891 100644
--- a/airflow-core/src/airflow/example_dags/example_xcom.py
+++ b/airflow-core/src/airflow/example_dags/example_xcom.py
@@ -22,9 +22,9 @@ from __future__ import annotations
 import pendulum
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.models.xcom_arg import XComArg
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
 
 value_1 = [1, 2, 3]
 value_2 = {"a": "b"}
diff --git a/airflow-core/src/airflow/example_dags/example_xcomargs.py 
b/airflow-core/src/airflow/example_dags/example_xcomargs.py
index a7103dc1911..166a65e4c96 100644
--- a/airflow-core/src/airflow/example_dags/example_xcomargs.py
+++ b/airflow-core/src/airflow/example_dags/example_xcomargs.py
@@ -24,8 +24,8 @@ import logging
 import pendulum
 
 from airflow.decorators import task
-from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
 
 log = logging.getLogger(__name__)
 
diff --git a/airflow-core/src/airflow/example_dags/tutorial.py 
b/airflow-core/src/airflow/example_dags/tutorial.py
index 8f371ee2427..7e12eb45469 100644
--- a/airflow-core/src/airflow/example_dags/tutorial.py
+++ b/airflow-core/src/airflow/example_dags/tutorial.py
@@ -28,12 +28,12 @@ from __future__ import annotations
 import textwrap
 from datetime import datetime, timedelta
 
-# The DAG object; we'll need this to instantiate a DAG
-from airflow.models.dag import DAG
-
 # Operators; we need this to operate!
 from airflow.providers.standard.operators.bash import BashOperator
 
+# The DAG object; we'll need this to instantiate a DAG
+from airflow.sdk import DAG
+
 # [END import_module]
 
 
diff --git a/airflow-core/src/airflow/example_dags/tutorial_dag.py 
b/airflow-core/src/airflow/example_dags/tutorial_dag.py
index 0e4f5086efc..0f891e5dd78 100644
--- a/airflow-core/src/airflow/example_dags/tutorial_dag.py
+++ b/airflow-core/src/airflow/example_dags/tutorial_dag.py
@@ -29,12 +29,12 @@ import textwrap
 
 import pendulum
 
-# The DAG object; we'll need this to instantiate a DAG
-from airflow.models.dag import DAG
-
 # Operators; we need this to operate!
 from airflow.providers.standard.operators.python import PythonOperator
 
+# The DAG object; we'll need this to instantiate a DAG
+from airflow.sdk import DAG
+
 # [END import_module]
 
 # [START instantiate_dag]
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index 8ca0f338330..5e6a5013c5e 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -90,6 +90,7 @@ from airflow.models.taskinstance import (
     clear_task_instances,
 )
 from airflow.models.tasklog import LogTemplate
+from airflow.sdk import TaskGroup
 from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, 
BaseAsset
 from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as 
task_sdk_dag_decorator
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
@@ -1997,6 +1998,79 @@ class DAG(TaskSDKDag, LoggingMixin):
                     if isinstance(port, of_type):
                         yield task.task_id, port
 
+    @classmethod
+    def from_sdk_dag(cls, dag: TaskSDKDag) -> DAG:
+        """Create a new (Scheduler) DAG object from a TaskSDKDag."""
+        if not isinstance(dag, TaskSDKDag):
+            return dag
+
+        fields = attrs.fields(dag.__class__)
+
+        kwargs = {}
+        for field in fields:
+            # Skip fields that are:
+            # 1. Initialized after creation (init=False)
+            # 2. Internal state fields that shouldn't be copied
+            if not field.init or field.name in ["edge_info"]:
+                continue
+
+            value = getattr(dag, field.name)
+
+            # Handle special cases where values need conversion
+            if field.name == "max_consecutive_failed_dag_runs":
+                # SchedulerDAG requires this to be >= 0, while TaskSDKDag 
allows -1
+                if value == -1:
+                    # If it is -1, we get the default value from the DAG
+                    continue
+
+            kwargs[field.name] = value
+
+        new_dag = cls(**kwargs)
+
+        task_group_map = {}
+
+        def create_task_groups(task_group, parent_group=None):
+            new_task_group = copy.deepcopy(task_group)
+
+            new_task_group.dag = new_dag
+            new_task_group.parent_group = parent_group
+            new_task_group.children = {}
+
+            task_group_map[task_group.group_id] = new_task_group
+
+            for child in task_group.children.values():
+                if isinstance(child, TaskGroup):
+                    create_task_groups(child, new_task_group)
+
+        create_task_groups(dag.task_group)
+
+        def create_tasks(task):
+            if isinstance(task, TaskGroup):
+                return task_group_map[task.group_id]
+
+            new_task = copy.deepcopy(task)
+
+            # Only overwrite the specific attributes we want to change
+            new_task.task_id = task.task_id
+            new_task.dag = None  # Don't set dag yet
+            new_task.task_group = task_group_map.get(task.task_group.group_id) 
if task.task_group else None
+
+            return new_task
+
+        # Process all tasks in the original DAG
+        for task in dag.tasks:
+            new_task = create_tasks(task)
+            if not isinstance(new_task, TaskGroup):
+                # Add the task to the DAG
+                new_dag.task_dict[new_task.task_id] = new_task
+                if new_task.task_group:
+                    new_task.task_group.children[new_task.task_id] = new_task
+                new_task.dag = new_dag
+
+        new_dag.edge_info = dag.edge_info.copy()
+
+        return new_dag
+
 
 class DagTag(Base):
     """A tag name per dag, to allow quick filtering in the DAG view."""
diff --git a/airflow-core/src/airflow/utils/cli.py 
b/airflow-core/src/airflow/utils/cli.py
index 0114d65e04f..e2137237f69 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -266,7 +266,8 @@ def get_dag(subdir: str | None, dag_id: str, from_db: bool 
= False) -> DAG:
     find the correct path (assuming it's a file) and failing that, use the 
configured
     dags folder.
     """
-    from airflow.models import DagBag
+    from airflow.models.dag import DAG
+    from airflow.models.dagbag import DagBag
 
     if from_db:
         dagbag = DagBag(read_dags_from_db=True)
@@ -275,6 +276,8 @@ def get_dag(subdir: str | None, dag_id: str, from_db: bool 
= False) -> DAG:
         first_path = process_subdir(subdir)
         dagbag = DagBag(first_path)
         dag = dagbag.dags.get(dag_id)  # avoids db calls made in get_dag
+        # Create a SchedulerDAG since some of the CLI commands rely on DB 
access
+        dag = DAG.from_sdk_dag(dag)
     if not dag:
         if from_db:
             raise AirflowException(f"Dag {dag_id!r} could not be found in 
DagBag read from database.")
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py 
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 77944cbc2a2..55ead5897d4 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -310,7 +310,7 @@ class TestCliDags:
     @conf_vars({("core", "load_examples"): "true"})
     def test_dagbag_dag_col(self):
         valid_cols = [c for c in dag_command.DAGSchema().fields]
-        dagbag = DagBag(include_examples=True)
+        dagbag = DagBag(include_examples=True, read_dags_from_db=True)
         dag_details = 
dag_command._get_dagbag_dag_details(dagbag.get_dag("tutorial_dag"))
         assert list(dag_details.keys()) == valid_cols
 
diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py 
b/airflow-core/tests/unit/cli/commands/test_task_command.py
index 57534bd978b..b3dd69912ce 100644
--- a/airflow-core/tests/unit/cli/commands/test_task_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -41,6 +41,7 @@ from airflow.configuration import conf
 from airflow.exceptions import DagRunNotFound
 from airflow.models import DagBag, DagRun, TaskInstance
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State, TaskInstanceState
@@ -339,6 +340,9 @@ class TestCliTasks:
     def test_task_states_for_dag_run(self):
         dag2 = DagBag().dags["example_python_operator"]
         task2 = dag2.get_task(task_id="print_the_context")
+
+        dag2 = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag2))
+
         default_date2 = timezone.datetime(2016, 1, 9)
         dag2.clear()
         data_interval = 
dag2.timetable.infer_manual_data_interval(run_after=default_date2)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index b4970176248..c703037b0f0 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -64,7 +64,7 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.asset import Asset
-from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.serialization.serialized_objects import LazyDeserializedDAG, 
SerializedDAG
 from airflow.timetables.base import DataInterval
 from airflow.traces.tracer import Trace
 from airflow.utils import timezone
@@ -5805,12 +5805,17 @@ class TestSchedulerJob:
         dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, 
"example_branch_operator.py")
         dagbag = DagBag(dagfile)
         dag = dagbag.get_dag("example_branch_operator")
-        DAG.bulk_write_to_db("testing", None, [dag])
+        dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
+        scheduler_dag = DAG.from_sdk_dag(dag)
+
+        DAG.bulk_write_to_db("testing", None, [dm])
         SerializedDagModel.write_dag(dag=dag, bundle_name="testing")
         dag_v = DagVersion.get_latest_version(dag.dag_id)
-        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+
+        data_interval = 
scheduler_dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+
         dag_run = create_dagrun(
-            dag,
+            scheduler_dag,
             logical_date=DEFAULT_DATE,
             run_type=DagRunType.SCHEDULED,
             data_interval=data_interval,
@@ -5866,13 +5871,16 @@ class TestSchedulerJob:
         dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, 
"example_branch_operator.py")
         dagbag = DagBag(dagfile)
         dag = dagbag.get_dag("example_branch_operator")
-        DAG.bulk_write_to_db("testing", None, [dag])
+        dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
+        scheduler_dag = DAG.from_sdk_dag(dag)
+
+        DAG.bulk_write_to_db("testing", None, [dm])
         SerializedDagModel.write_dag(dag, bundle_name="testing")
         session.query(Job).delete()
 
-        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+        data_interval = 
scheduler_dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dag_run = create_dagrun(
-            dag,
+            scheduler_dag,
             logical_date=DEFAULT_DATE,
             run_type=DagRunType.SCHEDULED,
             data_interval=data_interval,
diff --git a/airflow-core/tests/unit/models/test_dagbag.py 
b/airflow-core/tests/unit/models/test_dagbag.py
index 6d659c99c4c..657cc0e3659 100644
--- a/airflow-core/tests/unit/models/test_dagbag.py
+++ b/airflow-core/tests/unit/models/test_dagbag.py
@@ -681,7 +681,7 @@ with airflow.DAG(
         """
         with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 0)), 
tick=False):
             example_bash_op_dag = 
DagBag(include_examples=True).dags.get("example_bash_operator")
-            example_bash_op_dag.sync_to_db()
+            DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
             SerializedDagModel.write_dag(dag=example_bash_op_dag, 
bundle_name="testing")
 
             dag_bag = DagBag(read_dags_from_db=True)
@@ -699,7 +699,7 @@ with airflow.DAG(
         # Make a change in the DAG and write Serialized DAG to the DB
         with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 6)), 
tick=False):
             example_bash_op_dag.tags.add("new_tag")
-            example_bash_op_dag.sync_to_db()
+            DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
             SerializedDagModel.write_dag(dag=example_bash_op_dag, 
bundle_name="testing")
 
         # Since min_serialized_dag_fetch_interval is passed verify that 
calling 'dag_bag.get_dag'
@@ -723,7 +723,7 @@ with airflow.DAG(
         # serialize the initial version of the DAG
         with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 0)), 
tick=False):
             example_bash_op_dag = 
DagBag(include_examples=True).dags.get("example_bash_operator")
-            example_bash_op_dag.sync_to_db()
+            DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
             SerializedDagModel.write_dag(dag=example_bash_op_dag, 
bundle_name="testing")
 
         # deserialize the DAG
@@ -749,7 +749,7 @@ with airflow.DAG(
         # long before the transaction is committed
         with time_machine.travel((tz.datetime(2020, 1, 5, 1, 0, 0)), 
tick=False):
             example_bash_op_dag.tags.add("new_tag")
-            example_bash_op_dag.sync_to_db()
+            DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
             SerializedDagModel.write_dag(dag=example_bash_op_dag, 
bundle_name="testing")
 
         # Since min_serialized_dag_fetch_interval is passed verify that 
calling 'dag_bag.get_dag'
@@ -769,7 +769,7 @@ with airflow.DAG(
 
         example_dags = dagbag.dags
         for dag in example_dags.values():
-            dag.sync_to_db()
+            DAG.from_sdk_dag(dag).sync_to_db()
             SerializedDagModel.write_dag(dag, bundle_name="dag_maker")
 
         new_dagbag = DagBag(read_dags_from_db=True)
diff --git a/airflow-core/tests/unit/models/test_dagcode.py 
b/airflow-core/tests/unit/models/test_dagcode.py
index f604037ffe7..97bc070a6d4 100644
--- a/airflow-core/tests/unit/models/test_dagcode.py
+++ b/airflow-core/tests/unit/models/test_dagcode.py
@@ -30,6 +30,7 @@ from airflow.models.dag import DAG
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel as SDM
+from airflow.serialization.serialized_objects import LazyDeserializedDAG, 
SerializedDAG
 
 # To move it to a shared module.
 from airflow.utils.file import open_maybe_zipped
@@ -54,7 +55,8 @@ def make_example_dags(module):
             session.add(testing)
 
     dagbag = DagBag(module.__path__[0])
-    DAG.bulk_write_to_db("testing", None, dagbag.dags.values())
+    dags = [LazyDeserializedDAG(data=SerializedDAG.to_dict(dag)) for dag in 
dagbag.dags.values()]
+    DAG.bulk_write_to_db("testing", None, dags)
     return dagbag.dags
 
 
@@ -142,7 +144,9 @@ class TestDagCode:
         """Test new DagCode is created in DB when ser dag is changed"""
         example_dag = 
make_example_dags(example_dags_module).get("example_bash_operator")
         SDM.write_dag(example_dag, bundle_name="testing")
-        example_dag.create_dagrun(
+
+        dag = DAG.from_sdk_dag(example_dag)
+        dag.create_dagrun(
             run_id="test1",
             run_after=pendulum.datetime(2025, 1, 1, tz="UTC"),
             state=DagRunState.QUEUED,
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py 
b/airflow-core/tests/unit/models/test_serialized_dag.py
index f20e3cc1375..91910383b4a 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -36,7 +36,7 @@ from airflow.providers.standard.operators.bash import 
BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.sdk import DAG, Asset
-from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.serialization.serialized_objects import LazyDeserializedDAG, 
SerializedDAG
 from airflow.settings import json
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.session import create_session
@@ -60,7 +60,9 @@ def make_example_dags(module):
             session.add(testing)
 
     dagbag = DagBag(module.__path__[0])
-    SchedulerDAG.bulk_write_to_db("testing", None, dagbag.dags.values())
+
+    dags = [LazyDeserializedDAG(data=SerializedDAG.to_dict(dag)) for dag in 
dagbag.dags.values()]
+    SchedulerDAG.bulk_write_to_db("testing", None, dags)
     return dagbag.dags
 
 
@@ -143,7 +145,10 @@ class TestSerializedDagModel:
         example_bash_op_dag = example_dags.get("example_bash_operator")
         dag_updated = SDM.write_dag(dag=example_bash_op_dag, 
bundle_name="testing")
         assert dag_updated is True
-        example_bash_op_dag.create_dagrun(
+
+        # SchedulerDAG is created to create dagrun
+        dag = SchedulerDAG.from_sdk_dag(dag=example_bash_op_dag)
+        dag.create_dagrun(
             run_id="test1",
             run_after=pendulum.datetime(2025, 1, 1, tz="UTC"),
             state=DagRunState.QUEUED,
@@ -191,7 +196,10 @@ class TestSerializedDagModel:
         assert len(example_dags) == len(serialized_dags)
 
         dag = example_dags.get("example_bash_operator")
-        dag.create_dagrun(
+
+        # DAGs are serialized and deserialized to access create_dagrun object
+        sdag = 
SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag=dag))
+        sdag.create_dagrun(
             run_id="test1",
             run_after=pendulum.datetime(2025, 1, 1, tz="UTC"),
             state=DagRunState.QUEUED,

Reply via email to