nguyenmphu commented on a change in pull request #19123:
URL: https://github.com/apache/airflow/pull/19123#discussion_r734326251



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -532,7 +532,9 @@ def _process_executor_events(self, session: Session = None) 
-> int:
                 self.log.info("Setting external_id for %s to %s", ti, info)
                 continue
 
-            if ti.try_number == buffer_key.try_number and ti.state == 
State.QUEUED:
+            if ti.try_number == buffer_key.try_number and (
+                ti.state == State.QUEUED and not 
TaskReschedule.find_for_task_instance(ti, session=session)
+            ):

Review comment:
       @ephraimbuddy Our case is similar to this 
https://github.com/apache/airflow/issues/10790. Our project has a lot of 
external sensors with `rescheduled` mode. The error occurs randomly. Sometimes 
all DAGs work smoothly, but usually, there are several sensors that raise the 
error although they work normally. Because they are reported to fail, all their 
downstream tasks are not be triggered and we must mark these sensors as success.
   Because the time that a task is in the `QUEUED` state is very short, we 
cannot reproduce this behavior and the error occurs randomly.
   Also, I think there are very few queries to check if the task instance is 
rescheduled because Python evaluates boolean conditions lazily (docs: 
https://docs.python.org/3/reference/expressions.html#boolean-operations). If 
the state of the task instance is not `QUEUED`, the expression will be false 
and no query will be executed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to