ashb closed pull request #3990: [AIRFLOW-2951] Update dag_run table end_date
when state change
URL: https://github.com/apache/incubator-airflow/pull/3990
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git a/airflow/models.py b/airflow/models.py
index 8fc259d1b5..428923ff9e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4845,6 +4845,8 @@ def get_state(self):
def set_state(self, state):
if self._state != state:
self._state = state
+ self.end_date = timezone.utcnow() if self._state in
State.finished() else None
+
if self.dag_id is not None:
# FIXME: Due to the scoped_session factor we we don't get a
clean
# session here, so something really weird goes on:
@@ -5068,7 +5070,7 @@ def update_state(self, session=None):
if (not unfinished_tasks and
any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r
in roots)):
self.log.info('Marking run %s failed', self)
- self.state = State.FAILED
+ self.set_state(State.FAILED)
dag.handle_callback(self, success=False, reason='task_failure',
session=session)
@@ -5076,20 +5078,20 @@ def update_state(self, session=None):
elif not unfinished_tasks and all(r.state in (State.SUCCESS,
State.SKIPPED)
for r in roots):
self.log.info('Marking run %s successful', self)
- self.state = State.SUCCESS
+ self.set_state(State.SUCCESS)
dag.handle_callback(self, success=True, reason='success',
session=session)
# if *all tasks* are deadlocked, the run failed
elif (unfinished_tasks and none_depends_on_past and
none_task_concurrency and no_dependencies_met):
self.log.info('Deadlock; marking run %s failed', self)
- self.state = State.FAILED
+ self.set_state(State.FAILED)
dag.handle_callback(self, success=False,
reason='all_tasks_deadlocked',
session=session)
# finally, if the roots aren't done, the dag is still running
else:
- self.state = State.RUNNING
+ self.set_state(State.RUNNING)
# todo: determine we want to use with_for_update to make sure to lock
the run
session.merge(self)
diff --git a/tests/models.py b/tests/models.py
index 60aee3c84f..55fa41bd90 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -915,6 +915,124 @@ def on_failure_callable(context):
updated_dag_state = dag_run.update_state()
self.assertEqual(State.FAILED, updated_dag_state)
+ def test_dagrun_set_state_end_date(self):
+ session = settings.Session()
+
+ dag = DAG(
+ 'test_dagrun_set_state_end_date',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ dag.clear()
+
+ now = timezone.utcnow()
+ dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date',
+ state=State.RUNNING,
+ execution_date=now,
+ start_date=now)
+
+ # Initial end_date should be NULL
+ # State.SUCCESS and State.FAILED are all ending state and should set
end_date
+ # State.RUNNING set end_date back to NULL
+ session.add(dr)
+ session.commit()
+ self.assertIsNone(dr.end_date)
+
+ dr.set_state(State.SUCCESS)
+ session.merge(dr)
+ session.commit()
+
+ dr_database = session.query(DagRun).filter(
+ DagRun.run_id == 'test_dagrun_set_state_end_date'
+ ).one()
+ self.assertIsNotNone(dr_database.end_date)
+ self.assertEqual(dr.end_date, dr_database.end_date)
+
+ dr.set_state(State.RUNNING)
+ session.merge(dr)
+ session.commit()
+
+ dr_database = session.query(DagRun).filter(
+ DagRun.run_id == 'test_dagrun_set_state_end_date'
+ ).one()
+
+ self.assertIsNone(dr_database.end_date)
+
+ dr.set_state(State.FAILED)
+ session.merge(dr)
+ session.commit()
+ dr_database = session.query(DagRun).filter(
+ DagRun.run_id == 'test_dagrun_set_state_end_date'
+ ).one()
+
+ self.assertIsNotNone(dr_database.end_date)
+ self.assertEqual(dr.end_date, dr_database.end_date)
+
+ def test_dagrun_update_state_end_date(self):
+ session = settings.Session()
+
+ dag = DAG(
+ 'test_dagrun_update_state_end_date',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ # A -> B
+ with dag:
+ op1 = DummyOperator(task_id='A')
+ op2 = DummyOperator(task_id='B')
+ op1.set_upstream(op2)
+
+ dag.clear()
+
+ now = timezone.utcnow()
+ dr = dag.create_dagrun(run_id='test_dagrun_update_state_end_date',
+ state=State.RUNNING,
+ execution_date=now,
+ start_date=now)
+
+ # Initial end_date should be NULL
+ # State.SUCCESS and State.FAILED are all ending state and should set
end_date
+ # State.RUNNING set end_date back to NULL
+ session.merge(dr)
+ session.commit()
+ self.assertIsNone(dr.end_date)
+
+ ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+ ti_op1.set_state(state=State.SUCCESS, session=session)
+ ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+ ti_op2.set_state(state=State.SUCCESS, session=session)
+
+ dr.update_state()
+
+ dr_database = session.query(DagRun).filter(
+ DagRun.run_id == 'test_dagrun_update_state_end_date'
+ ).one()
+ self.assertIsNotNone(dr_database.end_date)
+ self.assertEqual(dr.end_date, dr_database.end_date)
+
+ ti_op1.set_state(state=State.RUNNING, session=session)
+ ti_op2.set_state(state=State.RUNNING, session=session)
+ dr.update_state()
+
+ dr_database = session.query(DagRun).filter(
+ DagRun.run_id == 'test_dagrun_update_state_end_date'
+ ).one()
+
+ self.assertEqual(dr._state, State.RUNNING)
+ self.assertIsNone(dr.end_date)
+ self.assertIsNone(dr_database.end_date)
+
+ ti_op1.set_state(state=State.FAILED, session=session)
+ ti_op2.set_state(state=State.FAILED, session=session)
+ dr.update_state()
+
+ dr_database = session.query(DagRun).filter(
+ DagRun.run_id == 'test_dagrun_update_state_end_date'
+ ).one()
+
+ self.assertIsNotNone(dr_database.end_date)
+ self.assertEqual(dr.end_date, dr_database.end_date)
+
def test_get_task_instance_on_empty_dagrun(self):
"""
Make sure that a proper value is returned when a dagrun has no task
instances
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services