[AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection

Right now, triggering a second run of a task instance causes both the new run and the original one to keep running, because the hostname and pid fields are updated regardless of whether the task is already running. Also, the pid field is not refreshed from the DB properly. Also, instead of comparing the recorded pid directly against the task runner's pid, we should check that the recorded pid belongs to a descendant of the current (parent) process.
Will be followed up by working tests. Closes #2102 from saguziel/aguziel-fix-trigger-2 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1243ab16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1243ab16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1243ab16 Branch: refs/heads/v1-8-test Commit: 1243ab16849ab9716b26aeba6a11ea3e9e9a81ca Parents: a8f2c27 Author: Alex Guziel <[email protected]> Authored: Sat Mar 11 10:54:39 2017 -0800 Committer: Bolke de Bruin <[email protected]> Committed: Sun Mar 12 08:34:45 2017 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 41 ++++++++++++++----------- airflow/models.py | 2 ++ tests/core.py | 59 ++++++++++++++++++++++++++++++++++++ tests/dags/sleep_forever_dag.py | 29 ++++++++++++++++++ 4 files changed, 113 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index c61b229..222d9ba 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -2072,15 +2072,6 @@ class LocalTaskJob(BaseJob): try: self.task_runner.start() - ti = self.task_instance - session = settings.Session() - if self.task_runner.process: - ti.pid = self.task_runner.process.pid - ti.hostname = socket.getfqdn() - session.merge(ti) - session.commit() - session.close() - last_heartbeat_time = time.time() heartbeat_time_limit = conf.getint('scheduler', 'scheduler_zombie_task_threshold') @@ -2120,6 +2111,18 @@ class LocalTaskJob(BaseJob): self.task_runner.terminate() self.task_runner.on_finish() + def _is_descendant_process(self, pid): + """Checks if pid is a descendant of the current process. 
+ + :param pid: process id to check + :type pid: int + :rtype: bool + """ + try: + return psutil.Process(pid) in psutil.Process().children(recursive=True) + except psutil.NoSuchProcess: + return False + @provide_session def heartbeat_callback(self, session=None): """Self destruct task if state has been moved away from running externally""" @@ -2133,15 +2136,17 @@ class LocalTaskJob(BaseJob): if ti.state == State.RUNNING: self.was_running = True fqdn = socket.getfqdn() - if not (fqdn == ti.hostname and - self.task_runner.process.pid == ti.pid): - logging.warning("Recorded hostname and pid of {ti.hostname} " - "and {ti.pid} do not match this instance's " - "which are {fqdn} and " - "{self.task_runner.process.pid}. " - "Taking the poison pill. So long." - .format(**locals())) - raise AirflowException("Another worker/process is running this job") + if fqdn != ti.hostname: + logging.warning("The recorded hostname {ti.hostname} " + "does not match this instance's hostname " + "{fqdn}".format(**locals())) + raise AirflowException("Hostname of job runner does not match") + elif not self._is_descendant_process(ti.pid): + current_pid = os.getpid() + logging.warning("Recorded pid {ti.pid} is not a " + "descendant of the current pid " + "{current_pid}".format(**locals())) + raise AirflowException("PID of job runner does not match") elif (self.was_running and self.task_runner.return_code() is None and hasattr(self.task_runner, 'process')): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 32c52ac..7c6590f 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -997,6 +997,7 @@ class TaskInstance(Base): self.end_date = ti.end_date self.try_number = ti.try_number self.hostname = ti.hostname + self.pid = ti.pid else: self.state = None @@ -1320,6 +1321,7 @@ class TaskInstance(Base): if not test_mode: 
session.add(Log(State.RUNNING, self)) self.state = State.RUNNING + self.pid = os.getpid() self.end_date = None if not test_mode: session.merge(self) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index ee7a738..636ad43 100644 --- a/tests/core.py +++ b/tests/core.py @@ -26,6 +26,7 @@ from datetime import datetime, time, timedelta from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication import signal +from time import time as timetime from time import sleep import warnings @@ -895,6 +896,64 @@ class CoreTest(unittest.TestCase): trigger_rule="non_existant", dag=self.dag) + def test_run_task_twice(self): + """If two copies of a TI run, the new one should die, and old should live""" + dagbag = models.DagBag( + dag_folder=TEST_DAG_FOLDER, + include_examples=False, + ) + TI = models.TaskInstance + dag = dagbag.dags.get('sleep_forever_dag') + task = dag.task_dict.get('sleeps_forever') + + ti = TI(task=task, execution_date=DEFAULT_DATE) + job1 = jobs.LocalTaskJob( + task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) + job2 = jobs.LocalTaskJob( + task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) + + p1 = multiprocessing.Process(target=job1.run) + p2 = multiprocessing.Process(target=job2.run) + try: + p1.start() + start_time = timetime() + sleep(5.0) # must wait for session to be created on p1 + settings.engine.dispose() + session = settings.Session() + ti.refresh_from_db(session=session) + self.assertEqual(State.RUNNING, ti.state) + p1pid = ti.pid + settings.engine.dispose() + p2.start() + p2.join(5) # wait 5 seconds until termination + self.assertFalse(p2.is_alive()) + self.assertTrue(p1.is_alive()) + + settings.engine.dispose() + session = settings.Session() + ti.refresh_from_db(session=session) + self.assertEqual(State.RUNNING, ti.state) + 
self.assertEqual(p1pid, ti.pid) + + # check changing hostname kills task + ti.refresh_from_db(session=session, lock_for_update=True) + ti.hostname = 'nonexistenthostname' + session.merge(ti) + session.commit() + + p1.join(5) + self.assertFalse(p1.is_alive()) + finally: + try: + p1.terminate() + except AttributeError: + pass # process already terminated + try: + p2.terminate() + except AttributeError: + pass # process already terminated + session.close() + def test_terminate_task(self): """If a task instance's db state get deleted, it should fail""" TI = models.TaskInstance http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/dags/sleep_forever_dag.py ---------------------------------------------------------------------- diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py new file mode 100644 index 0000000..b1f810e --- /dev/null +++ b/tests/dags/sleep_forever_dag.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Used for unit tests""" +import airflow +from airflow.operators.bash_operator import BashOperator +from airflow.models import DAG + +dag = DAG( + dag_id='sleep_forever_dag', + schedule_interval=None, +) + +task = BashOperator( + task_id='sleeps_forever', + dag=dag, + bash_command="sleep 10000000000", + start_date=airflow.utils.dates.days_ago(2), + owner='airflow')
