This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 330f3efd26 Add backfill cancellation logic (#42530)
330f3efd26 is described below

commit 330f3efd26cb3d65d99d2d5e70e84c1a2460cd66
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Oct 2 05:59:14 2024 -0700

    Add backfill cancellation logic (#42530)
---
 .../api_connexion/endpoints/backfill_endpoint.py   | 40 +++++++++----------
 airflow/models/backfill.py                         | 45 ++++++++++++++++++++--
 tests/models/test_backfill.py                      | 45 +++++++++++++++++++++-
 3 files changed, 104 insertions(+), 26 deletions(-)

diff --git a/airflow/api_connexion/endpoints/backfill_endpoint.py b/airflow/api_connexion/endpoints/backfill_endpoint.py
index baafdeea4f..a0e728c5bc 100644
--- a/airflow/api_connexion/endpoints/backfill_endpoint.py
+++ b/airflow/api_connexion/endpoints/backfill_endpoint.py
@@ -32,8 +32,12 @@ from airflow.api_connexion.schemas.backfill_schema import (
     backfill_collection_schema,
     backfill_schema,
 )
-from airflow.models.backfill import AlreadyRunningBackfill, Backfill, _create_backfill
-from airflow.utils import timezone
+from airflow.models.backfill import (
+    AlreadyRunningBackfill,
+    Backfill,
+    _cancel_backfill,
+    _create_backfill,
+)
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.www.decorators import action_logging
 
@@ -104,24 +108,6 @@ def unpause_backfill(*, backfill_id, session, **kwargs):
     return backfill_schema.dump(br)
 
 
-@provide_session
-@backfill_to_dag
-@security.requires_access_dag("PUT")
-@action_logging
-def cancel_backfill(*, backfill_id, session, **kwargs):
-    br: Backfill = session.get(Backfill, backfill_id)
-    if br.completed_at is not None:
-        raise Conflict("Backfill is already completed.")
-
-    br.completed_at = timezone.utcnow()
-
-    # first, pause
-    if not br.is_paused:
-        br.is_paused = True
-    session.commit()
-    return backfill_schema.dump(br)
-
-
 @provide_session
 @backfill_to_dag
 @security.requires_access_dag("GET")
@@ -155,3 +141,17 @@ def create_backfill(
         return backfill_schema.dump(backfill_obj)
     except AlreadyRunningBackfill:
         raise Conflict(f"There is already a running backfill for dag {dag_id}")
+
+
+@provide_session
+@backfill_to_dag
+@security.requires_access_dag("PUT")
+@action_logging
+def cancel_backfill(
+    *,
+    backfill_id,
+    session: Session = NEW_SESSION,  # used by backfill_to_dag decorator
+    **kwargs,
+):
+    br = _cancel_backfill(backfill_id=backfill_id)
+    return backfill_schema.dump(br)
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 6d3a8ee4fa..db10c804aa 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -26,12 +26,13 @@ from __future__ import annotations
 import logging
 from typing import TYPE_CHECKING
 
-from sqlalchemy import Boolean, Column, ForeignKeyConstraint, Integer, UniqueConstraint, func, select
+from sqlalchemy import Boolean, Column, ForeignKeyConstraint, Integer, UniqueConstraint, func, select, update
 from sqlalchemy.orm import relationship
 from sqlalchemy_jsonfield import JSONField
 
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.exceptions import Conflict, NotFound
 from airflow.exceptions import AirflowException
+from airflow.models import DagRun
 from airflow.models.base import Base, StringID
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.settings import json
@@ -48,7 +49,11 @@ log = logging.getLogger(__name__)
 
 
 class AlreadyRunningBackfill(AirflowException):
-    """Raised when attempting to create backfill and one already active."""
+    """
+    Raised when attempting to create backfill and one already active.
+
+    :meta private:
+    """
 
 
 class Backfill(Base):
@@ -172,7 +177,11 @@ def _create_backfill(
                     session=session,
                 )
             except Exception:
-                dag.log.exception("something failed")
+                dag.log.exception(
+                    "Error while attempting to create a dag run dag_id='%s' logical_date='%s'",
+                    dag.dag_id,
+                    info.logical_date,
+                )
                 session.rollback()
             session.add(
                 BackfillDagRun(
@@ -183,3 +192,31 @@ def _create_backfill(
             )
             session.commit()
     return br
+
+
+def _cancel_backfill(backfill_id) -> Backfill:
+    with create_session() as session:
+        b: Backfill = session.get(Backfill, backfill_id)
+        if b.completed_at is not None:
+            raise Conflict("Backfill is already completed.")
+
+        b.completed_at = timezone.utcnow()
+
+        # first, pause
+        if not b.is_paused:
+            b.is_paused = True
+
+        session.commit()
+
+        # now, let's mark all queued dag runs as failed
+        query = (
+            update(DagRun)
+            .where(
+                DagRun.id.in_(select(BackfillDagRun.dag_run_id).where(BackfillDagRun.backfill_id == b.id)),
+                DagRun.state == DagRunState.QUEUED,
+            )
+            .values(state=DagRunState.FAILED)
+            .execution_options(synchronize_session=False)
+        )
+        session.execute(query)
+    return b
diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py
index 9a845f8680..c45625db33 100644
--- a/tests/models/test_backfill.py
+++ b/tests/models/test_backfill.py
@@ -24,7 +24,13 @@ import pytest
 from sqlalchemy import select
 
 from airflow.models import DagRun
-from airflow.models.backfill import AlreadyRunningBackfill, Backfill, BackfillDagRun, _create_backfill
+from airflow.models.backfill import (
+    AlreadyRunningBackfill,
+    Backfill,
+    BackfillDagRun,
+    _cancel_backfill,
+    _create_backfill,
+)
 from airflow.operators.python import PythonOperator
 from airflow.utils.state import DagRunState
 from tests.test_utils.db import clear_db_backfills, clear_db_dags, clear_db_runs, clear_db_serialized_dags
@@ -71,7 +77,7 @@ def test_reverse_and_depends_on_past_fails(dep_on_past, dag_maker, session):
 
 
 @pytest.mark.parametrize("reverse", [True, False])
-def test_simple(reverse, dag_maker, session):
+def test_create_backfill_simple(reverse, dag_maker, session):
     """
     Verify simple case behavior.
 
@@ -150,3 +156,38 @@ def test_active_dag_run(dag_maker, session):
             reverse=False,
             dag_run_conf={"this": "param"},
         )
+
+
+def test_cancel_backfill(dag_maker, session):
+    """
+    Queued runs should be marked *failed*.
+    Every other dag run should be left alone.
+    """
+    with dag_maker(schedule="@daily") as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+    b = _create_backfill(
+        dag_id=dag.dag_id,
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-05"),
+        max_active_runs=2,
+        reverse=False,
+        dag_run_conf={},
+    )
+    query = (
+        select(DagRun)
+        .join(BackfillDagRun.dag_run)
+        .where(BackfillDagRun.backfill_id == b.id)
+        .order_by(BackfillDagRun.sort_ordinal)
+    )
+    dag_runs = session.scalars(query).all()
+    dates = [str(x.logical_date.date()) for x in dag_runs]
+    expected_dates = ["2021-01-01", "2021-01-02", "2021-01-03", "2021-01-04", "2021-01-05"]
+    assert dates == expected_dates
+    assert all(x.state == DagRunState.QUEUED for x in dag_runs)
+    dag_runs[0].state = "running"
+    session.commit()
+    _cancel_backfill(backfill_id=b.id)
+    session.expunge_all()
+    dag_runs = session.scalars(query).all()
+    states = [x.state for x in dag_runs]
+    assert states == ["running", "failed", "failed", "failed", "failed"]

Reply via email to