[AIRFLOW-727] try_number is not increased

A DAG that has retries enabled will retry indefinitely, because try_number gets reset to 0 in LocalTaskJob: the task_instance is not fully populated, but it is nevertheless saved to the database.
This was caused by a commit in https://github.com/apache/incubator-airflow/pull/1939 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6948e40c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6948e40c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6948e40c Branch: refs/heads/master Commit: 6948e40cb82295ef30c7bd05b216d2201523f9e2 Parents: c8a4eb3 Author: Bolke de Bruin <[email protected]> Authored: Mon Jan 2 21:55:01 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Tue Jan 3 10:28:33 2017 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 1 + tests/dags/test_retry_handling_job.py | 36 ++++++++++++++++++++++++++++++ tests/jobs.py | 23 +++++++++++++++++++ 3 files changed, 60 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index eab4a30..fc5a242 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -355,6 +355,7 @@ def run(args, dag=None): task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, args.execution_date) + ti.refresh_from_db() if args.local: print("Logging into: " + filename) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/tests/dags/test_retry_handling_job.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_retry_handling_job.py b/tests/dags/test_retry_handling_job.py new file mode 100644 index 0000000..111dfd4 --- /dev/null +++ b/tests/dags/test_retry_handling_job.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in 
compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow import DAG +from airflow.operators.bash_operator import BashOperator +from datetime import datetime, timedelta + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2016,10,5,19), + 'email': ['[email protected]'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 4, + 'retry_delay': timedelta(seconds=0), +} + +dag = DAG('test_retry_handling_job', default_args=default_args, schedule_interval='@once') + +task1 = BashOperator( + task_id='test_retry_handling_op', + bash_command='exit 1', + dag=dag) + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index d7dfbe7..32c615d 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -975,6 +975,29 @@ class SchedulerJobTest(unittest.TestCase): ti.refresh_from_db() self.assertEqual(ti.state, State.QUEUED) + @unittest.skipUnless("INTEGRATION" in os.environ, "Can only run end to end") + def test_retry_handling_job(self): + """ + Integration test of the scheduler not accidentally resetting + the try_numbers for a task + """ + dag = self.dagbag.get_dag('test_retry_handling_job') + dag_task1 = dag.get_task("test_retry_handling_op") + dag.clear() + + scheduler = SchedulerJob(dag_id=dag.dag_id, + num_runs=1) + scheduler.heartrate = 0 + scheduler.run() + + session = settings.Session() + ti = session.query(TI).filter(TI.dag_id==dag.dag_id, + TI.task_id==dag_task1.task_id).first() + + 
# make sure the counter has increased + self.assertEqual(ti.try_number, 2) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + def test_scheduler_run_duration(self): """ Verifies that the scheduler run duration limit is followed.
