[AIRFLOW-931] Do not set QUEUED in TaskInstances The contract of TaskInstances stipulates that end states for Tasks can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or SKIPPED. If concurrency was reached, task instances were set to QUEUED by the task instances themselves. This would prevent the scheduler from picking them up again.
We set the state to NONE now, to ensure integrity. Closes #2127 from bolkedebruin/AIRFLOW-931 (cherry picked from commit e42398100a3248eddb6b511ade73f6a239e58090) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4db8f079 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4db8f079 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4db8f079 Branch: refs/heads/v1-8-stable Commit: 4db8f0796642691255b0632d599f33cb9d0ce423 Parents: 3a5a323 Author: Bolke de Bruin <[email protected]> Authored: Thu Mar 9 08:32:46 2017 -0800 Committer: Bolke de Bruin <[email protected]> Committed: Thu Mar 9 08:32:59 2017 -0800 ---------------------------------------------------------------------- airflow/models.py | 27 ++++++++++++++------------- tests/models.py | 13 +++++++++++++ 2 files changed, 27 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4db8f079/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index ba8d051..62457f0 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1291,19 +1291,20 @@ class TaskInstance(Base): verbose=True) if not runnable and not mark_success: - if self.state != State.QUEUED: - # If a task's dependencies are met but it can't be run yet then queue it - # instead - self.state = State.QUEUED - msg = "Queuing attempt {attempt} of {total}".format( - attempt=self.try_number % (task.retries + 1) + 1, - total=task.retries + 1) - logging.info(hr + msg + hr) - - self.queued_dttm = datetime.now() - msg = "Queuing into pool {}".format(self.pool) - logging.info(msg) - session.merge(self) + # FIXME: we might have hit concurrency limits, which means we probably + # have been running 
prematurely. This should be handled in the + # scheduling mechanism. + self.state = State.NONE + msg = ("FIXME: Rescheduling due to concurrency limits reached at task " + "runtime. Attempt {attempt} of {total}. State set to NONE.").format( + attempt=self.try_number % (task.retries + 1) + 1, + total=task.retries + 1) + logging.warning(hr + msg + hr) + + self.queued_dttm = datetime.now() + msg = "Queuing into pool {}".format(self.pool) + logging.info(msg) + session.merge(self) session.commit() return http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4db8f079/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 868ea36..867e293 100644 --- a/tests/models.py +++ b/tests/models.py @@ -289,6 +289,19 @@ class TaskInstanceTest(unittest.TestCase): dag >> op5 self.assertIs(op5.dag, dag) + @patch.object(DAG, 'concurrency_reached') + def test_requeue_over_concurrency(self, mock_concurrency_reached): + mock_concurrency_reached.return_value = True + + dag = DAG(dag_id='test_requeue_over_concurrency', start_date=DEFAULT_DATE, + max_active_runs=1, concurrency=2) + task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag) + + ti = TI(task=task, execution_date=datetime.datetime.now()) + ti.run() + self.assertEqual(ti.state, models.State.NONE) + + @patch.object(TI, 'pool_full') def test_run_pooling_task(self, mock_pool_full): """
