This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 5202e770cf3 [v3-1-test] Ensure dag.test uses serialized dag for testing (#56660) (#56820)
5202e770cf3 is described below
commit 5202e770cf38ddc6f75325a54d3a4b37386aa6b0
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sat Oct 18 22:25:25 2025 +0100
[v3-1-test] Ensure dag.test uses serialized dag for testing (#56660) (#56820)
* Ensure dag.test uses serialized dag for testing (#56660)
* Ensure dag.test uses serialized dag for testing
While the `dag test` CLI command uses the serialized DAG, `dag.test` was using an in-memory serialized DAG, so calling the `dag.test` method directly resulted in an error.
This PR fixes that and ensures `dag.test` parses the DAG if it has not been parsed yet.
* fixup! Ensure dag.test uses serialized dag for testing
* Update query to v2 and do backcompat for 3.1
* Remove backcompat
* Return previous behaviour with SerializedDag
(cherry picked from commit 6d977a925e83b5b55003efbcde85cce9ff836ba8)
* fixup! Ensure dag.test uses serialized dag for testing (#56660)
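For context, the direct usage this change targets is roughly the sketch below: a DAG file that calls `dag.test()` on a DAG object that has not yet been parsed and serialized to the database. The dag_id and task are illustrative only and not part of this commit.

    # Minimal sketch (illustrative names, not from this commit): a DAG file
    # that runs DAG.test() directly instead of going through `airflow dags test`.
    from airflow.sdk import DAG, task

    with DAG(dag_id="example_direct_dag_test", schedule=None) as dag:

        @task
        def noop():
            pass

        noop()

    if __name__ == "__main__":
        # Before this fix this call could fail unless the DAG had already been
        # parsed and serialized; with this change DAG.test() parses and syncs
        # the DAG itself when no serialized version exists yet.
        dag.test()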
---
airflow-core/tests/unit/models/test_dag.py | 25 ++++++++++++++++++++++
task-sdk/src/airflow/sdk/definitions/dag.py | 32 ++++++++++++++++++++++++++---
2 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 442372a57da..febed30fbd2 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -53,6 +53,7 @@ from airflow.models.dag import (
get_asset_triggered_next_run_info,
get_next_data_interval,
)
+from airflow.models.dagbag import DBDagBag
from airflow.models.dagbundle import DagBundleModel
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
@@ -80,6 +81,7 @@ from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.asserts import assert_queries_count
+from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.dag import create_scheduler_dag, sync_dag_to_db
from tests_common.test_utils.db import (
clear_db_assets,
@@ -179,6 +181,29 @@ class TestDag:
clear_db_dags()
clear_db_assets()
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_dag_test_auto_parses_when_not_serialized(self, test_dags_bundle, session):
+        """
+        DAG.test() should auto-parse and sync the DAG if it's not serialized yet.
+        """
+
+        dag_id = "test_example_bash_operator"
+
+        dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False)
+        dag = dagbag.dags.get(dag_id)
+
+        # Ensure not serialized yet
+        assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) is None
+        assert session.scalar(select(DagRun).where(DagRun.dag_id == dag_id)) is None
+
+        dr = dag.test()
+        assert dr is not None
+
+        # Serialized DAG should now exist and DagRun would be created
+        ser = DBDagBag().get_latest_version_of_dag(dag_id, session=session)
+        assert ser is not None
+        assert session.scalar(select(DagRun).where(DagRun.dag_id == dag_id)) is not None
+
def teardown_method(self) -> None:
clear_db_runs()
clear_db_dags()
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 5c4b139eb75..58df4b86063 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -1187,7 +1187,33 @@ class DAG:
        data_interval = (
            self.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None
        )
-        scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))  # type: ignore[arg-type]
+        from airflow.models.dag_version import DagVersion
+
+        version = DagVersion.get_version(self.dag_id)
+        if not version:
+            from airflow.dag_processing.bundles.manager import DagBundlesManager
+            from airflow.models.dagbag import DagBag, sync_bag_to_db
+            from airflow.sdk.definitions._internal.dag_parsing_context import (
+                _airflow_parsing_context_manager,
+            )
+
+            manager = DagBundlesManager()
+            manager.sync_bundles_to_db(session=session)
+            session.commit()
+            # sync all bundles? or use the dags-folder bundle?
+            # What if the test dag is in a different bundle?
+            for bundle in manager.get_all_dag_bundles():
+                if not bundle.is_initialized:
+                    bundle.initialize()
+                with _airflow_parsing_context_manager(dag_id=self.dag_id):
+                    dagbag = DagBag(
+                        dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False
+                    )
+                sync_bag_to_db(dagbag, bundle.name, bundle.version)
+                version = DagVersion.get_version(self.dag_id)
+                if version:
+                    break
+        scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))
        # Preserve callback functions from original Dag since they're lost during serialization
        # and yes it is a hack for now! It is a tradeoff for code simplicity.
        # Without it, we need "Scheduler Dag" (Serialized dag) for the scheduler bits
@@ -1196,8 +1222,8 @@ class DAG:
        # Scheduler DAG shouldn't have these attributes, but assigning them
        # here is an easy hack to get this test() thing working.
-        scheduler_dag.on_success_callback = self.on_success_callback  # type: ignore[attr-defined]
-        scheduler_dag.on_failure_callback = self.on_failure_callback  # type: ignore[attr-defined]
+        scheduler_dag.on_success_callback = self.on_success_callback  # type: ignore[attr-defined, union-attr]
+        scheduler_dag.on_failure_callback = self.on_failure_callback  # type: ignore[attr-defined, union-attr]
        dr: DagRun = get_or_create_dagrun(
            dag=scheduler_dag,