tirkarthi commented on code in PR #23079:
URL: https://github.com/apache/airflow/pull/23079#discussion_r856844448


##########
airflow/models/dag.py:
##########
@@ -1687,18 +1687,25 @@ def set_task_instance_state(
         end_date = resolve_execution_date if not future else None
         start_date = resolve_execution_date if not past else None
 
-        subdag.clear(
-            start_date=start_date,
-            end_date=end_date,
-            include_subdags=True,
-            include_parentdag=True,
-            only_failed=True,
-            session=session,
-            # Exclude the task itself from being cleared
-            exclude_task_ids={task_id},
-        )
+        # When state to be set is FAILED with downstream as True then
+        # passing only_failed will clear all failed instances just marked in
+        # downstream. Don't clear and return altered task instances.
+        # When state to be set is FAILED on an already Failed task with downstream as False
+        # then there are no altered objects and hence we don't need to clear them.
+        if state == TaskInstanceState.FAILED and (downstream or not altered):
+            return altered
+        else:
+            subdag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                include_subdags=True,
+                include_parentdag=True,
+                only_failed=True,
+                session=session,
+                exclude_task_ids=[task_id],
+            )

Review Comment:
   Thanks, done. Inverting the conditions in the if clause made the logic 
slightly harder for me to read and to explain in the comment, so I have kept 
the condition as it is.
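
   For illustration only, here is a minimal standalone sketch of the guard from 
the diff above next to its inverted form (via De Morgan's laws). The 
`TaskInstanceState` enum below is a local stand-in, and `skips_clear` and 
`should_clear` are hypothetical helper names that do not exist in Airflow; the 
sketch only shows that the two forms are logically equivalent.

```python
from enum import Enum
from itertools import product


class TaskInstanceState(str, Enum):
    # Local stand-in for Airflow's TaskInstanceState; only two members shown.
    SUCCESS = "success"
    FAILED = "failed"


def skips_clear(state, downstream, altered):
    """Guard as written in the diff: return early without clearing."""
    return bool(state == TaskInstanceState.FAILED and (downstream or not altered))


def should_clear(state, downstream, altered):
    """Inverted guard (hypothetical): True when subdag.clear() would run."""
    return bool(state != TaskInstanceState.FAILED or (not downstream and altered))


# The two forms are complements for every combination of inputs.
for state, downstream, altered in product(
    TaskInstanceState, (True, False), ([], ["ti"])
):
    assert skips_clear(state, downstream, altered) != should_clear(
        state, downstream, altered
    )
```

   The inverted form needs a negation on both `state` and `downstream`, which is 
arguably why the original orientation is easier to describe in the code comment.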



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
