kaxil commented on code in PR #56660:
URL: https://github.com/apache/airflow/pull/56660#discussion_r2439802353


##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -179,6 +180,29 @@ def setup_method(self) -> None:
         clear_db_dags()
         clear_db_assets()
 
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_dag_test_auto_parses_when_not_serialized(self, 
testing_dag_bundle, session):
+        """
+        DAG.test() should auto-parse and sync the DAG if it's not serialized 
yet.
+        """
+        from airflow.models.dagbag import DBDagBag

Review Comment:
   Can go at the top of the file



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -179,6 +180,29 @@ def setup_method(self) -> None:
         clear_db_dags()
         clear_db_assets()
 
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_dag_test_auto_parses_when_not_serialized(self, 
testing_dag_bundle, session):
+        """
+        DAG.test() should auto-parse and sync the DAG if it's not serialized 
yet.
+        """
+        from airflow.models.dagbag import DBDagBag
+
+        dag_id = "test_example_bash_operator"
+
+        # Ensure not serialized yet
+        assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) 
is None
+        assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() 
is None

Review Comment:
   Isn't this old sqla syntax?



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -179,6 +180,29 @@ def setup_method(self) -> None:
         clear_db_dags()
         clear_db_assets()
 
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_dag_test_auto_parses_when_not_serialized(self, 
testing_dag_bundle, session):
+        """
+        DAG.test() should auto-parse and sync the DAG if it's not serialized 
yet.
+        """
+        from airflow.models.dagbag import DBDagBag
+
+        dag_id = "test_example_bash_operator"
+
+        # Ensure not serialized yet
+        assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) 
is None
+        assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() 
is None
+
+        dag = DAG(dag_id=dag_id, schedule=None)

Review Comment:
   ```suggestion
           dag = DAG(dag_id=dag_id)
   ```



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -179,6 +180,29 @@ def setup_method(self) -> None:
         clear_db_dags()
         clear_db_assets()
 
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_dag_test_auto_parses_when_not_serialized(self, 
testing_dag_bundle, session):
+        """
+        DAG.test() should auto-parse and sync the DAG if it's not serialized 
yet.
+        """
+        from airflow.models.dagbag import DBDagBag
+
+        dag_id = "test_example_bash_operator"
+
+        # Ensure not serialized yet
+        assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) 
is None
+        assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() 
is None
+
+        dag = DAG(dag_id=dag_id, schedule=None)

Review Comment:
   also worth adding a single hello world style task



##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1197,17 +1197,37 @@ def test(
             data_interval = (
                 
self.timetable.infer_manual_data_interval(run_after=logical_date) if 
logical_date else None
             )
-            scheduler_dag = 
SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))  # type: 
ignore[arg-type]
-            # Preserve callback functions from original Dag since they're lost 
during serialization
-            # and yes it is a hack for now! It is a tradeoff for code 
simplicity.
-            # Without it, we need "Scheduler Dag" (Serialized dag) for the 
scheduler bits
-            #   -- dep check, scheduling tis
-            # and need real dag to get and run callbacks without having to 
load the dag model
+            from airflow.models.dagbag import DBDagBag
+
+            scheduler_dag = 
DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session)
+            if not scheduler_dag:
+                from airflow.dag_processing.bundles.manager import 
DagBundlesManager
+                from airflow.dag_processing.dagbag import DagBag, 
sync_bag_to_db

Review Comment:
   This (`airflow.dag_processing.dagbag`) won't be present if we want to 
cherry-pick it in 3.1.1 -- it is `airflow.models.dagbag` in 3.1.1 -- maybe add 
try..except



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to