ephraimbuddy commented on code in PR #56660:
URL: https://github.com/apache/airflow/pull/56660#discussion_r2440021291


##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1197,17 +1197,39 @@ def test(
             data_interval = (
                 self.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None
             )
-            scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))  # type: ignore[arg-type]
-            # Preserve callback functions from original Dag since they're lost during serialization
-            # and yes it is a hack for now! It is a tradeoff for code simplicity.
-            # Without it, we need "Scheduler Dag" (Serialized dag) for the scheduler bits
-            #   -- dep check, scheduling tis
-            # and need real dag to get and run callbacks without having to load the dag model
+            from airflow.models.dagbag import DBDagBag
+
+            scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session)
+            if not scheduler_dag:
+                from airflow.dag_processing.bundles.manager import DagBundlesManager
+                from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
+                from airflow.sdk.definitions._internal.dag_parsing_context import (
+                    _airflow_parsing_context_manager,
+                )
 
+                manager = DagBundlesManager()
+                manager.sync_bundles_to_db(session=session)
+                session.commit()
+                # sync all bundles? or use the dags-folder bundle?
+                # What if the test dag is in a different bundle?
+                for bundle in manager.get_all_dag_bundles():
+                    if not bundle.is_initialized:
+                        bundle.initialize()
+                    with _airflow_parsing_context_manager(dag_id=self.dag_id):
+                        dagbag = DagBag(
+                            dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False
+                        )
+                        sync_bag_to_db(dagbag, bundle.name, bundle.version)
+                        # type: ignore[attr-defined]

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to