vandonr-amz commented on code in PR #28528:
URL: https://github.com/apache/airflow/pull/28528#discussion_r1055725594
##########
airflow/sensors/base.py:
##########
@@ -256,14 +256,13 @@ def _get_next_poke_interval(
return new_interval
def prepare_for_execution(self) -> BaseOperator:
- task = super().prepare_for_execution()
# Sensors in `poke` mode can block execution of DAGs when running
# with single process executor, thus we change the mode to`reschedule`
# to allow parallel task being scheduled and executed
if conf.get("core", "executor") == "DebugExecutor":
self.log.warning("DebugExecutor changes sensor mode to
'reschedule'.")
- task.mode = "reschedule"
- return task
+ self.mode = "reschedule"
Review Comment:
Because it's the original task object (and not the copy) that goes through
this code
https://github.com/apache/airflow/blob/681835a67c89784944f41fce86099bcb2c3a0614/airflow/ti_deps/deps/ready_to_reschedule.py#L47-L52,
and if the `mode` is not 'reschedule', the method returns early and the task
is immediately rescheduled instead of using the date in the future.
The best location to permanently change the mode would be the constructor of
the task, but at that time, we don't have access to the type of executor yet
(or at least I couldn't find how to get it). So I think the second best place
is here ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]