This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4be5616a5b Add subdir parameter to dags reserialize command (#26170)
4be5616a5b is described below
commit 4be5616a5b3ccc146316c651c4279e914c93740b
Author: Kamil Breguła <[email protected]>
AuthorDate: Tue Sep 6 21:11:26 2022 +0200
Add subdir parameter to dags reserialize command (#26170)
---
airflow/cli/cli_parser.py | 5 ++++-
airflow/cli/commands/dag_command.py | 3 +--
tests/cli/commands/test_dag_command.py | 24 ++++++++++++++++++++++++
tests/cli/commands/test_task_command.py | 1 +
4 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 96cdefc731..231bbed490 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1189,7 +1189,10 @@ DAGS_COMMANDS = (
"version of Airflow that you are running."
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_reserialize'),
- args=(ARG_CLEAR_ONLY,),
+ args=(
+ ARG_CLEAR_ONLY,
+ ARG_SUBDIR,
+ ),
),
)
TASKS_COMMANDS = (
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 447c819852..29c4a4f3ff 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -503,6 +503,5 @@ def dag_reserialize(args, session: Session = NEW_SESSION):
session.query(SerializedDagModel).delete(synchronize_session=False)
if not args.clear_only:
- dagbag = DagBag()
- dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+ dagbag = DagBag(process_subdir(args.subdir))
dagbag.sync_to_db(session=session)
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 47762928ce..2ba4b9cc32 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -80,6 +80,30 @@ class TestCliDags(unittest.TestCase):
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back
+ def test_reserialize_should_support_subdir_argument(self):
+ # Run clear of serialized dags
+ dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', "--clear-only"]))
+
+ # Assert no serialized Dags
+ with create_session() as session:
+ serialized_dags_after_clear = session.query(SerializedDagModel).all()
+ assert len(serialized_dags_after_clear) == 0
+
+ # Serialize manually
+ dag_path = self.dagbag.dags['example_bash_operator'].fileloc
+ # Set default value of include_examples parameter to false
+ dagbag_default = list(DagBag.__init__.__defaults__)
+ dagbag_default[1] = False
+ with mock.patch(
+ 'airflow.cli.commands.dag_command.DagBag.__init__.__defaults__', tuple(dagbag_default)
+ ):
+ dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', '--subdir', dag_path]))
+
+ # Check serialized DAG are back
+ with create_session() as session:
+ serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
+ assert len(serialized_dags_after_reserialize) == 1 # Serialized DAG back
+
@mock.patch("airflow.cli.commands.dag_command.DAG.run")
def test_backfill(self, mock_run):
dag_command.dag_backfill(
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index b0782790ce..8476d7f3e9 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -73,6 +73,7 @@ class TestCliTasks:
clear_db_runs()
cls.dag = cls.dagbag.get_dag(cls.dag_id)
+ cls.dagbag.sync_to_db()
cls.dag_run = cls.dag.create_dagrun(
state=State.NONE, run_id=cls.run_id, run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE
)