kaxil closed pull request #3568: AIRFLOW-1104 Update jobs.py so Airflow does not over schedule tasks
URL: https://github.com/apache/incubator-airflow/pull/3568
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (from a fork) once
it is merged, the diff is reproduced below for the sake of provenance:
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 224ff185fb..a4252473cd 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1075,9 +1075,6 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
:type states: Tuple[State]
:return: List[TaskInstance]
"""
- # TODO(saguziel): Change this to include QUEUED, for concurrency
- # purposes we may want to count queued tasks
- states_to_count_as_running = [State.RUNNING]
executable_tis = []
# Get all the queued task instances from associated with scheduled
@@ -1123,6 +1120,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
for task_instance in task_instances_to_examine:
pool_to_task_instances[task_instance.pool].append(task_instance)
+ states_to_count_as_running = [State.RUNNING, State.QUEUED]
task_concurrency_map = self.__get_task_concurrency_map(
states=states_to_count_as_running, session=session)
@@ -1173,7 +1171,6 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
simple_dag = simple_dag_bag.get_dag(dag_id)
if dag_id not in dag_id_to_possibly_running_task_count:
- # TODO(saguziel): also check against QUEUED state, see AIRFLOW-1104
dag_id_to_possibly_running_task_count[dag_id] = \
DAG.get_num_task_instances(
dag_id,
diff --git a/tests/jobs.py b/tests/jobs.py
index 93f6574df4..c701214f1e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1493,6 +1493,39 @@ def test_find_executable_task_instances_concurrency(self):
self.assertEqual(0, len(res))
+ def test_find_executable_task_instances_concurrency_queued(self):
+ dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency_queued'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
+ task1 = DummyOperator(dag=dag, task_id='dummy1')
+ task2 = DummyOperator(dag=dag, task_id='dummy2')
+ task3 = DummyOperator(dag=dag, task_id='dummy3')
+ dagbag = self._make_simple_dag_bag([dag])
+
+ scheduler = SchedulerJob()
+ session = settings.Session()
+ dag_run = scheduler.create_dag_run(dag)
+
+ ti1 = TI(task1, dag_run.execution_date)
+ ti2 = TI(task2, dag_run.execution_date)
+ ti3 = TI(task3, dag_run.execution_date)
+ ti1.state = State.RUNNING
+ ti2.state = State.QUEUED
+ ti3.state = State.SCHEDULED
+
+ session.merge(ti1)
+ session.merge(ti2)
+ session.merge(ti3)
+
+ session.commit()
+
+ res = scheduler._find_executable_task_instances(
+ dagbag,
+ states=[State.SCHEDULED],
+ session=session)
+
+ self.assertEqual(1, len(res))
+ self.assertEqual(res[0].key, ti3.key)
+
def test_find_executable_task_instances_task_concurrency(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_task_concurrency'
task_id_1 = 'dummy'
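
For context: the change above makes the scheduler count QUEUED task instances
toward a DAG's concurrency limit alongside RUNNING ones, so work that is
already queued is no longer treated as free capacity and over-scheduled.
Below is a minimal, self-contained sketch of that check, using simplified,
hypothetical names rather than Airflow's actual scheduler internals:

RUNNING = "running"
QUEUED = "queued"
SCHEDULED = "scheduled"

# States that occupy a concurrency slot; before this PR only RUNNING counted.
STATES_TO_COUNT_AS_RUNNING = (RUNNING, QUEUED)


def find_executable(task_instances, concurrency_limit):
    """Return scheduled task ids that still fit under the DAG concurrency limit.

    task_instances is a list of (task_id, state) pairs for one DAG;
    concurrency_limit mirrors the DAG-level concurrency setting.
    """
    # Queued tasks now count as occupying a slot, just like running ones.
    occupied = sum(1 for _, state in task_instances
                   if state in STATES_TO_COUNT_AS_RUNNING)
    executable = []
    for task_id, state in task_instances:
        if state != SCHEDULED:
            continue
        if occupied >= concurrency_limit:
            break  # limit reached; leave the rest for a later scheduler loop
        executable.append(task_id)
        occupied += 1  # this task will be queued, so it now holds a slot
    return executable


# Mirrors the new test case: concurrency=3 with one RUNNING, one QUEUED and
# one SCHEDULED task instance -> only the scheduled task is returned.
print(find_executable(
    [("dummy1", RUNNING), ("dummy2", QUEUED), ("dummy3", SCHEDULED)],
    concurrency_limit=3))  # ['dummy3']
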
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services