Repository: incubator-airflow Updated Branches: refs/heads/master 922158751 -> 927f30c9b
Revert "[AIRFLOW-779] Task should fail with specific message when deleted" This reverts commit 9221587514e2a0155cdced2d3ae50129b0793a10. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/927f30c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/927f30c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/927f30c9 Branch: refs/heads/master Commit: 927f30c9b15bc3ccd7ea8aa53abf4c9d82b3bef5 Parents: 9221587 Author: Dan Davydov <[email protected]> Authored: Fri Jan 20 14:49:53 2017 -0800 Committer: Dan Davydov <[email protected]> Committed: Fri Jan 20 14:49:53 2017 -0800 ---------------------------------------------------------------------- airflow/jobs.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/927f30c9/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 350c6d4..f1de333 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -2016,6 +2016,10 @@ class LocalTaskJob(BaseJob): self.pickle_id = pickle_id self.mark_success = mark_success + # terminating state is used so that a job don't try to + # terminate multiple times + self.terminating = False + # Keeps track of the fact that the task instance has been observed # as running at least once self.was_running = False @@ -2079,16 +2083,17 @@ class LocalTaskJob(BaseJob): def heartbeat_callback(self, session=None): """Self destruct task if state has been moved away from running externally""" + if self.terminating: + # task is already terminating, let it breathe + return + # Suicide pill TI = models.TaskInstance ti = self.task_instance new_ti = session.query(TI).filter( TI.dag_id == ti.dag_id, TI.task_id == ti.task_id, TI.execution_date == ti.execution_date).scalar() - if new_ti is None: - logging.warning("Task instance does not exist in DB. Terminating") - raise AirflowException("Task instance does not exist in DB") - elif new_ti.state == State.RUNNING: + if new_ti.state == State.RUNNING: self.was_running = True fqdn = socket.getfqdn() if not (fqdn == new_ti.hostname and @@ -2105,4 +2110,5 @@ class LocalTaskJob(BaseJob): "State of this instance has been externally set to " "{self.task_instance.state}. " "Taking the poison pill. So long.".format(**locals())) - raise AirflowException("Task instance state has been changed externally") + self.task_runner.terminate() + self.terminating = True
