This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 446e66b052a116f8371be331624c1e1b03299380 Author: Ephraim Anierobi <[email protected]> AuthorDate: Fri Jun 18 00:29:00 2021 +0100 Fix DAG run state not updated while DAG is paused (#16343) The state of a DAG run does not update while the DAG is paused. The tasks continue to run if the DAG run was kicked off before the DAG was paused and eventually finish and are marked correctly. The DAG run state does not get updated and stays in Running state until the DAG is unpaused. This change fixes it by running a check on task exit to update state (if possible) of the DagRun if the task was able to finish the DagRun while the DAG is paused. Co-authored-by: Ash Berlin-Taylor <[email protected]> (cherry picked from commit 3834df6ade22b33addd47e3ab2165a0b282926fa) --- airflow/jobs/local_task_job.py | 15 ++++++++++++++ tests/jobs/test_local_task_job.py | 42 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 9e68450..efd84d6 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -160,6 +160,8 @@ class LocalTaskJob(BaseJob): if self.task_instance.state != State.SUCCESS: error = self.task_runner.deserialize_run_error() self.task_instance._run_finished_callback(error=error) # pylint: disable=protected-access + if not self.task_instance.test_mode: + self._update_dagrun_state_for_paused_dag() def on_kill(self): self.task_runner.terminate() @@ -206,3 +208,16 @@ class LocalTaskJob(BaseJob): error = self.task_runner.deserialize_run_error() or "task marked as failed externally" ti._run_finished_callback(error=error) # pylint: disable=protected-access self.terminating = True + + @provide_session + def _update_dagrun_state_for_paused_dag(self, session=None): + """ + Checks for paused dags with DagRuns in the running state and + update the DagRun state if possible + """ + dag = self.task_instance.task.dag + if dag.get_is_paused(): + 
dag_run = self.task_instance.get_dagrun(session=session) + if dag_run: + dag_run.dag = dag + dag_run.update_state(session=session, execute_callbacks=True) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 9047f8a..82d85f6 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -44,6 +44,7 @@ from airflow.utils.net import get_hostname from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timeout import timeout +from airflow.utils.types import DagRunType from tests.test_utils.asserts import assert_queries_count from tests.test_utils.db import clear_db_jobs, clear_db_runs from tests.test_utils.mock_executor import MockExecutor @@ -571,6 +572,43 @@ class TestLocalTaskJob(unittest.TestCase): assert task_terminated_externally.value == 1 assert not process.is_alive() + def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self): + """Test that with DAG paused, DagRun state will update when the tasks finishes the run""" + dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) + op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True) + + session = settings.Session() + orm_dag = DagModel( + dag_id=dag.dag_id, + has_task_concurrency_limits=False, + next_dagrun=dag.start_date, + next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), + is_active=True, + is_paused=True, + ) + session.add(orm_dag) + session.flush() + # Write Dag to DB + dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False) + dagbag.bag_dag(dag, root_dag=dag) + dagbag.sync_to_db() + + dr = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session, + ) + assert dr.state == State.RUNNING + ti = TaskInstance(op1, dr.execution_date) + job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, 
executor=SequentialExecutor()) + job1.task_runner = StandardTaskRunner(job1) + job1.run() + session.add(dr) + session.refresh(dr) + assert dr.state == State.SUCCESS + @pytest.fixture() def clean_db_helper(): @@ -589,12 +627,12 @@ class TestLocalTaskJobPerformance: task = DummyOperator(task_id='test_state_succeeded1', dag=dag) dag.clear() - dag.create_dagrun(run_id=unique_prefix, state=State.NONE) + dag.create_dagrun(run_id=unique_prefix, execution_date=DEFAULT_DATE, state=State.NONE) ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) mock_get_task_runner.return_value.return_code.side_effects = return_codes job = LocalTaskJob(task_instance=ti, executor=MockExecutor()) - with assert_queries_count(15): + with assert_queries_count(16): job.run()
