eladkal commented on code in PR #32122:
URL: https://github.com/apache/airflow/pull/32122#discussion_r1276092489
##########
tests/jobs/test_scheduler_job.py:
##########
@@ -1137,6 +1138,142 @@ def
test_find_executable_task_instances_max_active_tis_per_dag(self, dag_maker):
assert 1 == len(res)
session.rollback()
+ @pytest.mark.parametrize(
+
"concurrency_limit,mapped_tis_state,schedule_count,schedule_tids,queued_count,queued_tids",
+ [
+ [
+ 1,
+ dict(),
+ 5,
+ {"tg.dummy1"},
+ 1,
+ {"tg.dummy1"},
+ ],
+ [
+ 1,
+ {0: (State.RUNNING, 30, None)},
+ 4,
+ {"tg.dummy1"},
+ 0,
+ set(),
+ ],
+ [
+ 1,
+ {0: (State.SUCCESS, 30, 15)},
+ 5,
+ {"tg.dummy1", "tg.dummy2"},
+ 1,
+ {"tg.dummy2"},
+ ],
+ [
+ 2,
+ dict(),
+ 5,
+ {"tg.dummy1"},
+ 2,
+ {"tg.dummy1"},
+ ],
+ [
+ 2,
+ {0: (State.RUNNING, 30, None), 1: (State.RUNNING, 30, None)},
+ 3,
+ {"tg.dummy1"},
+ 0,
+ set(),
+ ],
+ [
+ 2,
+ {0: (State.SUCCESS, 30, 15), 1: (State.RUNNING, 30, None)},
+ 4,
+ {"tg.dummy1", "tg.dummy2"},
+ 1,
+ {"tg.dummy2"},
+ ],
+ [
+ 2,
+ {0: (State.RUNNING, 30, None), 1: (State.SUCCESS, 30, 15)},
+ 4,
+ {"tg.dummy1", "tg.dummy2"},
+ 1,
+ {"tg.dummy2"},
+ ],
+ [
+ 2,
+ {0: (State.SUCCESS, 30, 15), 1: (State.SUCCESS, 30, 15)},
+ 5,
+ {"tg.dummy1", "tg.dummy2"},
+ 2,
+ {"tg.dummy2"},
+ ],
+ ],
+ )
+ def test_find_executable_task_instances_with_task_group_concurrency_limit(
+ self,
+ dag_maker,
+ concurrency_limit: int,
+ mapped_tis_state: dict,
+ schedule_count: int,
+ schedule_tids: set,
+ queued_count: int,
+ queued_tids: set,
+ ):
+ """
+ Test if _executable_task_instances_to_queued puts the right task
instances into the
+ mock_list.
+ """
+ with
dag_maker(dag_id="test_find_executable_task_instances_with_task_group_concurrency_limit"):
+
+ t1 = BashOperator(task_id="dummy", bash_command="sleep 300")
+
+ @task_group(max_active_groups_per_dagrun=concurrency_limit)
Review Comment:
Please add a test case for nested task groups.
It's important to test how this behaves when `max_active_groups_per_dagrun=1`
and the code has a task group inside a task group:
```
with TaskGroup(group_id='group1') as tg1:
    ...
    with TaskGroup(group_id='group2') as tg2:
        ...
```
We need to make sure it doesn't cause a deadlock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]