jedcunningham commented on code in PR #23079:
URL: https://github.com/apache/airflow/pull/23079#discussion_r856552185


##########
airflow/models/dag.py:
##########
@@ -1687,18 +1687,25 @@ def set_task_instance_state(
         end_date = resolve_execution_date if not future else None
         start_date = resolve_execution_date if not past else None
 
-        subdag.clear(
-            start_date=start_date,
-            end_date=end_date,
-            include_subdags=True,
-            include_parentdag=True,
-            only_failed=True,
-            session=session,
-            # Exclude the task itself from being cleared
-            exclude_task_ids={task_id},
-        )
+        # When state to be set is FAILED with downstream as True then
+        # passing only_failed will clear all failed instances just marked in
+        # downstream. Don't clear and return altered task instances.
+        # When state to be set is FAILED on an already Failed task with downstream as False
+        # then there are no altered objects and hence we don't need to clear them.
+        if state == TaskInstanceState.FAILED and (downstream or not altered):
+            return altered
+        else:
+            subdag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                include_subdags=True,
+                include_parentdag=True,
+                only_failed=True,
+                session=session,
+                exclude_task_ids=[task_id],
+            )
 
-        return altered
+            return altered

Review Comment:
   ```suggestion
           return altered
   ```
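The skip condition described in the diff comment can be checked on its own, separately from the call to `subdag.clear`. Below is a minimal sketch, assuming a stand-in `TaskInstanceState` enum and a hypothetical `should_clear` helper. Neither is part of Airflow's API; they only mirror the `if` in the diff.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in for Airflow's TaskInstanceState (only the members used here)."""

    SUCCESS = "success"
    FAILED = "failed"


def should_clear(state, downstream, altered):
    """Hypothetical helper: True when failed instances should be cleared.

    Mirrors the diff: skip clearing when marking FAILED with downstream=True
    (clearing would undo the downstream failures that were just set), or when
    nothing was altered (the task was already FAILED).
    """
    skip = state == TaskInstanceState.FAILED and (downstream or not altered)
    return not skip
```

For example, `should_clear(TaskInstanceState.FAILED, True, ["ti"])` is `False`, while `should_clear(TaskInstanceState.FAILED, False, ["ti"])` is `True`. Any non-FAILED state always clears.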



##########
airflow/models/dag.py:
##########
@@ -1687,18 +1687,25 @@ def set_task_instance_state(
         end_date = resolve_execution_date if not future else None
         start_date = resolve_execution_date if not past else None
 
-        subdag.clear(
-            start_date=start_date,
-            end_date=end_date,
-            include_subdags=True,
-            include_parentdag=True,
-            only_failed=True,
-            session=session,
-            # Exclude the task itself from being cleared
-            exclude_task_ids={task_id},
-        )
+        # When state to be set is FAILED with downstream as True then
+        # passing only_failed will clear all failed instances just marked in
+        # downstream. Don't clear and return altered task instances.
+        # When state to be set is FAILED on an already Failed task with downstream as False
+        # then there are no altered objects and hence we don't need to clear them.
+        if state == TaskInstanceState.FAILED and (downstream or not altered):
+            return altered
+        else:
+            subdag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                include_subdags=True,
+                include_parentdag=True,
+                only_failed=True,
+                session=session,
+                exclude_task_ids=[task_id],
+            )

Review Comment:
   ```suggestion
   
           subdag.clear(
               start_date=start_date,
               end_date=end_date,
               include_subdags=True,
               include_parentdag=True,
               only_failed=True,
               session=session,
               exclude_task_ids=[task_id],
           )
   ```
   
   We don't need the else.
   
   Alternatively, we could invert the `if` and have a single return.
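The "invert the `if`" alternative could look roughly like the sketch below. It assumes a stand-in `TaskInstanceState` enum, a hypothetical `RecordingSubDag` that only records `clear()` calls, and a hypothetical `finish_set_state` function standing in for the tail of `set_task_instance_state`. None of these are Airflow code. `altered` is a plain list here rather than real task instances.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in for Airflow's TaskInstanceState (only the members used here)."""

    SUCCESS = "success"
    FAILED = "failed"


class RecordingSubDag:
    """Hypothetical stand-in for the sub-DAG: records clear() kwargs only."""

    def __init__(self):
        self.clear_calls = []

    def clear(self, **kwargs):
        self.clear_calls.append(kwargs)


def finish_set_state(subdag, state, downstream, altered, task_id,
                     start_date=None, end_date=None, session=None):
    # Inverted condition: clear only when we are NOT in the "skip" case,
    # so there is no else branch and exactly one return at the end.
    if not (state == TaskInstanceState.FAILED and (downstream or not altered)):
        subdag.clear(
            start_date=start_date,
            end_date=end_date,
            include_subdags=True,
            include_parentdag=True,
            only_failed=True,
            session=session,
            exclude_task_ids=[task_id],
        )
    return altered
```

By De Morgan's law the same condition can also be written as `state != TaskInstanceState.FAILED or (not downstream and altered)`. Which form reads better is a matter of taste.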



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.