This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4be5616a5b Add subdir parameter to dags reserialize command (#26170)
4be5616a5b is described below

commit 4be5616a5b3ccc146316c651c4279e914c93740b
Author: Kamil Breguła <[email protected]>
AuthorDate: Tue Sep 6 21:11:26 2022 +0200

    Add subdir parameter to dags reserialize command (#26170)
---
 airflow/cli/cli_parser.py               |  5 ++++-
 airflow/cli/commands/dag_command.py     |  3 +--
 tests/cli/commands/test_dag_command.py  | 24 ++++++++++++++++++++++++
 tests/cli/commands/test_task_command.py |  1 +
 4 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 96cdefc731..231bbed490 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1189,7 +1189,10 @@ DAGS_COMMANDS = (
             "version of Airflow that you are running."
         ),
         func=lazy_load_command('airflow.cli.commands.dag_command.dag_reserialize'),
-        args=(ARG_CLEAR_ONLY,),
+        args=(
+            ARG_CLEAR_ONLY,
+            ARG_SUBDIR,
+        ),
     ),
 )
 TASKS_COMMANDS = (
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 447c819852..29c4a4f3ff 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -503,6 +503,5 @@ def dag_reserialize(args, session: Session = NEW_SESSION):
     session.query(SerializedDagModel).delete(synchronize_session=False)
 
     if not args.clear_only:
-        dagbag = DagBag()
-        dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+        dagbag = DagBag(process_subdir(args.subdir))
         dagbag.sync_to_db(session=session)
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 47762928ce..2ba4b9cc32 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -80,6 +80,30 @@ class TestCliDags(unittest.TestCase):
             serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
         assert len(serialized_dags_after_reserialize) >= 40  # Serialized DAGs back
 
+    def test_reserialize_should_support_subdir_argument(self):
+        # Run clear of serialized dags
+        dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', "--clear-only"]))
+
+        # Assert no serialized Dags
+        with create_session() as session:
+            serialized_dags_after_clear = session.query(SerializedDagModel).all()
+        assert len(serialized_dags_after_clear) == 0
+
+        # Serialize manually
+        dag_path = self.dagbag.dags['example_bash_operator'].fileloc
+        # Set default value of include_examples parameter to false
+        dagbag_default = list(DagBag.__init__.__defaults__)
+        dagbag_default[1] = False
+        with mock.patch(
+            'airflow.cli.commands.dag_command.DagBag.__init__.__defaults__', tuple(dagbag_default)
+        ):
+            dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', '--subdir', dag_path]))
+
+        # Check serialized DAG are back
+        with create_session() as session:
+            serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
+        assert len(serialized_dags_after_reserialize) == 1  # Serialized DAG back
+
     @mock.patch("airflow.cli.commands.dag_command.DAG.run")
     def test_backfill(self, mock_run):
         dag_command.dag_backfill(
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index b0782790ce..8476d7f3e9 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -73,6 +73,7 @@ class TestCliTasks:
         clear_db_runs()
 
         cls.dag = cls.dagbag.get_dag(cls.dag_id)
+        cls.dagbag.sync_to_db()
         cls.dag_run = cls.dag.create_dagrun(
             state=State.NONE, run_id=cls.run_id, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE
         )

Reply via email to