[AIRFLOW-900] Double trigger should not kill original task instance

This updates the tests of the earlier AIRFLOW-900 fix.
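For context, the guard these tests exercise is LocalTaskJob's heartbeat
check: while a task instance is RUNNING, the heartbeat compares the
hostname and pid recorded in the database row against the current job's
own, and raises AirflowException on a mismatch, so a second trigger kills
itself while the original task keeps running. The sketch below is
illustrative only, not the committed implementation: the free-standing
helper name heartbeat_guard and the exception messages are invented here,
while _is_descendant_process, ti.hostname, and ti.pid are taken from the
tests in this diff.

    import os
    import socket

    from airflow import AirflowException
    from airflow.utils.state import State


    def heartbeat_guard(ti, is_descendant_process):
        """Abort if another process already owns this running task instance.

        ``ti`` is a TaskInstance freshly refreshed from the database;
        ``is_descendant_process`` stands in for the
        LocalTaskJob._is_descendant_process check patched in the tests.
        """
        if ti.state != State.RUNNING:
            return  # nothing to protect; the heartbeat proceeds normally
        if ti.hostname != socket.getfqdn():
            # The running copy lives on another host: this job must die.
            raise AirflowException("Hostname does not match this instance")
        if ti.pid != os.getpid() and not is_descendant_process(ti.pid):
            # An unrelated process on this host owns the task: die as well.
            raise AirflowException("PID does not match this instance")

Under such a check the second job in test_localtaskjob_double_trigger
aborts and leaves the database row untouched, which is exactly what the
test asserts (pid still 1, state still RUNNING).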
Closes #2146 from bolkedebruin/AIRFLOW-900


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b26a5d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b26a5d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b26a5d9

Branch: refs/heads/v1-8-test
Commit: 2b26a5d95ce230b66255c8e7e7388c8013dc6ba6
Parents: 57faa53
Author: Bolke de Bruin <[email protected]>
Authored: Sat Mar 11 13:42:58 2017 -0800
Committer: Bolke de Bruin <[email protected]>
Committed: Sun Mar 12 08:36:07 2017 -0700

----------------------------------------------------------------------
 tests/core.py                     | 58 -----------------------
 tests/dags/sleep_forever_dag.py   | 29 ------------
 tests/dags/test_double_trigger.py | 29 ++++++++++++
 tests/jobs.py                     | 86 ++++++++++++++++++++++++++++++++--
 4 files changed, 112 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 636ad43..870a0cb 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -896,64 +896,6 @@ class CoreTest(unittest.TestCase):
                 trigger_rule="non_existant",
                 dag=self.dag)
 
-    def test_run_task_twice(self):
-        """If two copies of a TI run, the new one should die, and old should live"""
-        dagbag = models.DagBag(
-            dag_folder=TEST_DAG_FOLDER,
-            include_examples=False,
-        )
-        TI = models.TaskInstance
-        dag = dagbag.dags.get('sleep_forever_dag')
-        task = dag.task_dict.get('sleeps_forever')
-
-        ti = TI(task=task, execution_date=DEFAULT_DATE)
-        job1 = jobs.LocalTaskJob(
-            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-        job2 = jobs.LocalTaskJob(
-            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-
-        p1 = multiprocessing.Process(target=job1.run)
-        p2 = multiprocessing.Process(target=job2.run)
-        try:
-            p1.start()
-            start_time = timetime()
-            sleep(5.0) # must wait for session to be created on p1
-            settings.engine.dispose()
-            session = settings.Session()
-            ti.refresh_from_db(session=session)
-            self.assertEqual(State.RUNNING, ti.state)
-            p1pid = ti.pid
-            settings.engine.dispose()
-            p2.start()
-            p2.join(5) # wait 5 seconds until termination
-            self.assertFalse(p2.is_alive())
-            self.assertTrue(p1.is_alive())
-
-            settings.engine.dispose()
-            session = settings.Session()
-            ti.refresh_from_db(session=session)
-            self.assertEqual(State.RUNNING, ti.state)
-            self.assertEqual(p1pid, ti.pid)
-
-            # check changing hostname kills task
-            ti.refresh_from_db(session=session, lock_for_update=True)
-            ti.hostname = 'nonexistenthostname'
-            session.merge(ti)
-            session.commit()
-
-            p1.join(5)
-            self.assertFalse(p1.is_alive())
-        finally:
-            try:
-                p1.terminate()
-            except AttributeError:
-                pass # process already terminated
-            try:
-                p2.terminate()
-            except AttributeError:
-                pass # process already terminated
-            session.close()
-
     def test_terminate_task(self):
         """If a task instance's db state get deleted, it should fail"""
         TI = models.TaskInstance

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
deleted file mode 100644
index b1f810e..0000000
--- a/tests/dags/sleep_forever_dag.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Used for unit tests"""
-import airflow
-from airflow.operators.bash_operator import BashOperator
-from airflow.models import DAG
-
-dag = DAG(
-    dag_id='sleep_forever_dag',
-    schedule_interval=None,
-)
-
-task = BashOperator(
-    task_id='sleeps_forever',
-    dag=dag,
-    bash_command="sleep 10000000000",
-    start_date=airflow.utils.dates.days_ago(2),
-    owner='airflow')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/test_double_trigger.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_double_trigger.py b/tests/dags/test_double_trigger.py
new file mode 100644
index 0000000..b58f5c9
--- /dev/null
+++ b/tests/dags/test_double_trigger.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+
+args = {
+    'owner': 'airflow',
+    'start_date': DEFAULT_DATE,
+}
+
+dag = DAG(dag_id='test_localtaskjob_double_trigger', default_args=args)
+task = DummyOperator(
+    task_id='test_localtaskjob_double_trigger_task',
+    dag=dag)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d208fd4..aee0e9c 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,12 +23,13 @@ import os
 import shutil
 import unittest
 import six
-import sys
+import socket
 from tempfile import mkdtemp
 
 from airflow import AirflowException, settings, models
 from airflow.bin import cli
-from airflow.jobs import BackfillJob, SchedulerJob
+from airflow.executors import SequentialExecutor
+from airflow.jobs import BackfillJob, SchedulerJob, LocalTaskJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
@@ -36,8 +37,12 @@ from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.dag_processing import SimpleDagBag
+
 from mock import patch
-from tests.executor.test_executor import TestExecutor
+from sqlalchemy.orm.session import make_transient
+from tests.executors.test_executor import TestExecutor
+
+from tests.core import TEST_DAG_FOLDER
 
 from airflow import configuration
 configuration.load_test_config()
@@ -344,6 +349,81 @@ class BackfillJobTest(unittest.TestCase):
         self.assertEqual(State.NONE, ti.state)
 
 
+class LocalTaskJobTest(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    @patch.object(LocalTaskJob, "_is_descendant_process")
+    def test_localtaskjob_heartbeat(self, is_descendant):
+        session = settings.Session()
+        dag = DAG(
+            'test_localtaskjob_heartbeat',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        ti.hostname = "blablabla"
+        session.commit()
+
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+        is_descendant.return_value = True
+        ti.state = State.RUNNING
+        ti.hostname = socket.getfqdn()
+        ti.pid = 1
+        session.merge(ti)
+        session.commit()
+
+        ret = job1.heartbeat_callback()
+        self.assertEqual(ret, None)
+
+        is_descendant.return_value = False
+        self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+    def test_localtaskjob_double_trigger(self):
+        dagbag = models.DagBag(
+            dag_folder=TEST_DAG_FOLDER,
+            include_examples=False,
+        )
+        dag = dagbag.dags.get('test_localtaskjob_double_trigger')
+        task = dag.get_task('test_localtaskjob_double_trigger_task')
+
+        session = settings.Session()
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = dr.get_task_instance(task_id=task.task_id, session=session)
+        ti.state = State.RUNNING
+        ti.hostname = socket.getfqdn()
+        ti.pid = 1
+        session.commit()
+
+        ti_run = TI(task=task, execution_date=DEFAULT_DATE)
+        job1 = LocalTaskJob(task_instance=ti_run, ignore_ti_state=True, executor=SequentialExecutor())
+        self.assertRaises(AirflowException, job1.run)
+
+        ti = dr.get_task_instance(task_id=task.task_id, session=session)
+        self.assertEqual(ti.pid, 1)
+        self.assertEqual(ti.state, State.RUNNING)
+
+        session.close()
+
+
 class SchedulerJobTest(unittest.TestCase):
     # These defaults make the test faster to run
     default_scheduler_args = {"file_process_interval": 0,
