Copilot commented on code in PR #64939:
URL: https://github.com/apache/airflow/pull/64939#discussion_r3066477479


##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -264,6 +265,8 @@ def _validate_backfill_params(
     current_time = timezone.utcnow()
     if from_date >= current_time and to_date >= current_time:
        raise InvalidBackfillDate("Backfill cannot be executed for future dates.")
+    if dag_run_conf is not None:
+        dag.params.deep_merge(dag_run_conf).validate()

Review Comment:
   This call can raise `ValueError` on invalid conf. If the backfill API layer 
doesn’t translate `ValueError` into a 4xx response (as noted in the PR 
description), this change will cause invalid user input to surface as a 500 for 
backfills. Consider catching `ValueError` here (or in the route handler) and 
raising/returning an Airflow exception type that the backfill API already maps 
to a 400-style response.
   ```suggestion
           try:
               dag.params.deep_merge(dag_run_conf).validate()
           except ValueError as e:
               raise AirflowException(f"Invalid dag_run_conf: {e}") from e
   ```



##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -264,6 +265,8 @@ def _validate_backfill_params(
     current_time = timezone.utcnow()
     if from_date >= current_time and to_date >= current_time:
        raise InvalidBackfillDate("Backfill cannot be executed for future dates.")
+    if dag_run_conf is not None:
+        dag.params.deep_merge(dag_run_conf).validate()

Review Comment:
   If `dag.params.deep_merge(...)` mutates `dag.params` in-place (common for 
merge helpers), this validation step can permanently alter the DAG’s params for 
the rest of the backfill creation flow and potentially leak into subsequent 
uses of the same SerializedDAG object. To avoid side effects, validate using a 
copy of the params structure (merge into a cloned params dict/ParamsDict) 
before calling `validate()`.
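   A minimal sketch of the side-effect-free pattern, using a hypothetical stand-in for `ParamsDict` (the `Params` class and its `deep_merge` below are illustrative, not Airflow's real API):
   ```python
   import copy

   class Params(dict):
       """Hypothetical stand-in for a ParamsDict whose merge mutates in place."""
       def deep_merge(self, other):
           self.update(other)  # mutates self in place, as the comment warns
           return self

   dag_params = Params(retries=1)

   # Merge into a deep copy so validation never alters the DAG's own params.
   merged = copy.deepcopy(dag_params).deep_merge({"retries": 5})
   assert dag_params == {"retries": 1}  # original params untouched
   assert merged == {"retries": 5}
   ```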



##########
airflow-core/tests/unit/models/test_backfill.py:
##########
@@ -386,6 +386,155 @@ def test_reprocess_behavior(reprocess_behavior, num_in_b, exc_reasons, dag_maker
     assert all(x.state == DagRunState.QUEUED for x in dag_runs_in_b)
 
 
[email protected](
+    "reprocess_behavior",
+    [ReprocessBehavior.FAILED, ReprocessBehavior.COMPLETED],
+)
+def test_backfill_conf_overrides_existing_dag_run(reprocess_behavior, dag_maker, session):
+    """When reprocessing an existing DagRun, the backfill's dag_run_conf 
should override the existing conf."""
+    with dag_maker(schedule="@daily") as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+
+    existing_date = "2021-01-03"
+    dag_maker.create_dagrun(
+        run_id=f"scheduled_{existing_date}",
+        logical_date=timezone.parse(existing_date),
+        session=session,
+        state="failed",

Review Comment:
   The new tests set DagRun state using a raw string (`"failed"`). For 
consistency and to avoid brittle coupling to string values, prefer the 
existing state constants (e.g., `DagRunState.FAILED`) used elsewhere in Airflow 
tests/models.
   ```suggestion
           state=DagRunState.FAILED,
   ```



##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -249,6 +249,7 @@ def _validate_backfill_params(
     from_date: datetime,
     to_date: datetime,
     reprocess_behavior: ReprocessBehavior | None,
+    dag_run_conf: dict | None = None,
 ) -> None:

Review Comment:
   This PR introduces a semantic difference between `dag_run_conf=None` (do not 
override existing DagRun.conf) and `dag_run_conf={}` (override to empty), but 
the Backfill model’s `dag_run_conf` column is non-nullable (per context) and 
defaults to `{}`. Persisting the backfill job without a way to represent “conf 
omitted” risks losing that distinction for any later processing of the backfill 
(e.g., subsequent clearing/reprocessing driven from the persisted Backfill 
record). Consider either making the column nullable, or persisting an explicit 
flag (e.g., `dag_run_conf_provided: bool`) to preserve intent across the 
backfill lifecycle.
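   A small sketch of the distinction being lost (plain Python, no Airflow imports; `effective_conf` is a hypothetical helper for illustration):
   ```python
   def effective_conf(existing_conf: dict, dag_run_conf: dict | None) -> dict:
       """None means 'keep the existing DagRun.conf'; {} means 'override to empty'."""
       return existing_conf if dag_run_conf is None else dag_run_conf

   existing = {"key": "old"}
   assert effective_conf(existing, None) == {"key": "old"}  # conf omitted
   assert effective_conf(existing, {}) == {}                # explicit empty override
   # A non-nullable column defaulting to {} stores both cases as {},
   # so the "conf omitted" branch is unrecoverable after persistence.
   ```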



##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -524,15 +529,18 @@ def _handle_clear_run(
         run_on_latest_version=run_on_latest,
     )
 
-    # Update backfill_id and run_type in DagRun table
+    # Update backfill_id, run_type, and optionally conf in DagRun table
+    update_values: dict = dict(
+        backfill_id=backfill_id,
+        run_type=DagRunType.BACKFILL_JOB,
+        triggered_by=DagRunTriggeredByType.BACKFILL,
+    )
+    if dag_run_conf is not None:
+        update_values["conf"] = dag_run_conf
     session.execute(
         update(DagRun)
         .where(DagRun.logical_date == info.logical_date, DagRun.dag_id == dag.dag_id)
-        .values(
-            backfill_id=backfill_id,
-            run_type=DagRunType.BACKFILL_JOB,
-            triggered_by=DagRunTriggeredByType.BACKFILL,
-        )
+        .values(**update_values)

Review Comment:
   Updating DagRun rows by (dag_id, logical_date) can unintentionally update 
multiple rows because Airflow does not generally enforce uniqueness on 
logical_date. Since you already have the specific DagRun instance (`dr`), the 
update should target a unique identifier (e.g., `DagRun.id == dr.id` or 
`DagRun.run_id == dr.run_id`) to avoid overwriting `conf` (and other fields) on 
unrelated runs that share the same logical_date.
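   A minimal sqlite3 demonstration of the hazard (hypothetical schema, not Airflow's real `dag_run` table): when two rows share `(dag_id, logical_date)`, the broad predicate overwrites both, while targeting the primary key touches exactly one.
   ```python
   import sqlite3

   con = sqlite3.connect(":memory:")
   con.execute(
       "CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, logical_date TEXT, conf TEXT)"
   )
   con.executemany(
       "INSERT INTO dag_run (dag_id, logical_date, conf) VALUES (?, ?, ?)",
       [("demo", "2021-01-03", "old"), ("demo", "2021-01-03", "old")],
   )

   # Broad predicate: both rows matching (dag_id, logical_date) are overwritten.
   n_broad = con.execute(
       "UPDATE dag_run SET conf = 'new' WHERE dag_id = ? AND logical_date = ?",
       ("demo", "2021-01-03"),
   ).rowcount
   assert n_broad == 2

   # Unique identifier: only the intended row is updated.
   n_narrow = con.execute("UPDATE dag_run SET conf = 'newer' WHERE id = ?", (1,)).rowcount
   assert n_narrow == 1
   ```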



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
