Repository: incubator-airflow Updated Branches: refs/heads/master ac9167f37 -> 5479ac8d4
[AIRFLOW-798] Check return_code before forcing termination LocalTaskJob could still log an error and self-destruct, although the underlying process had already exited. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/24d641bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/24d641bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/24d641bc Branch: refs/heads/master Commit: 24d641bc106c112f86771bd394d877dd4df578f9 Parents: a2b0ea3 Author: Bolke de Bruin <[email protected]> Authored: Tue Jan 24 12:01:44 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Tue Jan 24 16:40:22 2017 +0100 ---------------------------------------------------------------------- airflow/jobs.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/24d641bc/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 8bb93bb..978fc35 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -2081,28 +2081,25 @@ class LocalTaskJob(BaseJob): # task is already terminating, let it breathe return - # Suicide pill - TI = models.TaskInstance + self.task_instance.refresh_from_db() ti = self.task_instance - new_ti = session.query(TI).filter( - TI.dag_id == ti.dag_id, TI.task_id == ti.task_id, - TI.execution_date == ti.execution_date).scalar() - if new_ti.state == State.RUNNING: + if ti.state == State.RUNNING: self.was_running = True fqdn = socket.getfqdn() - if not (fqdn == new_ti.hostname and - self.task_runner.process.pid == new_ti.pid): - logging.warning("Recorded hostname and pid of {new_ti.hostname} " - "and {new_ti.pid} do not match this instance's " + if not (fqdn == ti.hostname and + self.task_runner.process.pid == 
ti.pid): + logging.warning("Recorded hostname and pid of {ti.hostname} " + "and {ti.pid} do not match this instance's " "which are {fqdn} and " - "{self.task_runner.process.pid}. Taking the poison pill. " - "So long." + "{self.task_runner.process.pid}. " + "Taking the poison pill. So long." .format(**locals())) raise AirflowException("Another worker/process is running this job") - elif self.was_running and hasattr(self.task_runner, 'process'): + elif (self.was_running + and self.task_runner.return_code() is None + and hasattr(self.task_runner, 'process')): logging.warning( "State of this instance has been externally set to " - "{self.task_instance.state}. " - "Taking the poison pill. So long.".format(**locals())) + "{}. Taking the poison pill. So long.".format(ti.state)) self.task_runner.terminate() self.terminating = True
