This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 69f8682514a Remove change_sensor_mode_to_reschedule from base executor 
(#48649)
69f8682514a is described below

commit 69f8682514ad3889ad24a067cf3ee7cde4ad9161
Author: Jake Ferriero <[email protected]>
AuthorDate: Tue Apr 1 17:29:02 2025 -0700

    Remove change_sensor_mode_to_reschedule from base executor (#48649)
    
    
    There are no longer any single-process executors, so this field is
    not needed.
    
    This allows us to remove a runtime dependency of the task runner
    on loading the executor class when running sensors. Sensors used to
    override prepare_for_execution to load the executor class and
    check this field, which is False in all supported executors.
---
 airflow-core/docs/core-concepts/executor/index.rst         |  1 -
 airflow-core/src/airflow/executors/base_executor.py        |  1 -
 .../src/airflow/ti_deps/deps/ready_to_reschedule.py        |  6 +-----
 .../celery/executors/celery_kubernetes_executor.py         |  1 -
 .../cncf/kubernetes/executors/local_kubernetes_executor.py |  1 -
 task-sdk/src/airflow/sdk/bases/sensor.py                   | 14 --------------
 6 files changed, 1 insertion(+), 23 deletions(-)

diff --git a/airflow-core/docs/core-concepts/executor/index.rst 
b/airflow-core/docs/core-concepts/executor/index.rst
index eda91ecca24..ac52d0c3d7b 100644
--- a/airflow-core/docs/core-concepts/executor/index.rst
+++ b/airflow-core/docs/core-concepts/executor/index.rst
@@ -278,7 +278,6 @@ The ``BaseExecutor`` class interface contains a set of 
attributes that Airflow c
 * ``is_single_threaded``: Whether or not the executor is single threaded. This 
is particularly relevant to what database backends are supported. Single 
threaded executors can run with any backend, including SQLite.
 * ``is_production``: Whether or not the executor should be used for production 
purposes. A UI message is displayed to users when they are using a 
non-production ready executor.
 
-* ``change_sensor_mode_to_reschedule``: Running Airflow sensors in poke mode 
can block the thread of executors and in some cases Airflow.
 * ``serve_logs``: Whether or not the executor supports serving logs, see 
:doc:`/administration-and-deployment/logging-monitoring/logging-tasks`.
 
 CLI
diff --git a/airflow-core/src/airflow/executors/base_executor.py 
b/airflow-core/src/airflow/executors/base_executor.py
index 558eb952bb1..6ea9f62c6eb 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -122,7 +122,6 @@ class BaseExecutor(LoggingMixin):
     is_local: bool = False
     is_production: bool = True
 
-    change_sensor_mode_to_reschedule: bool = False
     serve_logs: bool = False
 
     job_id: None | int | str = None
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py 
b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index 2e49b5bc48e..abe0d38c2b7 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -50,11 +50,7 @@ class ReadyToRescheduleDep(BaseTIDep):
         if (
             # Mapped sensors don't have the reschedule property (it can only 
be calculated after unmapping),
             # so we don't check them here. They are handled below by checking 
TaskReschedule instead.
-            not is_mapped
-            and not getattr(ti.task, "reschedule", False)
-            # Executors can force running in reschedule mode,
-            # in which case we ignore the value of the task property.
-            and not executor.change_sensor_mode_to_reschedule
+            not is_mapped and not getattr(ti.task, "reschedule", False)
         ):
             yield self._passing_status(reason="Task is not in reschedule 
mode.")
             return
diff --git 
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
 
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
index 9caf0a58668..8d63150f544 100644
--- 
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
+++ 
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
@@ -67,7 +67,6 @@ class CeleryKubernetesExecutor(BaseExecutor):
     is_production: bool = True
 
     serve_logs: bool = False
-    change_sensor_mode_to_reschedule: bool = False
 
     callback_sink: BaseCallbackSink | None = None
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
index cf7d784aa5c..6acce8b3009 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
@@ -58,7 +58,6 @@ class LocalKubernetesExecutor(BaseExecutor):
     is_production: bool = True
 
     serve_logs: bool = True
-    change_sensor_mode_to_reschedule: bool = False
 
     callback_sink: BaseCallbackSink | None = None
 
diff --git a/task-sdk/src/airflow/sdk/bases/sensor.py 
b/task-sdk/src/airflow/sdk/bases/sensor.py
index 4347e684a42..3cbbac9a492 100644
--- a/task-sdk/src/airflow/sdk/bases/sensor.py
+++ b/task-sdk/src/airflow/sdk/bases/sensor.py
@@ -36,13 +36,11 @@ from airflow.exceptions import (
     TaskDeferralError,
     TaskDeferralTimeout,
 )
-from airflow.executors.executor_loader import ExecutorLoader
 from airflow.sdk.bases.operator import BaseOperator
 from airflow.utils import timezone
 
 if TYPE_CHECKING:
     from airflow.sdk.definitions.context import Context
-    from airflow.typing_compat import Self
 
 
 class PokeReturnValue:
@@ -326,18 +324,6 @@ class BaseSensorOperator(BaseOperator):
         self.log.info("new %s interval is %s", self.mode, new_interval)
         return new_interval
 
-    def prepare_for_execution(self) -> Self:
-        task = super().prepare_for_execution()
-
-        # Sensors in `poke` mode can block execution of DAGs when running
-        # with single process executor, thus we change the mode to`reschedule`
-        # to allow parallel task being scheduled and executed
-        executor, _ = ExecutorLoader.import_default_executor_cls()
-        if executor.change_sensor_mode_to_reschedule:
-            self.log.warning("%s changes sensor mode to 'reschedule'.", 
executor.__name__)
-            task.mode = "reschedule"
-        return task
-
     @property
     def reschedule(self):
         """Define mode rescheduled sensors."""

Reply via email to