This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 69f8682514a Remove change_sensor_mode_to_reschedule from base executor
(#48649)
69f8682514a is described below
commit 69f8682514ad3889ad24a067cf3ee7cde4ad9161
Author: Jake Ferriero <[email protected]>
AuthorDate: Tue Apr 1 17:29:02 2025 -0700
Remove change_sensor_mode_to_reschedule from base executor (#48649)
There are no longer any single process executors so this field is
not needed.
This allows us to remove a runtime dependency of the task runner
on loading the executor class when running sensors which used to
override the prepare_for_execution to load the executor class to
check this field which is False in all supported executors.
---
airflow-core/docs/core-concepts/executor/index.rst | 1 -
airflow-core/src/airflow/executors/base_executor.py | 1 -
.../src/airflow/ti_deps/deps/ready_to_reschedule.py | 6 +-----
.../celery/executors/celery_kubernetes_executor.py | 1 -
.../cncf/kubernetes/executors/local_kubernetes_executor.py | 1 -
task-sdk/src/airflow/sdk/bases/sensor.py | 14 --------------
6 files changed, 1 insertion(+), 23 deletions(-)
diff --git a/airflow-core/docs/core-concepts/executor/index.rst
b/airflow-core/docs/core-concepts/executor/index.rst
index eda91ecca24..ac52d0c3d7b 100644
--- a/airflow-core/docs/core-concepts/executor/index.rst
+++ b/airflow-core/docs/core-concepts/executor/index.rst
@@ -278,7 +278,6 @@ The ``BaseExecutor`` class interface contains a set of
attributes that Airflow c
* ``is_single_threaded``: Whether or not the executor is single threaded. This
is particularly relevant to what database backends are supported. Single
threaded executors can run with any backend, including SQLite.
* ``is_production``: Whether or not the executor should be used for production
purposes. A UI message is displayed to users when they are using a
non-production ready executor.
-* ``change_sensor_mode_to_reschedule``: Running Airflow sensors in poke mode
can block the thread of executors and in some cases Airflow.
* ``serve_logs``: Whether or not the executor supports serving logs, see
:doc:`/administration-and-deployment/logging-monitoring/logging-tasks`.
CLI
diff --git a/airflow-core/src/airflow/executors/base_executor.py
b/airflow-core/src/airflow/executors/base_executor.py
index 558eb952bb1..6ea9f62c6eb 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -122,7 +122,6 @@ class BaseExecutor(LoggingMixin):
is_local: bool = False
is_production: bool = True
- change_sensor_mode_to_reschedule: bool = False
serve_logs: bool = False
job_id: None | int | str = None
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index 2e49b5bc48e..abe0d38c2b7 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -50,11 +50,7 @@ class ReadyToRescheduleDep(BaseTIDep):
if (
# Mapped sensors don't have the reschedule property (it can only
be calculated after unmapping),
# so we don't check them here. They are handled below by checking
TaskReschedule instead.
- not is_mapped
- and not getattr(ti.task, "reschedule", False)
- # Executors can force running in reschedule mode,
- # in which case we ignore the value of the task property.
- and not executor.change_sensor_mode_to_reschedule
+ not is_mapped and not getattr(ti.task, "reschedule", False)
):
yield self._passing_status(reason="Task is not in reschedule
mode.")
return
diff --git
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
index 9caf0a58668..8d63150f544 100644
---
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
+++
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
@@ -67,7 +67,6 @@ class CeleryKubernetesExecutor(BaseExecutor):
is_production: bool = True
serve_logs: bool = False
- change_sensor_mode_to_reschedule: bool = False
callback_sink: BaseCallbackSink | None = None
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
index cf7d784aa5c..6acce8b3009 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
@@ -58,7 +58,6 @@ class LocalKubernetesExecutor(BaseExecutor):
is_production: bool = True
serve_logs: bool = True
- change_sensor_mode_to_reschedule: bool = False
callback_sink: BaseCallbackSink | None = None
diff --git a/task-sdk/src/airflow/sdk/bases/sensor.py
b/task-sdk/src/airflow/sdk/bases/sensor.py
index 4347e684a42..3cbbac9a492 100644
--- a/task-sdk/src/airflow/sdk/bases/sensor.py
+++ b/task-sdk/src/airflow/sdk/bases/sensor.py
@@ -36,13 +36,11 @@ from airflow.exceptions import (
TaskDeferralError,
TaskDeferralTimeout,
)
-from airflow.executors.executor_loader import ExecutorLoader
from airflow.sdk.bases.operator import BaseOperator
from airflow.utils import timezone
if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
- from airflow.typing_compat import Self
class PokeReturnValue:
@@ -326,18 +324,6 @@ class BaseSensorOperator(BaseOperator):
self.log.info("new %s interval is %s", self.mode, new_interval)
return new_interval
- def prepare_for_execution(self) -> Self:
- task = super().prepare_for_execution()
-
- # Sensors in `poke` mode can block execution of DAGs when running
- # with single process executor, thus we change the mode to`reschedule`
- # to allow parallel task being scheduled and executed
- executor, _ = ExecutorLoader.import_default_executor_cls()
- if executor.change_sensor_mode_to_reschedule:
- self.log.warning("%s changes sensor mode to 'reschedule'.",
executor.__name__)
- task.mode = "reschedule"
- return task
-
@property
def reschedule(self):
"""Define mode rescheduled sensors."""