[ https://issues.apache.org/jira/browse/AIRFLOW-2951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599403#comment-16599403 ]
ASF GitHub Bot commented on AIRFLOW-2951: ----------------------------------------- kaxil closed pull request #3798: [AIRFLOW-2951] Update dag_run table end_date when state change URL: https://github.com/apache/incubator-airflow/pull/3798 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index 88c5275f5a..2fac1254cd 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -208,6 +208,7 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None): dr.state = state if state == State.RUNNING: dr.start_date = timezone.utcnow() + dr.end_date = None else: dr.end_date = timezone.utcnow() session.commit() diff --git a/airflow/models.py b/airflow/models.py index 55badf4828..6c8031c18c 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4840,6 +4840,8 @@ def get_state(self): def set_state(self, state): if self._state != state: self._state = state + self.end_date = timezone.utcnow() if self._state in State.finished() else None + if self.dag_id is not None: # FIXME: Due to the scoped_session factor we we don't get a clean # session here, so something really weird goes on: @@ -5063,7 +5065,7 @@ def update_state(self, session=None): if (not unfinished_tasks and any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)): self.log.info('Marking run %s failed', self) - self.state = State.FAILED + self.set_state(State.FAILED) dag.handle_callback(self, success=False, reason='task_failure', session=session) @@ -5071,20 +5073,20 @@ def update_state(self, session=None): elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED) for r in roots): self.log.info('Marking 
run %s successful', self) - self.state = State.SUCCESS + self.set_state(State.SUCCESS) dag.handle_callback(self, success=True, reason='success', session=session) # if *all tasks* are deadlocked, the run failed elif (unfinished_tasks and none_depends_on_past and none_task_concurrency and no_dependencies_met): self.log.info('Deadlock; marking run %s failed', self) - self.state = State.FAILED + self.set_state(State.FAILED) dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session) # finally, if the roots aren't done, the dag is still running else: - self.state = State.RUNNING + self.set_state(State.RUNNING) # todo: determine we want to use with_for_update to make sure to lock the run session.merge(self) diff --git a/tests/models.py b/tests/models.py index a1fd1e9912..7adeb3acdd 100644 --- a/tests/models.py +++ b/tests/models.py @@ -896,6 +896,124 @@ def on_failure_callable(context): updated_dag_state = dag_run.update_state() self.assertEqual(State.FAILED, updated_dag_state) + def test_dagrun_set_state_end_date(self): + session = settings.Session() + + dag = DAG( + 'test_dagrun_set_state_end_date', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + dag.clear() + + now = timezone.utcnow() + dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date', + state=State.RUNNING, + execution_date=now, + start_date=now) + + # Initial end_date should be NULL + # State.SUCCESS and State.FAILED are all ending state and should set end_date + # State.RUNNING set end_date back to NULL + session.add(dr) + session.commit() + self.assertIsNone(dr.end_date) + + dr.set_state(State.SUCCESS) + session.merge(dr) + session.commit() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_set_state_end_date' + ).one() + self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + + dr.set_state(State.RUNNING) + session.merge(dr) + session.commit() + + dr_database = 
session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_set_state_end_date' + ).one() + + self.assertIsNone(dr_database.end_date) + + dr.set_state(State.FAILED) + session.merge(dr) + session.commit() + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_set_state_end_date' + ).one() + + self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + + def test_dagrun_update_state_end_date(self): + session = settings.Session() + + dag = DAG( + 'test_dagrun_update_state_end_date', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + # A -> B + with dag: + op1 = DummyOperator(task_id='A') + op2 = DummyOperator(task_id='B') + op1.set_upstream(op2) + + dag.clear() + + now = timezone.utcnow() + dr = dag.create_dagrun(run_id='test_dagrun_update_state_end_date', + state=State.RUNNING, + execution_date=now, + start_date=now) + + # Initial end_date should be NULL + # State.SUCCESS and State.FAILED are all ending state and should set end_date + # State.RUNNING set end_date back to NULL + session.merge(dr) + session.commit() + self.assertIsNone(dr.end_date) + + ti_op1 = dr.get_task_instance(task_id=op1.task_id) + ti_op1.set_state(state=State.SUCCESS, session=session) + ti_op2 = dr.get_task_instance(task_id=op2.task_id) + ti_op2.set_state(state=State.SUCCESS, session=session) + + dr.update_state() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_update_state_end_date' + ).one() + self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + + ti_op1.set_state(state=State.RUNNING, session=session) + ti_op2.set_state(state=State.RUNNING, session=session) + dr.update_state() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_update_state_end_date' + ).one() + + self.assertEqual(dr._state, State.RUNNING) + self.assertIsNone(dr.end_date) + self.assertIsNone(dr_database.end_date) + + 
ti_op1.set_state(state=State.FAILED, session=session) + ti_op2.set_state(state=State.FAILED, session=session) + dr.update_state() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_update_state_end_date' + ).one() + + self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + def test_get_task_instance_on_empty_dagrun(self): """ Make sure that a proper value is returned when a dagrun has no task instances ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > dag_run end_date Null after a dag is finished > --------------------------------------------- > > Key: AIRFLOW-2951 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2951 > Project: Apache Airflow > Issue Type: Improvement > Components: DagRun > Reporter: Yingbo Wang > Assignee: Yingbo Wang > Priority: Major > > dag_run table should have an end_date updated when a dag is finished. > Currently only a user-activated dag termination request coming from the UI may > change the "end_date" in the dag_run table. All scheduled dags that are > automatically run by Airflow will leave a NULL value after they fall into > a "success" or "failed" state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)