uranusjr commented on code in PR #28528:
URL: https://github.com/apache/airflow/pull/28528#discussion_r1057585104
##########
airflow/sensors/base.py:
##########
@@ -256,14 +256,13 @@ def _get_next_poke_interval(
return new_interval
def prepare_for_execution(self) -> BaseOperator:
- task = super().prepare_for_execution()
# Sensors in `poke` mode can block execution of DAGs when running
# with single process executor, thus we change the mode to`reschedule`
# to allow parallel task being scheduled and executed
if conf.get("core", "executor") == "DebugExecutor":
self.log.warning("DebugExecutor changes sensor mode to
'reschedule'.")
- task.mode = "reschedule"
- return task
+ self.mode = "reschedule"
Review Comment:
In `TaskInstance`, there’s a line that assigns the copied task to `task`:
https://github.com/apache/airflow/blob/4e545c6e54712eedb6ca9cbb8333393ae3f6cba2/airflow/models/taskinstance.py#L1373
So why is `ti.task` here still referencing the original task?
---
Regardless of the problem above, I think it’s a bad idea to modify the
original task in any case. Maybe a better alternative would be to simply add
the DebugExecutor exception in ReadyToRescheduleDep so we don’t need to change
the value assigned to `reschedule` at all.
##########
airflow/executors/debug_executor.py:
##########
@@ -118,6 +119,11 @@ def trigger_tasks(self, open_slots: int) -> None:
:param open_slots: Number of open slots
"""
+ if not self.queued_tasks:
+ # wait a bit if there is no task ready to be executed to avoid
spinning too fast in the void
Review Comment:
```suggestion
# wait a bit if there are no tasks ready to be executed to avoid
spinning too fast in the void
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]