This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 5202e770cf3 [v3-1-test] Ensure dag.test uses serialized dag for testing (#56660) (#56820)
5202e770cf3 is described below

commit 5202e770cf38ddc6f75325a54d3a4b37386aa6b0
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sat Oct 18 22:25:25 2025 +0100

    [v3-1-test] Ensure dag.test uses serialized dag for testing (#56660) (#56820)
    
    * Ensure dag.test uses serialized dag for testing (#56660)
    
    * Ensure dag.test uses serialized dag for testing
    
    While the `dag test` CLI command uses the serialized dag, dag.test was
    using the in-memory serialized dag, so calling the dag.test method
    directly resulted in an error.
    
    This PR fixes this and ensures dag.test parses the dag if it has not
    been parsed yet.
    
    * fixup! Ensure dag.test uses serialized dag for testing
    
    * Update query to v2 and do backcompat for 3.1
    
    * Remove backcompat
    
    * Return previous behaviour with SerializedDag
    
    (cherry picked from commit 6d977a925e83b5b55003efbcde85cce9ff836ba8)
    
    * fixup! Ensure dag.test uses serialized dag for testing (#56660)
---
 airflow-core/tests/unit/models/test_dag.py  | 25 ++++++++++++++++++++++
 task-sdk/src/airflow/sdk/definitions/dag.py | 32 ++++++++++++++++++++++++++---
 2 files changed, 54 insertions(+), 3 deletions(-)
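
For context, a minimal sketch of the usage this change is meant to support: calling DAG.test() directly on a DAG object that has not been parsed or serialized to the database yet. The DAG id, task, and file layout below are hypothetical and only illustrate the call pattern; they are not part of this commit.

    # sketch.py - hypothetical example, not part of this commit
    from airflow.providers.standard.operators.bash import BashOperator
    from airflow.sdk import DAG

    with DAG(dag_id="example_bash", schedule=None) as dag:
        BashOperator(task_id="hello", bash_command="echo hello")

    if __name__ == "__main__":
        # With this fix, dag.test() first parses and syncs the DAG if no
        # serialized version exists yet, then creates the test DagRun.
        dag.test()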

diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 442372a57da..febed30fbd2 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -53,6 +53,7 @@ from airflow.models.dag import (
     get_asset_triggered_next_run_info,
     get_next_data_interval,
 )
+from airflow.models.dagbag import DBDagBag
 from airflow.models.dagbundle import DagBundleModel
 from airflow.models.dagrun import DagRun
 from airflow.models.serialized_dag import SerializedDagModel
@@ -80,6 +81,7 @@ from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.asserts import assert_queries_count
+from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.dag import create_scheduler_dag, sync_dag_to_db
 from tests_common.test_utils.db import (
     clear_db_assets,
@@ -179,6 +181,29 @@ class TestDag:
         clear_db_dags()
         clear_db_assets()
 
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_dag_test_auto_parses_when_not_serialized(self, test_dags_bundle, session):
+        """
+        DAG.test() should auto-parse and sync the DAG if it's not serialized yet.
+        """
+
+        dag_id = "test_example_bash_operator"
+
+        dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False)
+        dag = dagbag.dags.get(dag_id)
+
+        # Ensure not serialized yet
+        assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) is None
+        assert session.scalar(select(DagRun).where(DagRun.dag_id == dag_id)) is None
+
+        dr = dag.test()
+        assert dr is not None
+
+        # Serialized DAG should now exist and DagRun would be created
+        ser = DBDagBag().get_latest_version_of_dag(dag_id, session=session)
+        assert ser is not None
+        assert session.scalar(select(DagRun).where(DagRun.dag_id == dag_id)) is not None
+
     def teardown_method(self) -> None:
         clear_db_runs()
         clear_db_dags()
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 5c4b139eb75..58df4b86063 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -1187,7 +1187,33 @@ class DAG:
             data_interval = (
                 self.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None
             )
-            scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))  # type: ignore[arg-type]
+            from airflow.models.dag_version import DagVersion
+
+            version = DagVersion.get_version(self.dag_id)
+            if not version:
+                from airflow.dag_processing.bundles.manager import DagBundlesManager
+                from airflow.models.dagbag import DagBag, sync_bag_to_db
+                from airflow.sdk.definitions._internal.dag_parsing_context import (
+                    _airflow_parsing_context_manager,
+                )
+
+                manager = DagBundlesManager()
+                manager.sync_bundles_to_db(session=session)
+                session.commit()
+                # sync all bundles? or use the dags-folder bundle?
+                # What if the test dag is in a different bundle?
+                for bundle in manager.get_all_dag_bundles():
+                    if not bundle.is_initialized:
+                        bundle.initialize()
+                    with _airflow_parsing_context_manager(dag_id=self.dag_id):
+                        dagbag = DagBag(
+                            dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False
+                        )
+                        sync_bag_to_db(dagbag, bundle.name, bundle.version)
+                    version = DagVersion.get_version(self.dag_id)
+                    if version:
+                        break
+            scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))
             # Preserve callback functions from original Dag since they're lost during serialization
             # and yes it is a hack for now! It is a tradeoff for code simplicity.
             # Without it, we need "Scheduler Dag" (Serialized dag) for the scheduler bits
@@ -1196,8 +1222,8 @@ class DAG:
 
             # Scheduler DAG shouldn't have these attributes, but assigning them
             # here is an easy hack to get this test() thing working.
-            scheduler_dag.on_success_callback = self.on_success_callback  # type: ignore[attr-defined]
-            scheduler_dag.on_failure_callback = self.on_failure_callback  # type: ignore[attr-defined]
+            scheduler_dag.on_success_callback = self.on_success_callback  # type: ignore[attr-defined, union-attr]
+            scheduler_dag.on_failure_callback = self.on_failure_callback  # type: ignore[attr-defined, union-attr]
 
             dr: DagRun = get_or_create_dagrun(
                 dag=scheduler_dag,
