Repository: incubator-airflow
Updated Branches:
  refs/heads/master abbb4ee5c -> e42398100


[AIRFLOW-931] Do not set QUEUED in TaskInstances

The contract of TaskInstances stipulates that end states for Tasks
can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or
SKIPPED. If concurrency was reached, task instances set their own
state to QUEUED. This would prevent the scheduler from picking them
up again.

We set the state to NONE now, to ensure integrity.

Closes #2127 from bolkedebruin/AIRFLOW-931


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e4239810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e4239810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e4239810

Branch: refs/heads/master
Commit: e42398100a3248eddb6b511ade73f6a239e58090
Parents: abbb4ee
Author: Bolke de Bruin <[email protected]>
Authored: Thu Mar 9 08:32:46 2017 -0800
Committer: Bolke de Bruin <[email protected]>
Committed: Thu Mar 9 08:32:46 2017 -0800

----------------------------------------------------------------------
 airflow/models.py | 27 ++++++++++++++-------------
 tests/models.py   | 13 +++++++++++++
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4239810/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 37f8823..b9fda9f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1288,19 +1288,20 @@ class TaskInstance(Base):
             verbose=True)
 
         if not runnable and not mark_success:
-            if self.state != State.QUEUED:
-                # If a task's dependencies are met but it can't be run yet then queue it
-                # instead
-                self.state = State.QUEUED
-                msg = "Queuing attempt {attempt} of {total}".format(
-                    attempt=self.try_number % (task.retries + 1) + 1,
-                    total=task.retries + 1)
-                logging.info(hr + msg + hr)
-
-                self.queued_dttm = datetime.now()
-                msg = "Queuing into pool {}".format(self.pool)
-                logging.info(msg)
-                session.merge(self)
+            # FIXME: we might have hit concurrency limits, which means we probably
+            # have been running prematurely. This should be handled in the
+            # scheduling mechanism.
+            self.state = State.NONE
+            msg = ("FIXME: Rescheduling due to concurrency limits reached at task "
+                   "runtime. Attempt {attempt} of {total}. State set to NONE.").format(
+                attempt=self.try_number % (task.retries + 1) + 1,
+                total=task.retries + 1)
+            logging.warning(hr + msg + hr)
+
+            self.queued_dttm = datetime.now()
+            msg = "Queuing into pool {}".format(self.pool)
+            logging.info(msg)
+            session.merge(self)
             session.commit()
             return
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4239810/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index d758972..9da6f16 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -366,6 +366,19 @@ class TaskInstanceTest(unittest.TestCase):
         dag >> op5
         self.assertIs(op5.dag, dag)
 
+    @patch.object(DAG, 'concurrency_reached')
+    def test_requeue_over_concurrency(self, mock_concurrency_reached):
+        mock_concurrency_reached.return_value = True
+
+        dag = DAG(dag_id='test_requeue_over_concurrency', start_date=DEFAULT_DATE,
+                  max_active_runs=1, concurrency=2)
+        task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag)
+
+        ti = TI(task=task, execution_date=datetime.datetime.now())
+        ti.run()
+        self.assertEqual(ti.state, models.State.NONE)
+
+
     @patch.object(TI, 'pool_full')
     def test_run_pooling_task(self, mock_pool_full):
         """

Reply via email to