Copilot commented on code in PR #64939:
URL: https://github.com/apache/airflow/pull/64939#discussion_r3066477479
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -264,6 +265,8 @@ def _validate_backfill_params(
current_time = timezone.utcnow()
if from_date >= current_time and to_date >= current_time:
        raise InvalidBackfillDate("Backfill cannot be executed for future dates.")
+ if dag_run_conf is not None:
+ dag.params.deep_merge(dag_run_conf).validate()
Review Comment:
This call can raise `ValueError` on invalid conf. If the backfill API layer
doesn’t translate `ValueError` into a 4xx response (as noted in the PR
description), this change will cause invalid user input to surface as a 500 for
backfills. Consider catching `ValueError` here (or in the route handler) and
raising/returning an Airflow exception type that the backfill API already maps
to a 400-style response.
```suggestion
try:
dag.params.deep_merge(dag_run_conf).validate()
except ValueError as e:
raise AirflowException(f"Invalid dag_run_conf: {e}") from e
```
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -264,6 +265,8 @@ def _validate_backfill_params(
current_time = timezone.utcnow()
if from_date >= current_time and to_date >= current_time:
        raise InvalidBackfillDate("Backfill cannot be executed for future dates.")
+ if dag_run_conf is not None:
+ dag.params.deep_merge(dag_run_conf).validate()
Review Comment:
If `dag.params.deep_merge(...)` mutates `dag.params` in-place (common for
merge helpers), this validation step can permanently alter the DAG’s params for
the rest of the backfill creation flow and potentially leak into subsequent
uses of the same SerializedDAG object. To avoid side effects, validate using a
copy of the params structure (merge into a cloned params dict/ParamsDict)
before calling `validate()`.
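
The side-effect-free pattern can be sketched like this (plain dicts stand in for `ParamsDict`, and `deep_merge`'s real semantics may differ):

```python
import copy


def validate_merged_conf(dag_params: dict, dag_run_conf: dict) -> dict:
    """Merge the run conf into a deep copy of the DAG's params so the
    original mapping is never mutated; return the merged copy (real
    code would then call .validate() on it)."""
    merged = copy.deepcopy(dag_params)
    merged.update(dag_run_conf)  # stand-in for ParamsDict.deep_merge
    return merged
```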
##########
airflow-core/tests/unit/models/test_backfill.py:
##########
@@ -386,6 +386,155 @@ def test_reprocess_behavior(reprocess_behavior, num_in_b, exc_reasons, dag_maker
assert all(x.state == DagRunState.QUEUED for x in dag_runs_in_b)
+@pytest.mark.parametrize(
+ "reprocess_behavior",
+ [ReprocessBehavior.FAILED, ReprocessBehavior.COMPLETED],
+)
+def test_backfill_conf_overrides_existing_dag_run(reprocess_behavior, dag_maker, session):
+ """When reprocessing an existing DagRun, the backfill's dag_run_conf
should override the existing conf."""
+ with dag_maker(schedule="@daily") as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+
+ existing_date = "2021-01-03"
+ dag_maker.create_dagrun(
+ run_id=f"scheduled_{existing_date}",
+ logical_date=timezone.parse(existing_date),
+ session=session,
+ state="failed",
Review Comment:
The new tests set DagRun state using a raw string (`"failed"`). For
consistency and to avoid brittle coupling to string values, prefer using the
existing state constants (e.g., `DagRunState.FAILED`) used elsewhere in Airflow
tests/models.
```suggestion
state=DagRunState.FAILED,
```
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -249,6 +249,7 @@ def _validate_backfill_params(
from_date: datetime,
to_date: datetime,
reprocess_behavior: ReprocessBehavior | None,
+ dag_run_conf: dict | None = None,
) -> None:
Review Comment:
This PR introduces a semantic difference between `dag_run_conf=None` (do not
override existing DagRun.conf) and `dag_run_conf={}` (override to empty), but
the Backfill model’s `dag_run_conf` column is non-nullable (per context) and
defaults to `{}`. Persisting the backfill job without a way to represent “conf
omitted” risks losing that distinction for any later processing of the backfill
(e.g., subsequent clearing/reprocessing driven from the persisted Backfill
record). Consider either making the column nullable, or persisting an explicit
flag (e.g., `dag_run_conf_provided: bool`) to preserve intent across the
backfill lifecycle.
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -524,15 +529,18 @@ def _handle_clear_run(
run_on_latest_version=run_on_latest,
)
- # Update backfill_id and run_type in DagRun table
+ # Update backfill_id, run_type, and optionally conf in DagRun table
+    update_values: dict = dict(
+        backfill_id=backfill_id,
+        run_type=DagRunType.BACKFILL_JOB,
+        triggered_by=DagRunTriggeredByType.BACKFILL,
+    )
+    if dag_run_conf is not None:
+        update_values["conf"] = dag_run_conf
session.execute(
update(DagRun)
        .where(DagRun.logical_date == info.logical_date, DagRun.dag_id == dag.dag_id)
- .values(
- backfill_id=backfill_id,
- run_type=DagRunType.BACKFILL_JOB,
- triggered_by=DagRunTriggeredByType.BACKFILL,
- )
+ .values(**update_values)
Review Comment:
Updating DagRun rows by (dag_id, logical_date) can unintentionally update
multiple rows because Airflow does not generally enforce uniqueness on
logical_date. Since you already have the specific DagRun instance (`dr`), the
update should target a unique identifier (e.g., `DagRun.id == dr.id` or
`DagRun.run_id == dr.run_id`) to avoid overwriting `conf` (and other fields) on
unrelated runs that share the same logical_date.
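The hazard can be shown with synthetic rows (a pure-Python stand-in for the SQL update; real DagRun rows have many more columns):

```python
def update_conf(rows, conf, *, run_id=None, logical_date=None):
    """Set conf on every row matching the given filters; return the count.
    Filtering only on logical_date can match several runs of the same DAG,
    while run_id (unique per DAG) touches exactly one row."""
    updated = 0
    for row in rows:
        if run_id is not None and row["run_id"] != run_id:
            continue
        if logical_date is not None and row["logical_date"] != logical_date:
            continue
        row["conf"] = conf
        updated += 1
    return updated
```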