Repository: incubator-airflow Updated Branches: refs/heads/master d243c003b -> 7c0f8373f
[AIRFLOW-234] make task that aren't `running` self-terminate Closes #1585 from mistercrunch/undeads Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c0f8373 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c0f8373 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c0f8373 Branch: refs/heads/master Commit: 7c0f8373f59b0554d5ba15bb0e5e8669f0830313 Parents: d243c00 Author: Maxime Beauchemin <[email protected]> Authored: Fri Jun 17 14:30:55 2016 -0700 Committer: Maxime Beauchemin <[email protected]> Committed: Fri Jun 17 14:30:55 2016 -0700 ---------------------------------------------------------------------- airflow/example_dags/docker_copy_data.py | 13 ++++++ airflow/example_dags/example_bash_operator.py | 13 ++++++ airflow/example_dags/example_branch_operator.py | 13 ++++++ airflow/example_dags/example_docker_operator.py | 13 ++++++ airflow/example_dags/example_http_operator.py | 13 ++++++ airflow/example_dags/example_python_operator.py | 13 ++++++ .../example_short_circuit_operator.py | 13 ++++++ airflow/example_dags/example_subdag_operator.py | 13 ++++++ .../example_trigger_controller_dag.py | 14 +++++- .../example_dags/example_trigger_target_dag.py | 13 ++++++ airflow/example_dags/example_xcom.py | 13 ++++++ airflow/example_dags/test_utils.py | 29 ++++++++++++ airflow/jobs.py | 46 ++++++++++++-------- airflow/models.py | 27 +++++++++--- tests/core.py | 42 +++++++++++++++++- 15 files changed, 263 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/docker_copy_data.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py index ccf84c1..f0789b1 100644 --- a/airflow/example_dags/docker_copy_data.py +++ b/airflow/example_dags/docker_copy_data.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. ''' This sample "listen to directory". move the new file and print it, using docker-containers. The following operators are being used: DockerOperator, BashOperator & ShortCircuitOperator. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 4ab9144..c759f4d 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from builtins import range from airflow.operators import BashOperator, DummyOperator from airflow.models import DAG http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_branch_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index f576d20..edd177a 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from airflow.operators import BranchPythonOperator, DummyOperator from airflow.models import DAG from datetime import datetime, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_docker_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_docker_operator.py b/airflow/example_dags/example_docker_operator.py index e014fe5..6bb71ff 100644 --- a/airflow/example_dags/example_docker_operator.py +++ b/airflow/example_dags/example_docker_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. """ from airflow import DAG from airflow.operators import BashOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_http_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py index 4501825..41ea385 100644 --- a/airflow/example_dags/example_http_operator.py +++ b/airflow/example_dags/example_http_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. """ ### Example HTTP operator and sensor """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_python_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 0fc2180..a2f8abd 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from __future__ import print_function from builtins import range from airflow.operators import PythonOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_short_circuit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index 967c65e..907cf51 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from airflow.operators import ShortCircuitOperator, DummyOperator from airflow.models import DAG import airflow.utils.helpers http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_subdag_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py index 120f333..57a62c6 100644 --- a/airflow/example_dags/example_subdag_operator.py +++ b/airflow/example_dags/example_subdag_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from datetime import datetime from airflow.models import DAG http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_trigger_controller_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 4b66ad1..b754d64 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -1,4 +1,16 @@ - +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. """This example illustrates the use of the TriggerDagRunOperator. There are 2 entities at work in this scenario: 1. The Controller DAG - the DAG that conditionally executes the trigger http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_trigger_target_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 172003f..41a3e36 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from airflow.operators import BashOperator, PythonOperator from airflow.models import DAG from datetime import datetime http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_xcom.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 2d9c087..71cd44e 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from __future__ import print_function import airflow from datetime import datetime, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/test_utils.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py new file mode 100644 index 0000000..38e50d0 --- /dev/null +++ b/airflow/example_dags/test_utils.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Used for unit tests""" +from airflow.operators import BashOperator +from airflow.models import DAG +from datetime import datetime + +dag = DAG( + dag_id='test_utils', + schedule_interval=None, +) + +task = BashOperator( + task_id='sleeps_forever', + dag=dag, + bash_command="sleep 10000000000", + start_date=datetime(2016, 1, 1), + owner='airflow') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 1e583ac..0713bbe 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -19,7 +19,7 @@ from __future__ import unicode_literals from past.builtins import basestring from collections import defaultdict, Counter -from datetime import datetime +from datetime import datetime, timedelta import getpass import logging import socket @@ -116,7 +116,7 @@ class BaseJob(Base, LoggingMixin): ''' pass - def heartbeat_callback(self): + def heartbeat_callback(self, session=None): pass def heartbeat(self): @@ -139,7 +139,7 @@ class BaseJob(Base, LoggingMixin): sleep at all. 
''' session = settings.Session() - job = session.query(BaseJob).filter(BaseJob.id == self.id).first() + job = session.query(BaseJob).filter_by(id=self.id).one() if job.state == State.SHUTDOWN: self.kill() @@ -154,9 +154,9 @@ class BaseJob(Base, LoggingMixin): session.merge(job) session.commit() - session.close() - self.heartbeat_callback() + self.heartbeat_callback(session=session) + session.close() self.logger.debug('[heart] Boom.') def run(self): @@ -378,7 +378,8 @@ class SchedulerJob(BaseJob): filename=filename, stacktrace=stacktrace)) session.commit() - def schedule_dag(self, dag): + @provide_session + def schedule_dag(self, dag, session=None): """ This method checks whether a new DagRun needs to be created for a DAG based on scheduling interval @@ -386,7 +387,6 @@ class SchedulerJob(BaseJob): """ if dag.schedule_interval: DagRun = models.DagRun - session = settings.Session() active_runs = DagRun.find( dag_id=dag.dag_id, state=State.RUNNING, @@ -799,10 +799,10 @@ class SchedulerJob(BaseJob): finally: settings.Session.remove() executor.end() - session.close() - def heartbeat_callback(self): + @provide_session + def heartbeat_callback(self, session=None): Stats.gauge('scheduler_heartbeat', 1, 1) @@ -1093,6 +1093,15 @@ class LocalTaskJob(BaseJob): self.pool = pool self.pickle_id = pickle_id self.mark_success = mark_success + + # terminating state is used so that a job don't try to + # terminate multiple times + self.terminating = False + + # Keeps track of the fact that the task instance has been observed + # as running at least once + self.was_running = False + super(LocalTaskJob, self).__init__(*args, **kwargs) def _execute(self): @@ -1115,23 +1124,26 @@ class LocalTaskJob(BaseJob): def on_kill(self): self.process.terminate() - """ - def heartbeat_callback(self): - if datetime.now() - self.start_date < timedelta(seconds=300): + @provide_session + def heartbeat_callback(self, session=None): + """Self destruct task if state has been moved away from running externally""" + + if self.terminating: + # task is already terminating, let it breathe return + # Suicide pill TI = models.TaskInstance ti = self.task_instance - session = settings.Session() state = session.query(TI.state).filter( TI.dag_id==ti.dag_id, TI.task_id==ti.task_id, TI.execution_date==ti.execution_date).scalar() - session.commit() - session.close() - if state != State.RUNNING: + if state == State.RUNNING: + self.was_running = True + elif self.was_running and hasattr(self, 'process'): logging.warning( "State of this instance has been externally set to " "{self.task_instance.state}. " "Taking the poison pill. 
So long.".format(**locals())) self.process.terminate() - """ + self.terminating = True http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 38359f7..09d880e 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1179,17 +1179,32 @@ class TaskInstance(Base): def run( self, verbose=True, - ignore_dependencies=False, # Doesn't check for deps, just runs - ignore_depends_on_past=False, # Ignore depends_on_past but respect - # other deps - force=False, # Disregards previous successes - mark_success=False, # Don't run the task, act as if it succeeded - test_mode=False, # Doesn't record success or failure in the DB + ignore_dependencies=False, + ignore_depends_on_past=False, + force=False, + mark_success=False, + test_mode=False, job_id=None, pool=None, session=None): """ Runs the task instance. + + :param verbose: whether to turn on more verbose loggin + :type verbose: boolean + :param ignore_dependencies: Doesn't check for deps, just runs + :type ignore_dependencies: boolean + :param ignore_depends_on_past: Ignore depends_on_past but respect + other dependencies + :type ignore_depends_on_past: boolean + :param force: Forces a run regarless of previous success + :type force: boolean + :param mark_success: Don't run the task, mark its state as success + :type mark_success: boolean + :param test_mode: Doesn't record success or failure in the DB + :type test_mode: boolean + :param pool: specifies the pool to use to run the task instance + :type pool: str """ task = self.task self.pool = pool or task.pool http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 5e6a4fd..2ab14ea 100644 --- a/tests/core.py +++ b/tests/core.py @@ -19,6 +19,7 @@ import json import os import re import unittest +import multiprocessing import mock import tempfile from datetime import datetime, time, timedelta @@ -49,7 +50,7 @@ from airflow.configuration import AirflowConfigException import six -NUM_EXAMPLE_DAGS = 15 +NUM_EXAMPLE_DAGS = 16 DEV_NULL = '/dev/null' TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dags') @@ -629,6 +630,45 @@ class CoreTest(unittest.TestCase): trigger_rule="non_existant", dag=self.dag) + def test_terminate_task(self): + """If a task instance's db state get deleted, it should fail""" + TI = models.TaskInstance + dag = self.dagbag.dags.get('test_utils') + task = dag.task_dict.get('sleeps_forever') + + ti = TI(task=task, execution_date=DEFAULT_DATE) + job = jobs.LocalTaskJob( + task_instance=ti, force=True, executor=SequentialExecutor()) + + # Running task instance asynchronously + p = multiprocessing.Process(target=job.run) + p.start() + sleep(5) + settings.engine.dispose() + session = settings.Session() + ti.refresh_from_db(session=session) + # making sure it's actually running + assert State.RUNNING == ti.state + ti = ( + session.query(TI) + .filter_by( + dag_id=task.dag_id, + task_id=task.task_id, + execution_date=DEFAULT_DATE) + .one() + ) + # deleting the instance should result in a failure + session.delete(ti) + session.commit() + # waiting for the async task to finish + p.join() + + # making sure that the task ended up as failed + ti.refresh_from_db(session=session) + assert State.FAILED == ti.state + session.close() + + class 
CliTests(unittest.TestCase):

    def setUp(self):
        configuration.test_mode()
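----------------------------------------------------------------------
The core of the jobs.py change above is that LocalTaskJob.heartbeat_callback now re-reads the task instance's state on every heartbeat and, once a task that was previously observed as RUNNING shows up in any other state, terminates its child process exactly once. What follows is a minimal, framework-free sketch of that pattern only; Watchdog, read_state and the state constants are illustrative stand-ins, not Airflow APIs (the committed code reads the TaskInstance row through a SQLAlchemy session supplied by @provide_session).

import logging
import subprocess

RUNNING, FAILED = "running", "failed"


class Watchdog:
    """Heartbeat-side guard: kill the child once its external state leaves RUNNING."""

    def __init__(self, process, read_state):
        self.process = process        # child process executing the task
        self.read_state = read_state  # callable returning the externally stored state
        self.was_running = False      # task observed as RUNNING at least once
        self.terminating = False      # ensures terminate() is only sent once

    def heartbeat_callback(self):
        if self.terminating:
            # already terminating, let it breathe
            return
        state = self.read_state()
        if state == RUNNING:
            self.was_running = True
        elif self.was_running:
            logging.warning(
                "State externally set to %r; taking the poison pill.", state)
            self.process.terminate()
            self.terminating = True


if __name__ == "__main__":
    # Simulate an external actor flipping the state away from RUNNING
    # after the task has been observed running once.
    states = iter([RUNNING, RUNNING, FAILED, FAILED])
    child = subprocess.Popen(["sleep", "60"])
    watchdog = Watchdog(child, read_state=lambda: next(states))
    for _ in range(4):
        watchdog.heartbeat_callback()
    child.wait()  # returns quickly: the third heartbeat terminated the child

The was_running flag mirrors the commit's guard against killing a task that has not yet been recorded as RUNNING, and terminating mirrors the new instance attribute that keeps the job from sending terminate() more than once.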

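----------------------------------------------------------------------
The commit also converts heartbeat_callback and schedule_dag to take an optional session argument via @provide_session instead of opening and closing their own sessions, which is what lets BaseJob.heartbeat() hand its session to heartbeat_callback(session=session) and close it only after the callback has run. As a rough illustration of how such a decorator can work (a generic sketch under the assumption of a configured SQLAlchemy sessionmaker named Session, not Airflow's actual provide_session source):

import functools
from sqlalchemy.orm import sessionmaker

# Assumed to be bound to the application's engine elsewhere.
Session = sessionmaker()


def provide_session(func):
    """If the caller passes no session, open one, commit it, and close it afterwards."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            # Caller owns the session; reuse it and leave its lifecycle alone.
            return func(*args, **kwargs)
        session = Session()
        try:
            kwargs["session"] = session
            result = func(*args, **kwargs)
            session.commit()
            return result
        finally:
            session.close()
    return wrapper

With this shape, a decorated method such as heartbeat_callback can be called either standalone (a session is created and cleaned up for it) or from heartbeat() with an explicit session, so the callback's state query and the heartbeat's own updates share one unit of work.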