This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b84249d9fff Implement basic backfill dry run (#43241)
b84249d9fff is described below

commit b84249d9ffff49f91ccef2e42f5528fec90dc17a
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Oct 23 06:15:51 2024 -0700

    Implement basic backfill dry run (#43241)
    
    Add simple dry run functionality. Does not check whether these runs exist.
    Just doing the basic thing first. Sample console output.
---
 airflow/cli/cli_config.py                |  6 +++++
 airflow/cli/commands/backfill_command.py | 39 +++++++++++++++++++++++++++++++-
 airflow/models/backfill.py               | 27 +++++++++++++---------
 3 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 5d1fe9ba8e5..b818ea08d57 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -325,6 +325,11 @@ ARG_MAX_ACTIVE_RUNS = Arg(
     type=positive_int(allow_zero=False),
     help="Max active runs for this backfill.",
 )
+ARG_BACKFILL_DRY_RUN = Arg(
+    ("--dry-run",),
+    help="Perform a dry run",
+    action="store_true",
+)
 
 
 # misc
@@ -1030,6 +1035,7 @@ BACKFILL_COMMANDS = (
             ARG_DAG_RUN_CONF,
             ARG_RUN_BACKWARDS,
             ARG_MAX_ACTIVE_RUNS,
+            ARG_BACKFILL_DRY_RUN,
         ),
     ),
 )
diff --git a/airflow/cli/commands/backfill_command.py b/airflow/cli/commands/backfill_command.py
index 8714ed55850..378c6ea5f95 100644
--- a/airflow/cli/commands/backfill_command.py
+++ b/airflow/cli/commands/backfill_command.py
@@ -21,10 +21,31 @@ import logging
 import signal
 
 from airflow import settings
-from airflow.models.backfill import _create_backfill
+from airflow.models.backfill import _create_backfill, _get_info_list
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import sigint_handler
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
+from airflow.utils.session import create_session
+
+
+def _do_dry_run(*, params, dag_id, from_date, to_date, reverse):
+    print("Performing dry run of backfill.")
+    print("Printing params:")
+    for k, v in params.items():
+        print(f"    - {k} = {v}")
+    with create_session() as session:
+        serdag = session.get(SerializedDagModel, dag_id)
+
+    info_list = _get_info_list(
+        dag=serdag.dag,
+        from_date=from_date,
+        to_date=to_date,
+        reverse=reverse,
+    )
+    print("Logical dates to be attempted:")
+    for info in info_list:
+        print(f"    - {info.logical_date}")
 
 
 @cli_utils.action_cli
@@ -34,6 +55,22 @@ def create_backfill(args) -> None:
     logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
     signal.signal(signal.SIGTERM, sigint_handler)
 
+    if args.dry_run:
+        _do_dry_run(
+            params=dict(
+                dag_id=args.dag,
+                from_date=args.from_date,
+                to_date=args.to_date,
+                max_active_runs=args.max_active_runs,
+                reverse=args.run_backwards,
+                dag_run_conf=args.dag_run_conf,
+            ),
+            dag_id=args.dag,
+            from_date=args.from_date,
+            to_date=args.to_date,
+            reverse=args.run_backwards,
+        )
+        return
     _create_backfill(
         dag_id=args.dag,
         from_date=args.from_date,
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 37a95331130..53a9fca1df1 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -175,6 +175,13 @@ def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort
     )
 
 
+def _get_info_list(*, dag, from_date, to_date, reverse):
+    dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date)
+    if reverse:
+        dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)])
+    return dagrun_info_list
+
+
 def _create_backfill(
     *,
     dag_id: str,
@@ -200,6 +207,14 @@ def _create_backfill(
                 f"There can be only one running backfill per dag."
             )
 
+        dag = serdag.dag
+        depends_on_past = any(x.depends_on_past for x in dag.tasks)
+        if depends_on_past:
+            if reverse is True:
+                raise ValueError(
+                    "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
+                )
+
         br = Backfill(
             dag_id=dag_id,
             from_date=from_date,
@@ -210,18 +225,8 @@ def _create_backfill(
         session.add(br)
         session.commit()
 
-        dag = serdag.dag
-        depends_on_past = any(x.depends_on_past for x in dag.tasks)
-        if depends_on_past:
-            if reverse is True:
-                raise ValueError(
-                    "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
-                )
-
         backfill_sort_ordinal = 0
-        dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date)
-        if reverse:
-            dagrun_info_list = reversed([x for x in dag.iter_dagrun_infos_between(from_date, to_date)])
+        dagrun_info_list = _get_info_list(dag=dag, from_date=from_date, to_date=to_date, reverse=reverse)
         for info in dagrun_info_list:
             backfill_sort_ordinal += 1
             session.commit()

Reply via email to