This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b84249d9fff Implement basic backfill dry run (#43241)
b84249d9fff is described below
commit b84249d9ffff49f91ccef2e42f5528fec90dc17a
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Oct 23 06:15:51 2024 -0700
Implement basic backfill dry run (#43241)
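Add a simple `--dry-run` option to backfill creation. It prints the backfill parameters and the logical dates that would be attempted, then exits without creating the backfill. It does not yet check whether dag runs already exist for those dates; this first version covers only the basic case. (Sample console output is not reproduced in this email.)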
---
airflow/cli/cli_config.py | 6 +++++
airflow/cli/commands/backfill_command.py | 39 +++++++++++++++++++++++++++++++-
airflow/models/backfill.py | 27 +++++++++++++---------
3 files changed, 60 insertions(+), 12 deletions(-)
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 5d1fe9ba8e5..b818ea08d57 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -325,6 +325,11 @@ ARG_MAX_ACTIVE_RUNS = Arg(
type=positive_int(allow_zero=False),
help="Max active runs for this backfill.",
)
+ARG_BACKFILL_DRY_RUN = Arg(
+    ("--dry-run",),
+    help=(
+        "Perform a dry run: print the backfill parameters and the logical dates "
+        "that would be attempted, without creating the backfill or any dag runs."
+    ),
+    action="store_true",
+)
# misc
@@ -1030,6 +1035,7 @@ BACKFILL_COMMANDS = (
ARG_DAG_RUN_CONF,
ARG_RUN_BACKWARDS,
ARG_MAX_ACTIVE_RUNS,
+ ARG_BACKFILL_DRY_RUN,
),
),
)
diff --git a/airflow/cli/commands/backfill_command.py b/airflow/cli/commands/backfill_command.py
index 8714ed55850..378c6ea5f95 100644
--- a/airflow/cli/commands/backfill_command.py
+++ b/airflow/cli/commands/backfill_command.py
@@ -21,10 +21,31 @@ import logging
import signal
from airflow import settings
-from airflow.models.backfill import _create_backfill
+from airflow.models.backfill import _create_backfill, _get_info_list
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
from airflow.utils.cli import sigint_handler
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
+from airflow.utils.session import create_session
+
+
+def _do_dry_run(*, params, dag_id, from_date, to_date, reverse):
+    """
+    Print what a backfill with the given arguments would do, without creating it.
+
+    Echoes ``params`` and then lists the logical dates the backfill would attempt.
+    Existing dag runs are not checked, so some listed dates may already have runs.
+
+    :param params: CLI arguments to echo back to the user.
+    :param dag_id: id of the dag to backfill.
+    :param from_date: start of the backfill date range.
+    :param to_date: end of the backfill date range.
+    :param reverse: whether the dates are listed in reverse order (``--run-backwards``).
+    :raises ValueError: if no serialized dag exists for ``dag_id``, or if ``reverse``
+        is set while any task has ``depends_on_past=True``.
+    """
+    print("Performing dry run of backfill.")
+    print("Printing params:")
+    for k, v in params.items():
+        print(f" - {k} = {v}")
+    with create_session() as session:
+        serdag = session.get(SerializedDagModel, dag_id)
+        # session.get returns None for an unknown dag; fail with a clear message
+        # instead of an AttributeError on ``.dag``.
+        if serdag is None:
+            raise ValueError(f"Could not find dag {dag_id}")
+        dag = serdag.dag
+        # Apply the same restriction _create_backfill enforces, so the dry run
+        # rejects exactly the arguments a real backfill would reject.
+        if reverse and any(task.depends_on_past for task in dag.tasks):
+            raise ValueError(
+                "Backfill cannot be run in reverse when the dag has tasks where depends_on_past=True"
+            )
+        info_list = _get_info_list(
+            dag=dag,
+            from_date=from_date,
+            to_date=to_date,
+            reverse=reverse,
+        )
+    print("Logical dates to be attempted:")
+    for info in info_list:
+        print(f" - {info.logical_date}")
@cli_utils.action_cli
@@ -34,6 +55,22 @@ def create_backfill(args) -> None:
logging.basicConfig(level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)
+ if args.dry_run:
+ _do_dry_run(
+ params=dict(
+ dag_id=args.dag,
+ from_date=args.from_date,
+ to_date=args.to_date,
+ max_active_runs=args.max_active_runs,
+ reverse=args.run_backwards,
+ dag_run_conf=args.dag_run_conf,
+ ),
+ dag_id=args.dag,
+ from_date=args.from_date,
+ to_date=args.to_date,
+ reverse=args.run_backwards,
+ )
+ return
_create_backfill(
dag_id=args.dag,
from_date=args.from_date,
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 37a95331130..53a9fca1df1 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -175,6 +175,13 @@ def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort
)
+def _get_info_list(*, dag, from_date, to_date, reverse):
+    """
+    Return the dag run infos from ``dag.iter_dagrun_infos_between(from_date, to_date)``.
+
+    :param dag: the dag whose schedule is iterated.
+    :param from_date: start of the date range.
+    :param to_date: end of the date range.
+    :param reverse: when true, the infos are returned in reverse order.
+    :return: an iterable of dag run infos; it may be single-pass, so iterate it once.
+    """
+    dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date)
+    if reverse:
+        # reversed() needs a sequence, so materialize the iterator already created
+        # rather than asking the dag to compute the schedule a second time.
+        dagrun_info_list = reversed(list(dagrun_info_list))
+    return dagrun_info_list
+
+
def _create_backfill(
*,
dag_id: str,
@@ -200,6 +207,14 @@ def _create_backfill(
f"There can be only one running backfill per dag."
)
+ dag = serdag.dag
+ depends_on_past = any(x.depends_on_past for x in dag.tasks)
+ if depends_on_past:
+ if reverse is True:
+ raise ValueError(
+ "Backfill cannot be run in reverse when the dag has tasks
where depends_on_past=True"
+ )
+
br = Backfill(
dag_id=dag_id,
from_date=from_date,
@@ -210,18 +225,8 @@ def _create_backfill(
session.add(br)
session.commit()
- dag = serdag.dag
- depends_on_past = any(x.depends_on_past for x in dag.tasks)
- if depends_on_past:
- if reverse is True:
- raise ValueError(
- "Backfill cannot be run in reverse when the dag has tasks
where depends_on_past=True"
- )
-
backfill_sort_ordinal = 0
- dagrun_info_list = dag.iter_dagrun_infos_between(from_date, to_date)
- if reverse:
- dagrun_info_list = reversed([x for x in
dag.iter_dagrun_infos_between(from_date, to_date)])
+ dagrun_info_list = _get_info_list(dag=dag, from_date=from_date,
to_date=to_date, reverse=reverse)
for info in dagrun_info_list:
backfill_sort_ordinal += 1
session.commit()