This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 59bf5b5167192ebc60e2c9235d0dc1e999bf228a Author: Ephraim Anierobi <[email protected]> AuthorDate: Mon Aug 16 09:20:35 2021 +0100 Do not delete running DAG from the UI (#17630) When the DAG appears again in the UI and we rerun it (say, with catchup set to True), those running task instances that were not deleted would be rerun, and an external state change of the task instances would be detected by the LocalTaskJob, thereby sending SIGTERM to the task runner. This change resolves this by making sure that DAGs are not deleted while their task instances are still running. (cherry picked from commit 5a64c1c7cb1161f4db79b3dd47dc8881f23a61b3) --- airflow/api/common/experimental/delete_dag.py | 8 ++++++- airflow/www/views.py | 8 +++++++ tests/api/common/experimental/test_delete_dag.py | 29 ++++++++++++++++++++++-- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py index 4462070..eb45384 100644 --- a/airflow/api/common/experimental/delete_dag.py +++ b/airflow/api/common/experimental/delete_dag.py @@ -21,10 +21,11 @@ import logging from sqlalchemy import or_ from airflow import models -from airflow.exceptions import DagNotFound +from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DagModel, TaskFail from airflow.models.serialized_dag import SerializedDagModel from airflow.utils.session import provide_session +from airflow.utils.state import State log = logging.getLogger(__name__) @@ -40,6 +41,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i :return count of deleted dags """ log.info("Deleting DAG: %s", dag_id) + running_tis = ( + session.query(models.TaskInstance.state).filter(models.TaskInstance.state.in_(State.unfinished)).all() + ) + if running_tis: + raise AirflowException("TaskInstances still running") dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first() if dag is None: 
raise DagNotFound(f"Dag id {dag_id} not found") diff --git a/airflow/www/views.py b/airflow/www/views.py index 830047b..db8dbed 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1456,6 +1456,14 @@ class Airflow(AirflowBaseView): except DagFileExists: flash(f"Dag id {dag_id} is still in DagBag. Remove the DAG file first.", 'error') return redirect(request.referrer) + except AirflowException: + flash( + f"Cannot delete DAG with id {dag_id} because some task instances of the DAG " + "are still running. Please mark the task instances as " + "failed/succeeded before deleting the DAG", + "error", + ) + return redirect(request.referrer) flash(f"Deleting DAG with id {dag_id}. May take a couple minutes to fully disappear.") diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/experimental/test_delete_dag.py index 7570cb8..58bcd37 100644 --- a/tests/api/common/experimental/test_delete_dag.py +++ b/tests/api/common/experimental/test_delete_dag.py @@ -20,14 +20,16 @@ import unittest import pytest -from airflow import models +from airflow import models, settings from airflow.api.common.experimental.delete_dag import delete_dag -from airflow.exceptions import DagNotFound +from airflow.exceptions import AirflowException, DagNotFound from airflow.operators.dummy import DummyOperator +from airflow.utils import timezone from airflow.utils.dates import days_ago from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.types import DagRunType +from tests.test_utils.db import clear_db_dags, clear_db_runs DM = models.DagModel DR = models.DagRun @@ -52,6 +54,29 @@ class TestDeleteDAGCatchError(unittest.TestCase): delete_dag("non-existent DAG") +class TestDeleteDAGErrorsOnRunningTI: + def setup_method(self): + clear_db_dags() + clear_db_runs() + + def teardown_method(self): + clear_db_dags() + clear_db_runs() + + def test_delete_dag_running_taskinstances(self, create_dummy_dag): + dag_id = 
'test-dag' + _, task = create_dummy_dag(dag_id) + + ti = TI(task, execution_date=timezone.utcnow()) + ti.refresh_from_db() + session = settings.Session() + ti.state = State.RUNNING + session.merge(ti) + session.commit() + with pytest.raises(AirflowException): + delete_dag(dag_id) + + class TestDeleteDAGSuccessfulDelete(unittest.TestCase): dag_file_path = "/usr/local/airflow/dags/test_dag_8.py" key = "test_dag_id"
