ashb commented on code in PR #61809:
URL: https://github.com/apache/airflow/pull/61809#discussion_r2802757607


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -183,6 +184,70 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, 
task_teardown_map=None
             log.info("Not skipping teardown task '%s'", ti.task_id)
 
 
+def _stop_remaining_tasks_in_group(
+    *,
+    task_instance: TaskInstance,
+    task_group: _TaskGroupRetryLike,
+    session: Session,
+) -> None:
+    """
+    Stop non-teardown tasks within a TaskGroup.
+
+    :meta private:
+    """
+    if not task_instance.dag_run:
+        raise ValueError("``task_instance`` must have ``dag_run`` set")
+    tasks = list(task_group.iter_tasks())
+    task_ids = {task.task_id for task in tasks}
+    if not task_ids:
+        return
+    task_teardown_map = {task.task_id: task.is_teardown for task in tasks}
+
+    tis = task_instance.dag_run.get_task_instances(session=session)
+    for ti in tis:
+        if ti.task_id == task_instance.task_id or ti.task_id not in task_ids:
+            continue
+        if ti.state in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED):

Review Comment:
   Upstream failed should be here too. There's almost certainly a 
terminal_states list defined already



##########
airflow-core/src/airflow/serialization/definitions/baseoperator.py:
##########
@@ -54,6 +55,7 @@
         TriggerRuleDep(),
         NotPreviouslySkippedDep(),
         MappedTaskUpstreamDep(),
+        TaskGroupRetryDep(),

Review Comment:
   Why do we need this new dep? We need to be extra super careful about adding 
new ones as this slowed down every. single. Scheduling decision 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to