eladkal commented on code in PR #32122:
URL: https://github.com/apache/airflow/pull/32122#discussion_r1276090447


##########
airflow/decorators/task_group.py:
##########
@@ -184,6 +184,7 @@ def task_group(
     ui_color: str = "CornflowerBlue",
     ui_fgcolor: str = "#000",
     add_suffix_on_collision: bool = False,
+    max_active_groups_per_dagrun: int | None = None,

Review Comment:
   How would this work with nested task groups?
   
   
   



##########
tests/jobs/test_scheduler_job.py:
##########
@@ -1137,6 +1138,142 @@ def 
test_find_executable_task_instances_max_active_tis_per_dag(self, dag_maker):
         assert 1 == len(res)
         session.rollback()
 
+    @pytest.mark.parametrize(
+        
"concurrency_limit,mapped_tis_state,schedule_count,schedule_tids,queued_count,queued_tids",
+        [
+            [
+                1,
+                dict(),
+                5,
+                {"tg.dummy1"},
+                1,
+                {"tg.dummy1"},
+            ],
+            [
+                1,
+                {0: (State.RUNNING, 30, None)},
+                4,
+                {"tg.dummy1"},
+                0,
+                set(),
+            ],
+            [
+                1,
+                {0: (State.SUCCESS, 30, 15)},
+                5,
+                {"tg.dummy1", "tg.dummy2"},
+                1,
+                {"tg.dummy2"},
+            ],
+            [
+                2,
+                dict(),
+                5,
+                {"tg.dummy1"},
+                2,
+                {"tg.dummy1"},
+            ],
+            [
+                2,
+                {0: (State.RUNNING, 30, None), 1: (State.RUNNING, 30, None)},
+                3,
+                {"tg.dummy1"},
+                0,
+                set(),
+            ],
+            [
+                2,
+                {0: (State.SUCCESS, 30, 15), 1: (State.RUNNING, 30, None)},
+                4,
+                {"tg.dummy1", "tg.dummy2"},
+                1,
+                {"tg.dummy2"},
+            ],
+            [
+                2,
+                {0: (State.RUNNING, 30, None), 1: (State.SUCCESS, 30, 15)},
+                4,
+                {"tg.dummy1", "tg.dummy2"},
+                1,
+                {"tg.dummy2"},
+            ],
+            [
+                2,
+                {0: (State.SUCCESS, 30, 15), 1: (State.SUCCESS, 30, 15)},
+                5,
+                {"tg.dummy1", "tg.dummy2"},
+                2,
+                {"tg.dummy2"},
+            ],
+        ],
+    )
+    def test_find_executable_task_instances_with_task_group_concurrency_limit(
+        self,
+        dag_maker,
+        concurrency_limit: int,
+        mapped_tis_state: dict,
+        schedule_count: int,
+        schedule_tids: set,
+        queued_count: int,
+        queued_tids: set,
+    ):
+        """
+        Test if _executable_task_instances_to_queued puts the right task 
instances into the
+        mock_list.
+        """
+        with 
dag_maker(dag_id="test_find_executable_task_instances_with_task_group_concurrency_limit"):
+
+            t1 = BashOperator(task_id="dummy", bash_command="sleep 300")
+
+            @task_group(max_active_groups_per_dagrun=concurrency_limit)

Review Comment:
   Please add a test case for nested task groups.
   
   It's important to test how this behaves when `max_active_groups_per_dagrun=1` 
and the code has a task group inside another task group. We need to make sure it 
doesn't cause a deadlock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to