pierrejeambrun commented on code in PR #45731:
URL: https://github.com/apache/airflow/pull/45731#discussion_r1922244421
##########
airflow/models/backfill.py:
##########
@@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException):
"""
+class InvalidBackfillDirection(AirflowException):
+ """
+ Raised when backfill is attempted in reverse with tasks that depend on
past runs.
Review Comment:
```suggestion
Raised when backfill is attempted in reverse order with tasks that
depend on past runs.
```
##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker,
test_client):
assert response.status_code == 409
assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
+ @pytest.mark.parametrize(
+ "repro_act, repro_exp,run_backwards, status_code",
Review Comment:
```suggestion
"repro_act, repro_exp, run_backwards, status_code",
```
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -216,6 +218,17 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"{backfill_request.dag_id} has no schedule",
)
+ except InvalidReprocessBehavior:
+ raise HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail=f"{backfill_request.dag_id} has tasks for which
depends_on_past=True. "
+ "You must set reprocess behavior to reprocess completed or
reprocess failed.",
+ )
+ except InvalidBackfillDirection:
+ raise HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail="Backfill cannot be run in reverse when the DAG has tasks
where depends_on_past=True.",
+ )
Review Comment:
Same for DagNoScheduleException: 409 does not seem to be the most appropriate status code.
##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker,
test_client):
assert response.status_code == 409
assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
+ @pytest.mark.parametrize(
+ "repro_act, repro_exp,run_backwards, status_code",
+ [
+ ("none", ReprocessBehavior.NONE, False, 409),
+ ("completed", ReprocessBehavior.COMPLETED, False, 200),
+ ("completed", ReprocessBehavior.COMPLETED, True, 409),
+ ],
+ )
+ def test_create_backfill_with_depends_on_past(
+ self, repro_act, repro_exp, run_backwards, status_code, session,
dag_maker, test_client
+ ):
+ with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * *
*") as dag:
+ EmptyOperator(task_id="mytask", depends_on_past=True)
+ session.query(DagModel).all()
+ session.commit()
Review Comment:
What are those two lines for? The query does not use any of its results, and
the commit is on an empty session, I believe.
##########
airflow/models/backfill.py:
##########
@@ -75,6 +75,22 @@ class DagNoScheduleException(AirflowException):
"""
+class InvalidBackfillDirection(AirflowException):
+ """
+ Raised when backfill is attempted in reverse with tasks that depend on
past runs.
+
+ :meta private:
+ """
+
+
+class InvalidReprocessBehavior(AirflowException):
+ """
+ Raised when reprocess behavior is not set for tasks with
depends_on_past=True.
Review Comment:
```suggestion
class InvalidReprocessBehavior(AirflowException):
"""
Raised when a backfill cannot be completed because the reprocess
behavior is not valid.
```
##########
airflow/api_fastapi/core_api/routes/public/backfills.py:
##########
@@ -216,6 +218,17 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"{backfill_request.dag_id} has no schedule",
)
+ except InvalidReprocessBehavior:
+ raise HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail=f"{backfill_request.dag_id} has tasks for which
depends_on_past=True. "
+ "You must set reprocess behavior to reprocess completed or
reprocess failed.",
+ )
+ except InvalidBackfillDirection:
+ raise HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail="Backfill cannot be run in reverse when the DAG has tasks
where depends_on_past=True.",
+ )
Review Comment:
I'm not sure that those are `409` Conflict errors; they look more like `422`
Unprocessable Entity. I think the default error messages from
`InvalidReprocessBehavior` and `InvalidBackfillDirection` are good enough and
should be retrieved directly from the exceptions.
##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker,
test_client):
assert response.status_code == 409
assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
+ @pytest.mark.parametrize(
+ "repro_act, repro_exp,run_backwards, status_code",
+ [
+ ("none", ReprocessBehavior.NONE, False, 409),
+ ("completed", ReprocessBehavior.COMPLETED, False, 200),
+ ("completed", ReprocessBehavior.COMPLETED, True, 409),
+ ],
+ )
+ def test_create_backfill_with_depends_on_past(
Review Comment:
Ideally we would want a similar test for the dry_run endpoint.
##########
tests/api_fastapi/core_api/routes/public/test_backfills.py:
##########
@@ -267,6 +267,53 @@ def test_no_schedule_dag(self, session, dag_maker,
test_client):
assert response.status_code == 409
assert response.json().get("detail") == f"{dag.dag_id} has no schedule"
+ @pytest.mark.parametrize(
+ "repro_act, repro_exp,run_backwards, status_code",
+ [
+ ("none", ReprocessBehavior.NONE, False, 409),
+ ("completed", ReprocessBehavior.COMPLETED, False, 200),
+ ("completed", ReprocessBehavior.COMPLETED, True, 409),
+ ],
+ )
+ def test_create_backfill_with_depends_on_past(
+ self, repro_act, repro_exp, run_backwards, status_code, session,
dag_maker, test_client
+ ):
+ with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * *
*") as dag:
+ EmptyOperator(task_id="mytask", depends_on_past=True)
+ session.query(DagModel).all()
+ session.commit()
+ from_date = pendulum.parse("2024-01-01")
+ from_date_iso = to_iso(from_date)
+ to_date = pendulum.parse("2024-02-01")
+ to_date_iso = to_iso(to_date)
+ max_active_runs = 5
+ data = {
+ "dag_id": dag.dag_id,
+ "from_date": f"{from_date_iso}",
+ "to_date": f"{to_date_iso}",
+ "max_active_runs": max_active_runs,
+ "run_backwards": run_backwards,
+ "dag_run_conf": {"param1": "val1", "param2": True},
+ "dry_run": False,
+ "reprocess_behavior": repro_act,
+ }
+ response = test_client.post(
+ url="/public/backfills",
+ json=data,
+ )
+ assert response.status_code == status_code
+ if response.status_code == 409 and repro_act == "none":
+ assert (
+ response.json().get("detail")
+ == f"{dag.dag_id} has tasks for which depends_on_past=True.
You must set reprocess behavior to reprocess completed or reprocess failed."
+ )
+
+ if response.status_code == 409 and repro_act == "completed":
Review Comment:
If the response is not a success, then handle the errors:
```suggestion
if response.status_code != 200:
```
```
if run_backwards:
...
else:
...
```
This follows the same logic as the code implementation: first check whether
the backfill runs in reverse, then check reprocess_behavior. Swapping the
assertion order and the `if` conditions makes it harder to read and to verify
that all code paths are covered, for no real benefit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]