This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a35ec9549d Fixes to how DebugExecutor handles sensors (#28528)
a35ec9549d is described below
commit a35ec9549dc0b4b311f126f3022309ec5b33fa62
Author: Raphaƫl Vandon <[email protected]>
AuthorDate: Tue Jan 24 21:40:10 2023 -0800
Fixes to how DebugExecutor handles sensors (#28528)
* move fix to ready_to_reschedule
* replace check on debug exec with new property
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/executors/debug_executor.py | 6 ++++++
airflow/ti_deps/deps/ready_to_reschedule.py | 15 +++++++++++----
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/airflow/executors/debug_executor.py
b/airflow/executors/debug_executor.py
index 18ba0f8798..4355bd1dcf 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -25,6 +25,7 @@ DebugExecutor.
from __future__ import annotations
import threading
+import time
from typing import Any
from airflow.configuration import conf
@@ -120,6 +121,11 @@ class DebugExecutor(BaseExecutor):
:param open_slots: Number of open slots
"""
+ if not self.queued_tasks:
+ # wait a bit if there are no tasks ready to be executed to avoid
spinning too fast in the void
+ time.sleep(0.5)
+ return
+
sorted_queue = sorted(
self.queued_tasks.items(),
key=lambda x: x[1][1],
diff --git a/airflow/ti_deps/deps/ready_to_reschedule.py
b/airflow/ti_deps/deps/ready_to_reschedule.py
index 467f08536e..66aa5c5613 100644
--- a/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from airflow.executors.executor_loader import ExecutorLoader
from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils import timezone
@@ -44,10 +45,16 @@ class ReadyToRescheduleDep(BaseTIDep):
from airflow.models.mappedoperator import MappedOperator
is_mapped = isinstance(ti.task, MappedOperator)
- if not is_mapped and not getattr(ti.task, "reschedule", False):
- # Mapped sensors don't have the reschedule property (it can only
- # be calculated after unmapping), so we don't check them here.
- # They are handled below by checking TaskReschedule instead.
+ executor, _ = ExecutorLoader.import_default_executor_cls()
+ if (
+ # Mapped sensors don't have the reschedule property (it can only
be calculated after unmapping),
+ # so we don't check them here. They are handled below by checking
TaskReschedule instead.
+ not is_mapped
+ and not getattr(ti.task, "reschedule", False)
+ # Executors can force running in reschedule mode,
+ # in which case we ignore the value of the task property.
+ and not executor.change_sensor_mode_to_reschedule
+ ):
yield self._passing_status(reason="Task is not in reschedule
mode.")
return