[AIRFLOW-727] try_number is not increased

A DAG that has retries enabled will retry indefinitely,
because try_number gets reset to 0 in LocalTaskJob: the
task_instance is not fully populated, but is nevertheless
saved to the database.

This was caused by a commit in
https://github.com/apache/incubator-airflow/pull/1939


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6948e40c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6948e40c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6948e40c

Branch: refs/heads/master
Commit: 6948e40cb82295ef30c7bd05b216d2201523f9e2
Parents: c8a4eb3
Author: Bolke de Bruin <[email protected]>
Authored: Mon Jan 2 21:55:01 2017 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Tue Jan 3 10:28:33 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                    |  1 +
 tests/dags/test_retry_handling_job.py | 36 ++++++++++++++++++++++++++++++
 tests/jobs.py                         | 23 +++++++++++++++++++
 3 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index eab4a30..fc5a242 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -355,6 +355,7 @@ def run(args, dag=None):
     task = dag.get_task(task_id=args.task_id)
 
     ti = TaskInstance(task, args.execution_date)
+    ti.refresh_from_db()
 
     if args.local:
         print("Logging into: " + filename)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/tests/dags/test_retry_handling_job.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_retry_handling_job.py b/tests/dags/test_retry_handling_job.py
new file mode 100644
index 0000000..111dfd4
--- /dev/null
+++ b/tests/dags/test_retry_handling_job.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import DAG
+from airflow.operators.bash_operator import BashOperator
+from datetime import datetime, timedelta
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2016,10,5,19),
+    'email': ['[email protected]'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 4,
+    'retry_delay': timedelta(seconds=0),
+}
+
+dag = DAG('test_retry_handling_job', default_args=default_args, schedule_interval='@once')
+
+task1 = BashOperator(
+    task_id='test_retry_handling_op',
+    bash_command='exit 1',
+    dag=dag)
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6948e40c/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d7dfbe7..32c615d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -975,6 +975,29 @@ class SchedulerJobTest(unittest.TestCase):
         ti.refresh_from_db()
         self.assertEqual(ti.state, State.QUEUED)
 
+    @unittest.skipUnless("INTEGRATION" in os.environ, "Can only run end to end")
+    def test_retry_handling_job(self):
+        """
+        Integration test of the scheduler not accidentally resetting
+        the try_numbers for a task
+        """
+        dag = self.dagbag.get_dag('test_retry_handling_job')
+        dag_task1 = dag.get_task("test_retry_handling_op")
+        dag.clear()
+
+        scheduler = SchedulerJob(dag_id=dag.dag_id,
+                                 num_runs=1)
+        scheduler.heartrate = 0
+        scheduler.run()
+
+        session = settings.Session()
+        ti = session.query(TI).filter(TI.dag_id==dag.dag_id,
+                                      TI.task_id==dag_task1.task_id).first()
+
+        # make sure the counter has increased
+        self.assertEqual(ti.try_number, 2)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+
     def test_scheduler_run_duration(self):
         """
         Verifies that the scheduler run duration limit is followed.

Reply via email to