pierrejeambrun commented on code in PR #45731:
URL: https://github.com/apache/airflow/pull/45731#discussion_r1922244421


##########
airflow/models/backfill.py:
##########
@@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException):
     """
 
 
+class InvalidBackfillDirection(AirflowException):
+    """
+    Raised when backfill is attempted in reverse with tasks that depend on 
past runs.

Review Comment:
   ```suggestion
       Raised when backfill is attempted in reverse order with tasks that 
depend on past runs.
   ```



##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker, 
test_client):
         assert response.status_code == 409
         assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
 
+    @pytest.mark.parametrize(
+        "repro_act, repro_exp,run_backwards, status_code",

Review Comment:
   ```suggestion
           "repro_act, repro_exp, run_backwards, status_code",
   ```



##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -216,6 +218,17 @@ def create_backfill(
             status_code=status.HTTP_409_CONFLICT,
             detail=f"{backfill_request.dag_id} has no schedule",
         )
+    except InvalidReprocessBehavior:
+        raise HTTPException(
+            status_code=status.HTTP_409_CONFLICT,
+            detail=f"{backfill_request.dag_id} has tasks for which 
depends_on_past=True. "
+            "You must set reprocess behavior to reprocess completed or 
reprocess failed.",
+        )
+    except InvalidBackfillDirection:
+        raise HTTPException(
+            status_code=status.HTTP_409_CONFLICT,
+            detail="Backfill cannot be run in reverse when the DAG has tasks 
where depends_on_past=True.",
+        )

Review Comment:
   Same for DagNoScheduleException, 409 does not seem to be the most appropriate



##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker, 
test_client):
         assert response.status_code == 409
         assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
 
+    @pytest.mark.parametrize(
+        "repro_act, repro_exp,run_backwards, status_code",
+        [
+            ("none", ReprocessBehavior.NONE, False, 409),
+            ("completed", ReprocessBehavior.COMPLETED, False, 200),
+            ("completed", ReprocessBehavior.COMPLETED, True, 409),
+        ],
+    )
+    def test_create_backfill_with_depends_on_past(
+        self, repro_act, repro_exp, run_backwards, status_code, session, 
dag_maker, test_client
+    ):
+        with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * 
*") as dag:
+            EmptyOperator(task_id="mytask", depends_on_past=True)
+        session.query(DagModel).all()
+        session.commit()

Review Comment:
   What are those two lines for ? Query without using any results, and 
committing an empty session I believe.



##########
airflow/models/backfill.py:
##########
@@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException):
     """
 
 
+class InvalidBackfillDirection(AirflowException):
+    """
+    Raised when backfill is attempted in reverse with tasks that depend on 
past runs.
+
+    :meta private:
+    """
+
+
+class InvalidReprocessBehavior(AirflowException):
+    """
+    Raised when reprocess behavior is not set for tasks with 
depends_on_past=True.

Review Comment:
   ```suggestion
   class InvalidReprocessBehavior(AirflowException):
       """
       Raised when a backfill cannot be completed because the reprocess 
behavior is not valid.
   ```



##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -216,6 +218,17 @@ def create_backfill(
             status_code=status.HTTP_409_CONFLICT,
             detail=f"{backfill_request.dag_id} has no schedule",
         )
+    except InvalidReprocessBehavior:
+        raise HTTPException(
+            status_code=status.HTTP_409_CONFLICT,
+            detail=f"{backfill_request.dag_id} has tasks for which 
depends_on_past=True. "
+            "You must set reprocess behavior to reprocess completed or 
reprocess failed.",
+        )
+    except InvalidBackfillDirection:
+        raise HTTPException(
+            status_code=status.HTTP_409_CONFLICT,
+            detail="Backfill cannot be run in reverse when the DAG has tasks 
where depends_on_past=True.",
+        )

Review Comment:
   I'm not sure that those are `409` conflict. Maybe more of a 422 
unprocessable entity. I think the default error message from the 
`InvalidReprocessBehavior` and `InvalidBackfillDirection` is good enough and 
should be retrieved directly from the exception.



##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker, 
test_client):
         assert response.status_code == 409
         assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
 
+    @pytest.mark.parametrize(
+        "repro_act, repro_exp,run_backwards, status_code",
+        [
+            ("none", ReprocessBehavior.NONE, False, 409),
+            ("completed", ReprocessBehavior.COMPLETED, False, 200),
+            ("completed", ReprocessBehavior.COMPLETED, True, 409),
+        ],
+    )
+    def test_create_backfill_with_depends_on_past(

Review Comment:
   Ideally we would want a similar test for the dry_run endpoint.



##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker, 
test_client):
         assert response.status_code == 409
         assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
 
+    @pytest.mark.parametrize(
+        "repro_act, repro_exp,run_backwards, status_code",
+        [
+            ("none", ReprocessBehavior.NONE, False, 409),
+            ("completed", ReprocessBehavior.COMPLETED, False, 200),
+            ("completed", ReprocessBehavior.COMPLETED, True, 409),
+        ],
+    )
+    def test_create_backfill_with_depends_on_past(
+        self, repro_act, repro_exp, run_backwards, status_code, session, 
dag_maker, test_client
+    ):
+        with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * 
*") as dag:
+            EmptyOperator(task_id="mytask", depends_on_past=True)
+        session.query(DagModel).all()
+        session.commit()
+        from_date = pendulum.parse("2024-01-01")
+        from_date_iso = to_iso(from_date)
+        to_date = pendulum.parse("2024-02-01")
+        to_date_iso = to_iso(to_date)
+        max_active_runs = 5
+        data = {
+            "dag_id": dag.dag_id,
+            "from_date": f"{from_date_iso}",
+            "to_date": f"{to_date_iso}",
+            "max_active_runs": max_active_runs,
+            "run_backwards": run_backwards,
+            "dag_run_conf": {"param1": "val1", "param2": True},
+            "dry_run": False,
+            "reprocess_behavior": repro_act,
+        }
+        response = test_client.post(
+            url="/public/backfills",
+            json=data,
+        )
+        assert response.status_code == status_code
+        if response.status_code == 409 and repro_act == "none":
+            assert (
+                response.json().get("detail")
+                == f"{dag.dag_id} has tasks for which depends_on_past=True. 
You must set reprocess behavior to reprocess completed or reprocess failed."
+            )
+
+        if response.status_code == 409 and repro_act == "completed":

Review Comment:
   If response is not success then handle errors
   ```suggestion
           if response.status_code != 200:
   ```
   
   ```
   if run_backward:
   ...
   else:
   ...
   ```
   
   To follow the same logic as the code implementation. First if reverse, then 
if reprocess_behavior. Swapping the assertion order and if condition makes it 
harder to read and understand if all codepath are taken care of for no real 
addition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to