jedcunningham commented on code in PR #45507:
URL: https://github.com/apache/airflow/pull/45507#discussion_r1912616447


##########
airflow/cli/commands/remote_commands/dag_command.py:
##########
@@ -537,7 +537,20 @@ def dag_test(args, dag: DAG | None = None, session: 
Session = NEW_SESSION) -> No
 @provide_session
 def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
     """Serialize a DAG instance."""
-    # TODO: AIP-66 bundle centric reserialize
-    raise NotImplementedError(
-        "AIP-66: This command is not implemented yet - use `dag-processor 
--num-runs 1` in the meantime."
-    )
+    from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+    manager = DagBundlesManager()
+    manager.parse_config()

Review Comment:
   ```suggestion
   ```
   
   No need to call this explicitly, it's done in `__init__`.



##########
airflow/cli/cli_config.py:
##########
@@ -166,6 +166,14 @@ def string_lower_type(val):
     ),
     default="[AIRFLOW_HOME]/dags" if BUILD_DOCS else settings.DAGS_FOLDER,
 )
+ARG_BUNDLE_NAME = Arg(
+    (
+        "-B",
+        "--bundle-name",
+    ),
+    help=("The name of the DAG bundle to use."),
+    default="dags-folder",

Review Comment:
   We should default to None instead, and parse all configured bundles.



##########
airflow/cli/commands/remote_commands/dag_command.py:
##########
@@ -537,7 +537,20 @@ def dag_test(args, dag: DAG | None = None, session: 
Session = NEW_SESSION) -> No
 @provide_session
 def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
     """Serialize a DAG instance."""
-    # TODO: AIP-66 bundle centric reserialize
-    raise NotImplementedError(
-        "AIP-66: This command is not implemented yet - use `dag-processor 
--num-runs 1` in the meantime."
-    )
+    from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+    manager = DagBundlesManager()
+    manager.parse_config()
+    manager.sync_bundles_to_db(session=session)
+    session.commit()
+    if args.bundle_name:
+        bundle = manager.get_bundle(args.bundle_name)
+        if not bundle:
+            raise SystemExit(f"Bundle {args.bundle_name} not found")
+        dag_bag = DagBag(bundle.path)
+        dag_bag.sync_to_db(bundle.name, bundle_version=bundle.version, 
session=session)
+    else:
+        bundles = manager.get_all_dag_bundles()
+        for bundle in bundles:
+            dag_bag = DagBag(bundle.path)

Review Comment:
   ```suggestion
               dag_bag = DagBag(bundle.path, include_examples=False)
   ```
   
   Also here.



##########
airflow/cli/commands/remote_commands/dag_command.py:
##########
@@ -537,7 +537,20 @@ def dag_test(args, dag: DAG | None = None, session: 
Session = NEW_SESSION) -> No
 @provide_session
 def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
     """Serialize a DAG instance."""
-    # TODO: AIP-66 bundle centric reserialize
-    raise NotImplementedError(
-        "AIP-66: This command is not implemented yet - use `dag-processor 
--num-runs 1` in the meantime."
-    )
+    from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+    manager = DagBundlesManager()
+    manager.parse_config()
+    manager.sync_bundles_to_db(session=session)
+    session.commit()
+    if args.bundle_name:
+        bundle = manager.get_bundle(args.bundle_name)
+        if not bundle:
+            raise SystemExit(f"Bundle {args.bundle_name} not found")
+        dag_bag = DagBag(bundle.path)

Review Comment:
   ```suggestion
           dag_bag = DagBag(bundle.path, include_examples=False)
   ```
   
   Once we have #45532, we should explicitly disable examples, as examples are 
their own distinct bundle now.



##########
airflow/cli/commands/remote_commands/dag_command.py:
##########
@@ -537,7 +537,20 @@ def dag_test(args, dag: DAG | None = None, session: 
Session = NEW_SESSION) -> No
 @provide_session
 def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
     """Serialize a DAG instance."""
-    # TODO: AIP-66 bundle centric reserialize
-    raise NotImplementedError(
-        "AIP-66: This command is not implemented yet - use `dag-processor 
--num-runs 1` in the meantime."
-    )
+    from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+    manager = DagBundlesManager()
+    manager.parse_config()
+    manager.sync_bundles_to_db(session=session)
+    session.commit()
+    if args.bundle_name:
+        bundle = manager.get_bundle(args.bundle_name)
+        if not bundle:
+            raise SystemExit(f"Bundle {args.bundle_name} not found")
+        dag_bag = DagBag(bundle.path)
+        dag_bag.sync_to_db(bundle.name, bundle_version=bundle.version, 
session=session)

Review Comment:
   ```suggestion
           dag_bag.sync_to_db(bundle.name, 
bundle_version=bundle.get_current_version(), session=session)
   ```
   
   Shouldn't this be `get_current_version`? We passed None to `get_bundle` so 
that we get the latest (if the bundle supports versioning of course).



##########
tests/cli/commands/remote_commands/test_dag_command.py:
##########
@@ -108,20 +106,13 @@ def test_reserialize_should_support_subdir_argument(self, 
session):
         serialized_dags_after_clear = session.query(SerializedDagModel).all()
         assert len(serialized_dags_after_clear) == 0
 
-        # Serialize manually
-        dag_path = self.dagbag.dags["example_bash_operator"].fileloc
-        # Set default value of include_examples parameter to false
-        dagbag_default = list(DagBag.__init__.__defaults__)
-        dagbag_default[1] = False
-        with mock.patch(
-            
"airflow.cli.commands.remote_commands.dag_command.DagBag.__init__.__defaults__",
-            tuple(dagbag_default),
-        ):
-            dag_command.dag_reserialize(self.parser.parse_args(["dags", 
"reserialize", "--subdir", dag_path]))
+        dag_command.dag_reserialize(

Review Comment:
   Probably worth using `configure_testing_dag_bundle` - that'll let you point 
at a single file still. And you can leave the example DAGs on to prove that 
it's parsing a single bundle.
   
   Then add a new test to prove it's parsing more than just 1 bundle too.



##########
airflow/cli/commands/remote_commands/dag_command.py:
##########
@@ -537,7 +537,20 @@ def dag_test(args, dag: DAG | None = None, session: 
Session = NEW_SESSION) -> No
 @provide_session
 def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
     """Serialize a DAG instance."""
-    # TODO: AIP-66 bundle centric reserialize
-    raise NotImplementedError(
-        "AIP-66: This command is not implemented yet - use `dag-processor 
--num-runs 1` in the meantime."
-    )
+    from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+    manager = DagBundlesManager()
+    manager.parse_config()
+    manager.sync_bundles_to_db(session=session)
+    session.commit()
+    if args.bundle_name:
+        bundle = manager.get_bundle(args.bundle_name)
+        if not bundle:
+            raise SystemExit(f"Bundle {args.bundle_name} not found")
+        dag_bag = DagBag(bundle.path)
+        dag_bag.sync_to_db(bundle.name, bundle_version=bundle.version, 
session=session)
+    else:
+        bundles = manager.get_all_dag_bundles()
+        for bundle in bundles:
+            dag_bag = DagBag(bundle.path)
+            dag_bag.sync_to_db(bundle.name, bundle_version=bundle.version, 
session=session)

Review Comment:
   ```suggestion
               dag_bag.sync_to_db(bundle.name, 
bundle_version=bundle.get_current_version(), session=session)
   ```
   
   Also here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to