This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f8593503cb Add default_deferrable config (#31712)
f8593503cb is described below

commit f8593503cbe252c2f4dc5ff48a3f292c9e13baad
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jul 5 13:55:26 2023 +0800

    Add default_deferrable config (#31712)
---
 airflow/config_templates/config.yml                |  7 +++++
 airflow/config_templates/default_airflow.cfg       |  3 +++
 airflow/operators/trigger_dagrun.py                |  6 ++---
 airflow/providers/amazon/aws/operators/athena.py   |  3 ++-
 airflow/providers/amazon/aws/operators/batch.py    |  5 ++--
 airflow/providers/amazon/aws/operators/ecs.py      | 19 +++++---------
 airflow/providers/amazon/aws/operators/eks.py      | 11 ++++----
 airflow/providers/amazon/aws/operators/emr.py      |  9 ++++---
 airflow/providers/amazon/aws/operators/glue.py     |  3 ++-
 .../providers/amazon/aws/operators/glue_crawler.py |  3 ++-
 airflow/providers/amazon/aws/operators/rds.py      |  5 ++--
 .../amazon/aws/operators/redshift_cluster.py       | 12 ++++-----
 .../providers/amazon/aws/operators/sagemaker.py    | 11 ++++----
 airflow/providers/amazon/aws/sensors/batch.py      |  3 ++-
 airflow/providers/amazon/aws/sensors/ec2.py        |  3 ++-
 airflow/providers/amazon/aws/sensors/emr.py        |  7 ++---
 airflow/providers/amazon/aws/sensors/s3.py         |  7 ++---
 airflow/providers/apache/livy/operators/livy.py    |  4 +--
 airflow/providers/cncf/kubernetes/operators/pod.py |  3 ++-
 .../providers/databricks/operators/databricks.py   |  5 ++--
 airflow/providers/dbt/cloud/operators/dbt.py       |  3 ++-
 airflow/providers/dbt/cloud/sensors/dbt.py         |  3 ++-
 .../providers/google/cloud/operators/bigquery.py   | 12 ++++-----
 .../google/cloud/operators/bigquery_dts.py         |  3 ++-
 .../google/cloud/operators/cloud_build.py          |  3 ++-
 .../google/cloud/operators/cloud_composer.py       |  7 ++---
 .../providers/google/cloud/operators/cloud_sql.py  |  3 ++-
 .../providers/google/cloud/operators/dataflow.py   |  6 ++---
 .../providers/google/cloud/operators/dataproc.py   | 17 ++++++------
 .../google/cloud/operators/kubernetes_engine.py    |  6 +++--
 .../providers/google/cloud/operators/mlengine.py   |  8 ++----
 airflow/providers/google/cloud/sensors/bigquery.py |  5 ++--
 airflow/providers/google/cloud/sensors/gcs.py      | 12 ++++-----
 airflow/providers/google/cloud/sensors/pubsub.py   |  4 +--
 .../google/cloud/transfers/bigquery_to_gcs.py      |  3 ++-
 .../google/cloud/transfers/gcs_to_bigquery.py      |  5 ++--
 .../microsoft/azure/operators/data_factory.py      |  3 ++-
 .../microsoft/azure/sensors/data_factory.py        |  3 ++-
 airflow/providers/microsoft/azure/sensors/wasb.py  |  5 ++--
 .../authoring-and-scheduling/deferring.rst         | 30 ++++++++++++++++++++++
 tests/models/test_baseoperator.py                  |  1 -
 41 files changed, 160 insertions(+), 111 deletions(-)
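
For context, here is a minimal, illustrative sketch of how the new option can be
used once this change is released (the [operators] section and the
default_deferrable key come from the diff below; the DAG id, task id and file
names are made-up examples, not part of this commit):

    # airflow.cfg -- opt operators and sensors that support deferral into
    # deferrable mode by default, instead of passing deferrable=True per task.
    [operators]
    default_deferrable = true

    # dags/example_default_deferrable.py -- hypothetical DAG. With the setting
    # above, TriggerDagRunOperator (and the other operators and sensors touched
    # by this commit) fall back to the configured value instead of the old
    # hard-coded False when no deferrable argument is given.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    with DAG(
        dag_id="example_default_deferrable",
        start_date=datetime(2023, 7, 1),
        schedule=None,
    ):
        TriggerDagRunOperator(
            task_id="trigger_downstream",
            trigger_dag_id="downstream_dag",
            wait_for_completion=True,
        )

Passing deferrable=True or deferrable=False explicitly still overrides the
configured default; the config only changes the fallback used when the argument
is omitted.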

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1ad8705a70..d588a7f26e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1305,6 +1305,13 @@ operators:
       type: string
       example: ~
       default: "airflow"
+    default_deferrable:
+      description: |
+        The default value of attribute "deferrable" in operators and sensors.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: "false"
     default_cpus:
       description: ~
       version_added: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 0cc99a3f4e..ae6bdec085 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -703,6 +703,9 @@ password =
 # The default owner assigned to each new operator, unless
 # provided explicitly or passed via ``default_args``
 default_owner = airflow
+
+# The default value of attribute "deferrable" in operators and sensors.
+default_deferrable = false
 default_cpus = 1
 default_ram = 512
 default_disk = 512
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 0165bb470e..548ef91894 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Sequence, cast
 from sqlalchemy.orm.exc import NoResultFound
 
 from airflow.api.common.trigger_dag import trigger_dag
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
 from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.models.dag import DagModel
@@ -113,7 +114,7 @@ class TriggerDagRunOperator(BaseOperator):
         poke_interval: int = 60,
         allowed_states: list | None = None,
         failed_states: list | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -135,7 +136,6 @@ class TriggerDagRunOperator(BaseOperator):
         self.execution_date = execution_date
 
     def execute(self, context: Context):
-
         if isinstance(self.execution_date, datetime.datetime):
             parsed_execution_date = self.execution_date
         elif isinstance(self.execution_date, str):
@@ -187,7 +187,6 @@ class TriggerDagRunOperator(BaseOperator):
         ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
 
         if self.wait_for_completion:
-
             # Kick off the deferral process
             if self._defer:
                 self.defer(
@@ -219,7 +218,6 @@ class TriggerDagRunOperator(BaseOperator):
 
     @provide_session
     def execute_complete(self, context: Context, session: Session, event: tuple[str, dict[str, Any]]):
-
         # This execution date is parsed from the return trigger event
         provided_execution_date = event[1]["execution_dates"][0]
         try:
diff --git a/airflow/providers/amazon/aws/operators/athena.py b/airflow/providers/amazon/aws/operators/athena.py
index 0467fe6d11..6dd1432ea4 100644
--- a/airflow/providers/amazon/aws/operators/athena.py
+++ b/airflow/providers/amazon/aws/operators/athena.py
@@ -21,6 +21,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.athena import AthenaHook
 from airflow.providers.amazon.aws.triggers.athena import AthenaTrigger
@@ -74,7 +75,7 @@ class AthenaOperator(BaseOperator):
         sleep_time: int = 30,
         max_polling_attempts: int | None = None,
         log_query: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs: Any,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py
index b9b3322c49..9dd954d05c 100644
--- a/airflow/providers/amazon/aws/operators/batch.py
+++ b/airflow/providers/amazon/aws/operators/batch.py
@@ -29,6 +29,7 @@ from datetime import timedelta
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
@@ -154,7 +155,7 @@ class BatchOperator(BaseOperator):
         region_name: str | None = None,
         tags: dict | None = None,
         wait_for_completion: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: int = 30,
         awslogs_enabled: bool = False,
         awslogs_fetch_interval: timedelta = timedelta(seconds=30),
@@ -437,7 +438,7 @@ class BatchCreateComputeEnvironmentOperator(BaseOperator):
         max_retries: int | None = None,
         aws_conn_id: str | None = None,
         region_name: str | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         if "status_retries" in kwargs:
diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py
index 91533cfa62..e5833bf4c3 100644
--- a/airflow/providers/amazon/aws/operators/ecs.py
+++ b/airflow/providers/amazon/aws/operators/ecs.py
@@ -26,20 +26,14 @@ from typing import TYPE_CHECKING, Sequence
 
 import boto3
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator, XCom
 from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.aws.hooks.ecs import (
-    EcsClusterStates,
-    EcsHook,
-    should_retry_eni,
-)
+from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook, should_retry_eni
 from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-from airflow.providers.amazon.aws.triggers.ecs import (
-    ClusterWaiterTrigger,
-    TaskDoneTrigger,
-)
+from airflow.providers.amazon.aws.triggers.ecs import ClusterWaiterTrigger, TaskDoneTrigger
 from airflow.providers.amazon.aws.utils.task_log_fetcher import AwsTaskLogFetcher
 from airflow.utils.helpers import prune_dict
 from airflow.utils.session import provide_session
@@ -118,7 +112,7 @@ class EcsCreateClusterOperator(EcsBaseOperator):
         wait_for_completion: bool = True,
         waiter_delay: int = 15,
         waiter_max_attempts: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -201,7 +195,7 @@ class EcsDeleteClusterOperator(EcsBaseOperator):
         wait_for_completion: bool = True,
         waiter_delay: int = 15,
         waiter_max_attempts: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -482,7 +476,7 @@ class EcsRunTaskOperator(EcsBaseOperator):
         wait_for_completion: bool = True,
         waiter_delay: int = 6,
         waiter_max_attempts: int = 100,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -727,7 +721,6 @@ class EcsRunTaskOperator(EcsBaseOperator):
             raise AirflowException(response)
 
         for task in response["tasks"]:
-
             if task.get("stopCode", "") == "TaskFailedToStart":
                 # Reset task arn here otherwise the retry run will not start
                 # a new task but keep polling the old dead one
diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py
index bea4223987..56e9269f88 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, List, Sequence, cast
 
 from botocore.exceptions import ClientError, WaiterError
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.eks import EksHook
@@ -83,7 +84,6 @@ def _create_compute(
     log = logging.getLogger(__name__)
     eks_hook = EksHook(aws_conn_id=aws_conn_id, region_name=region)
     if compute == "nodegroup" and nodegroup_name:
-
         # this is to satisfy mypy
         subnets = subnets or []
         create_nodegroup_kwargs = create_nodegroup_kwargs or {}
@@ -107,7 +107,6 @@ def _create_compute(
                 status_args=["nodegroup.status"],
             )
     elif compute == "fargate" and fargate_profile_name:
-
         # this is to satisfy mypy
         create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
         fargate_selectors = fargate_selectors or []
@@ -366,7 +365,7 @@ class EksCreateNodegroupOperator(BaseOperator):
         region: str | None = None,
         waiter_delay: int = 30,
         waiter_max_attempts: int = 80,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         self.nodegroup_subnets = nodegroup_subnets
@@ -489,7 +488,7 @@ class EksCreateFargateProfileOperator(BaseOperator):
         region: str | None = None,
         waiter_delay: int = 10,
         waiter_max_attempts: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         self.cluster_name = cluster_name
@@ -690,7 +689,7 @@ class EksDeleteNodegroupOperator(BaseOperator):
         region: str | None = None,
         waiter_delay: int = 30,
         waiter_max_attempts: int = 40,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         self.cluster_name = cluster_name
@@ -780,7 +779,7 @@ class EksDeleteFargateProfileOperator(BaseOperator):
         region: str | None = None,
         waiter_delay: int = 30,
         waiter_max_attempts: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index f9eacdb79d..8330a586e4 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -24,6 +24,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 from uuid import uuid4
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
@@ -96,7 +97,7 @@ class EmrAddStepsOperator(BaseOperator):
         waiter_delay: int | None = None,
         waiter_max_attempts: int | None = None,
         execution_role_arn: str | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         if not exactly_one(job_flow_id is None, job_flow_name is None):
@@ -510,7 +511,7 @@ class EmrContainerOperator(BaseOperator):
         max_tries: int | None = None,
         tags: dict | None = None,
         max_polling_attempts: int | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs: Any,
     ) -> None:
         super().__init__(**kwargs)
@@ -695,7 +696,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
         waiter_delay: int | None | ArgNotSet = NOTSET,
         waiter_countdown: int | None = None,
         waiter_check_interval_seconds: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs: Any,
     ):
         if waiter_max_attempts is NOTSET:
@@ -900,7 +901,7 @@ class EmrTerminateJobFlowOperator(BaseOperator):
         aws_conn_id: str = "aws_default",
         waiter_delay: int = 60,
         waiter_max_attempts: int = 20,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 060ac358a4..265d057de5 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -23,6 +23,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Sequence
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -98,7 +99,7 @@ class GlueJobOperator(BaseOperator):
         create_job_kwargs: dict | None = None,
         run_job_kwargs: dict | None = None,
         wait_for_completion: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         verbose: bool = False,
         update_config: bool = False,
         job_poll_interval: int | float = 6,
diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py
index a7efb9f5c0..71e2607039 100644
--- a/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ b/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -21,6 +21,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Sequence
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.providers.amazon.aws.triggers.glue_crawler import GlueCrawlerCompleteTrigger
 
 if TYPE_CHECKING:
@@ -61,7 +62,7 @@ class GlueCrawlerOperator(BaseOperator):
         region_name: str | None = None,
         poll_interval: int = 5,
         wait_for_completion: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py
index 9aef670166..c58961db2e 100644
--- a/airflow/providers/amazon/aws/operators/rds.py
+++ b/airflow/providers/amazon/aws/operators/rds.py
@@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Sequence
 
 from mypy_boto3_rds.type_defs import TagTypeDef
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.rds import RdsHook
@@ -554,7 +555,7 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator):
         rds_kwargs: dict | None = None,
         aws_conn_id: str = "aws_default",
         wait_for_completion: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         waiter_delay: int = 30,
         waiter_max_attempts: int = 60,
         **kwargs,
@@ -645,7 +646,7 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator):
         rds_kwargs: dict | None = None,
         aws_conn_id: str = "aws_default",
         wait_for_completion: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         waiter_delay: int = 30,
         waiter_max_attempts: int = 60,
         **kwargs,
diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 905c34ff3a..cde4a32226 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -20,6 +20,7 @@ import time
 from datetime import timedelta
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
@@ -148,7 +149,7 @@ class RedshiftCreateClusterOperator(BaseOperator):
         wait_for_completion: bool = False,
         max_attempt: int = 5,
         poll_interval: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -327,7 +328,7 @@ class RedshiftCreateClusterSnapshotOperator(BaseOperator):
         poll_interval: int = 15,
         max_attempt: int = 20,
         aws_conn_id: str = "aws_default",
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -470,7 +471,7 @@ class RedshiftResumeClusterOperator(BaseOperator):
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
         wait_for_completion: bool = False,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: int = 10,
         max_attempts: int = 10,
         **kwargs,
@@ -560,7 +561,7 @@ class RedshiftPauseClusterOperator(BaseOperator):
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: int = 10,
         max_attempts: int = 15,
         **kwargs,
@@ -647,7 +648,7 @@ class RedshiftDeleteClusterOperator(BaseOperator):
         wait_for_completion: bool = True,
         aws_conn_id: str = "aws_default",
         poll_interval: int = 30,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         max_attempts: int = 30,
         **kwargs,
     ):
@@ -668,7 +669,6 @@ class RedshiftDeleteClusterOperator(BaseOperator):
         self.max_attempts = max_attempts
 
     def execute(self, context: Context):
-
         while self._attempts >= 1:
             try:
                 self.redshift_hook.delete_cluster(
diff --git a/airflow/providers/amazon/aws/operators/sagemaker.py b/airflow/providers/amazon/aws/operators/sagemaker.py
index 4dac7df007..ac1b7a73d2 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence
 
 from botocore.exceptions import ClientError
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -198,7 +199,7 @@ class SageMakerProcessingOperator(SageMakerBaseOperator):
         max_attempts: int | None = None,
         max_ingestion_time: int | None = None,
         action_if_job_exists: str = "timestamp",
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -392,7 +393,7 @@ class SageMakerEndpointOperator(SageMakerBaseOperator):
         check_interval: int = CHECK_INTERVAL_SECOND,
         max_ingestion_time: int | None = None,
         operation: str = "create",
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -551,7 +552,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
         max_ingestion_time: int | None = None,
         check_if_job_exists: bool = True,
         action_if_job_exists: str = "timestamp",
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -700,7 +701,7 @@ class SageMakerTuningOperator(SageMakerBaseOperator):
         wait_for_completion: bool = True,
         check_interval: int = CHECK_INTERVAL_SECOND,
         max_ingestion_time: int | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -862,7 +863,7 @@ class SageMakerTrainingOperator(SageMakerBaseOperator):
         max_ingestion_time: int | None = None,
         check_if_job_exists: bool = True,
         action_if_job_exists: str = "timestamp",
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py
index 2033d1e86b..32da5b4cf2 100644
--- a/airflow/providers/amazon/aws/sensors/batch.py
+++ b/airflow/providers/amazon/aws/sensors/batch.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any, Sequence
 
 from deprecated import deprecated
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
 from airflow.providers.amazon.aws.triggers.batch import BatchSensorTrigger
@@ -58,7 +59,7 @@ class BatchSensor(BaseSensorOperator):
         job_id: str,
         aws_conn_id: str = "aws_default",
         region_name: str | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poke_interval: float = 5,
         max_retries: int = 5,
         **kwargs,
diff --git a/airflow/providers/amazon/aws/sensors/ec2.py b/airflow/providers/amazon/aws/sensors/ec2.py
index c5d7761031..2b7b63f7e6 100644
--- a/airflow/providers/amazon/aws/sensors/ec2.py
+++ b/airflow/providers/amazon/aws/sensors/ec2.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
 from airflow.providers.amazon.aws.triggers.ec2 import EC2StateSensorTrigger
@@ -55,7 +56,7 @@ class EC2InstanceStateSensor(BaseSensorOperator):
         instance_id: str,
         aws_conn_id: str = "aws_default",
         region_name: str | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         if target_state not in self.valid_states:
diff --git a/airflow/providers/amazon/aws/sensors/emr.py b/airflow/providers/amazon/aws/sensors/emr.py
index 2f44caab06..9953dfa782 100644
--- a/airflow/providers/amazon/aws/sensors/emr.py
+++ b/airflow/providers/amazon/aws/sensors/emr.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Iterable, Sequence
 
 from deprecated import deprecated
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
 from airflow.providers.amazon.aws.links.emr import EmrClusterLink, EmrLogsLink, get_log_uri
@@ -271,7 +272,7 @@ class EmrContainerSensor(BaseSensorOperator):
         max_retries: int | None = None,
         aws_conn_id: str = "aws_default",
         poll_interval: int = 10,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs: Any,
     ) -> None:
         super().__init__(**kwargs)
@@ -425,7 +426,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
         target_states: Iterable[str] | None = None,
         failed_states: Iterable[str] | None = None,
         max_attempts: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -549,7 +550,7 @@ class EmrStepSensor(EmrBaseSensor):
         target_states: Iterable[str] | None = None,
         failed_states: Iterable[str] | None = None,
         max_attempts: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py
index 7192585afd..4d15cdc212 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -26,6 +26,8 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence, cast
 
 from deprecated import deprecated
 
+from airflow.configuration import conf
+
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
@@ -87,7 +89,7 @@ class S3KeySensor(BaseSensorOperator):
         check_fn: Callable[..., bool] | None = None,
         aws_conn_id: str = "aws_default",
         verify: str | bool | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -238,10 +240,9 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
         min_objects: int = 1,
         previous_objects: set[str] | None = None,
         allow_delete: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
 
         self.bucket_name = bucket_name
diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py
index fa4f357343..f5e519315f 100644
--- a/airflow/providers/apache/livy/operators/livy.py
+++ b/airflow/providers/apache/livy/operators/livy.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.apache.livy.hooks.livy import BatchState, LivyHook
@@ -88,10 +89,9 @@ class LivyOperator(BaseOperator):
         extra_options: dict[str, Any] | None = None,
         extra_headers: dict[str, Any] | None = None,
         retry_args: dict[str, Any] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs: Any,
     ) -> None:
-
         super().__init__(**kwargs)
 
         self.spark_params = {
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 696611c6c2..e3ac7708e7 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -33,6 +33,7 @@ from kubernetes.client import CoreV1Api, models as k8s
 from slugify import slugify
 from urllib3.exceptions import HTTPError
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
 from airflow.kubernetes import pod_generator
 from airflow.kubernetes.pod_generator import PodGenerator
@@ -305,7 +306,7 @@ class KubernetesPodOperator(BaseOperator):
         configmaps: list[str] | None = None,
         skip_on_exit_code: int | Container[int] | None = None,
         base_container_name: str | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 2,
         log_pod_spec_on_failure: bool = True,
         on_finish_action: str = "delete_pod",
diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index ab93d8f49b..fb27f0c01a 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -24,6 +24,7 @@ from functools import cached_property
 from logging import Logger
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator, BaseOperatorLink, XCom
 from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunState
@@ -315,7 +316,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
         access_control_list: list[dict[str, str]] | None = None,
         wait_for_termination: bool = True,
         git_source: dict[str, str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         """Creates a new ``DatabricksSubmitRunOperator``."""
@@ -605,7 +606,7 @@ class DatabricksRunNowOperator(BaseOperator):
         databricks_retry_args: dict[Any, Any] | None = None,
         do_xcom_push: bool = True,
         wait_for_termination: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         """Creates a new ``DatabricksRunNowOperator``."""
diff --git a/airflow/providers/dbt/cloud/operators/dbt.py b/airflow/providers/dbt/cloud/operators/dbt.py
index f316c47f3d..c977539afb 100644
--- a/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/airflow/providers/dbt/cloud/operators/dbt.py
@@ -22,6 +22,7 @@ import warnings
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, BaseOperatorLink, XCom
 from airflow.providers.dbt.cloud.hooks.dbt import (
@@ -99,7 +100,7 @@ class DbtCloudRunJobOperator(BaseOperator):
         timeout: int = 60 * 60 * 24 * 7,
         check_interval: int = 60,
         additional_run_config: dict[str, Any] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/dbt/cloud/sensors/dbt.py b/airflow/providers/dbt/cloud/sensors/dbt.py
index 5838f6d624..3b5ae549a3 100644
--- a/airflow/providers/dbt/cloud/sensors/dbt.py
+++ b/airflow/providers/dbt/cloud/sensors/dbt.py
@@ -20,6 +20,7 @@ import time
 import warnings
 from typing import TYPE_CHECKING, Any
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
 from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
@@ -50,7 +51,7 @@ class DbtCloudJobRunSensor(BaseSensorOperator):
         dbt_cloud_conn_id: str = DbtCloudHook.default_conn_name,
         run_id: int,
         account_id: int | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         if deferrable:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 98929b6e6d..970e7813ed 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -29,6 +29,7 @@ from google.api_core.retry import Retry
 from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, QueryJob
 from google.cloud.bigquery.table import RowIterator
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
 from airflow.models import BaseOperator, BaseOperatorLink
 from airflow.models.xcom import XCom
@@ -200,7 +201,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         labels: dict | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
         **kwargs,
     ) -> None:
@@ -320,7 +321,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         labels: dict | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
         **kwargs,
     ) -> None:
@@ -460,7 +461,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         labels: dict | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
         **kwargs,
     ) -> None:
@@ -854,7 +855,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         location: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
         as_dict: bool = False,
         use_legacy_sql: bool = True,
@@ -1876,7 +1877,6 @@ class BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
         exists_ok: bool | None = None,
         **kwargs,
     ) -> None:
-
         self.dataset_id = dataset_id
         self.project_id = project_id
         self.location = location
@@ -2623,7 +2623,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
         cancel_on_kill: bool = True,
         result_retry: Retry = DEFAULT_RETRY,
         result_timeout: float | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
         **kwargs,
     ) -> None:
diff --git a/airflow/providers/google/cloud/operators/bigquery_dts.py b/airflow/providers/google/cloud/operators/bigquery_dts.py
index d9e013afa6..e10618bc39 100644
--- a/airflow/providers/google/cloud/operators/bigquery_dts.py
+++ b/airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -32,6 +32,7 @@ from google.cloud.bigquery_datatransfer_v1 import (
 )
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.providers.google.cloud.hooks.bigquery_dts import BiqQueryDataTransferServiceHook, get_object_id
 from airflow.providers.google.cloud.links.bigquery_dts import BigQueryDataTransferConfigLink
 from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
@@ -279,7 +280,7 @@ class BigQueryDataTransferServiceStartTransferRunsOperator(GoogleCloudBaseOperat
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id="google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py b/airflow/providers/google/cloud/operators/cloud_build.py
index 4242f561c1..14fed55a3a 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -28,6 +28,7 @@ from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry
 from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 from airflow.providers.google.cloud.links.cloud_build import (
@@ -176,7 +177,7 @@ class CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         poll_interval: float = 4.0,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         location: str = "global",
         **kwargs,
     ) -> None:
diff --git a/airflow/providers/google/cloud/operators/cloud_composer.py b/airflow/providers/google/cloud/operators/cloud_composer.py
index d04a1606fc..c9b52d8559 100644
--- a/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -27,6 +27,7 @@ from google.cloud.orchestration.airflow.service_v1.types import Environment
 from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.providers.google.cloud.hooks.cloud_composer import CloudComposerHook
 from airflow.providers.google.cloud.links.base import BaseGoogleLink
 from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
@@ -135,7 +136,7 @@ class CloudComposerCreateEnvironmentOperator(GoogleCloudBaseOperator):
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         pooling_period_seconds: int = 30,
         **kwargs,
     ) -> None:
@@ -264,7 +265,7 @@ class CloudComposerDeleteEnvironmentOperator(GoogleCloudBaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         pooling_period_seconds: int = 30,
         **kwargs,
     ) -> None:
@@ -509,7 +510,7 @@ class CloudComposerUpdateEnvironmentOperator(GoogleCloudBaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         pooling_period_seconds: int = 30,
         **kwargs,
     ) -> None:
diff --git a/airflow/providers/google/cloud/operators/cloud_sql.py b/airflow/providers/google/cloud/operators/cloud_sql.py
index 5c77cbd86c..b1144663c7 100644
--- a/airflow/providers/google/cloud/operators/cloud_sql.py
+++ b/airflow/providers/google/cloud/operators/cloud_sql.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Iterable, Mapping, Sequence
 
 from googleapiclient.errors import HttpError
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
@@ -955,7 +956,7 @@ class CloudSQLExportInstanceOperator(CloudSQLBaseOperator):
         api_version: str = "v1beta4",
         validate_body: bool = True,
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poke_interval: int = 10,
         **kwargs,
     ) -> None:
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 5ae1115a34..a5e9588214 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -28,6 +28,7 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType
 from airflow.providers.google.cloud.hooks.dataflow import (
@@ -419,7 +420,6 @@ class DataflowCreateJavaJobOperator(GoogleCloudBaseOperator):
                     variables=pipeline_options,
                 )
                 while is_running and self.check_if_running == CheckJobRunning.WaitForRun:
-
                     is_running = self.dataflow_hook.is_job_dataflow_running(
                         name=self.job_name,
                         variables=pipeline_options,
@@ -611,7 +611,7 @@ class DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
         cancel_timeout: int | None = 10 * 60,
         wait_until_finished: bool | None = None,
         append_job_name: bool = True,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -801,7 +801,7 @@ class DataflowStartFlexTemplateOperator(GoogleCloudBaseOperator):
         cancel_timeout: int | None = 10 * 60,
         wait_until_finished: bool | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         append_job_name: bool = True,
         *args,
         **kwargs,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index db7d785347..d14a495bc0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -36,6 +36,7 @@ from google.cloud.dataproc_v1 import Batch, Cluster, ClusterStatus, JobStatus
 from google.protobuf.duration_pb2 import Duration
 from google.protobuf.field_mask_pb2 import FieldMask
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -484,7 +485,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 10,
         **kwargs,
     ) -> None:
@@ -849,7 +850,7 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 10,
         **kwargs,
     ):
@@ -981,7 +982,7 @@ class DataprocJobBaseOperator(GoogleCloudBaseOperator):
         job_error_states: set[str] | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         asynchronous: bool = False,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 10,
         **kwargs,
     ) -> None:
@@ -1731,7 +1732,7 @@ class DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 10,
         **kwargs,
     ) -> None:
@@ -1859,7 +1860,7 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 10,
         **kwargs,
     ) -> None:
@@ -1979,7 +1980,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         asynchronous: bool = False,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 10,
         cancel_on_kill: bool = True,
         wait_timeout: int | None = None,
@@ -2139,7 +2140,7 @@ class DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 10,
         **kwargs,
     ):
@@ -2270,7 +2271,7 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         result_retry: Retry | _MethodDefault = DEFAULT,
         asynchronous: bool = False,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 5,
         **kwargs,
     ):
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index bf14828d87..086a7d99b7 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -26,6 +26,7 @@ from google.api_core.exceptions import AlreadyExists
 from google.cloud.container_v1.types import Cluster
 from kubernetes.client.models import V1Pod
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 
@@ -34,6 +35,7 @@ try:
 except ImportError:
     # preserve backward compatibility for older versions of cncf.kubernetes provider
     from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
 from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook, GKEPodHook
 from airflow.providers.google.cloud.links.kubernetes_engine import (
     KubernetesEngineClusterLink,
@@ -108,7 +110,7 @@ class GKEDeleteClusterOperator(GoogleCloudBaseOperator):
         gcp_conn_id: str = "google_cloud_default",
         api_version: str = "v2",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: int = 10,
         **kwargs,
     ) -> None:
@@ -255,7 +257,7 @@ class GKECreateClusterOperator(GoogleCloudBaseOperator):
         api_version: str = "v2",
         impersonation_chain: str | Sequence[str] | None = None,
         poll_interval: int = 10,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/google/cloud/operators/mlengine.py b/airflow/providers/google/cloud/operators/mlengine.py
index b776d20dff..20d9d19886 100644
--- a/airflow/providers/google/cloud/operators/mlengine.py
+++ b/airflow/providers/google/cloud/operators/mlengine.py
@@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any, Sequence
 
 from googleapiclient.errors import HttpError
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
 from airflow.providers.google.cloud.links.mlengine import (
@@ -722,7 +723,6 @@ class MLEngineCreateVersionOperator(GoogleCloudBaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self._project_id = project_id
         self._model_name = model_name
@@ -804,7 +804,6 @@ class MLEngineSetDefaultVersionOperator(GoogleCloudBaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self._project_id = project_id
         self._model_name = model_name
@@ -883,7 +882,6 @@ class MLEngineListVersionsOperator(GoogleCloudBaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self._project_id = project_id
         self._model_name = model_name
@@ -961,7 +959,6 @@ class MLEngineDeleteVersionOperator(GoogleCloudBaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self._project_id = project_id
         self._model_name = model_name
@@ -1098,7 +1095,7 @@ class MLEngineStartTrainingJobOperator(GoogleCloudBaseOperator):
         labels: dict[str, str] | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         hyperparameters: dict | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         cancel_on_kill: bool = True,
         **kwargs,
     ) -> None:
@@ -1370,7 +1367,6 @@ class MLEngineTrainingCancelJobOperator(GoogleCloudBaseOperator):
             raise AirflowException("Google Cloud project id is required.")
 
     def execute(self, context: Context):
-
         hook = MLEngineHook(
             gcp_conn_id=self._gcp_conn_id,
             impersonation_chain=self._impersonation_chain,
diff --git a/airflow/providers/google/cloud/sensors/bigquery.py b/airflow/providers/google/cloud/sensors/bigquery.py
index db109bf2c1..e4e1819ef1 100644
--- a/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/airflow/providers/google/cloud/sensors/bigquery.py
@@ -22,6 +22,7 @@ import warnings
 from datetime import timedelta
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.cloud.triggers.bigquery import (
@@ -71,7 +72,7 @@ class BigQueryTableExistenceSensor(BaseSensorOperator):
         table_id: str,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         if deferrable and "poke_interval" not in kwargs:
@@ -184,7 +185,7 @@ class 
BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
         partition_id: str,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         if deferrable and "poke_interval" not in kwargs:
diff --git a/airflow/providers/google/cloud/sensors/gcs.py 
b/airflow/providers/google/cloud/sensors/gcs.py
index 08fd37022d..7048789601 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence
 from google.api_core.retry import Retry
 from google.cloud.storage.retry import DEFAULT_RETRY
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.triggers.gcs import (
@@ -76,10 +77,9 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         google_cloud_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         retry: Retry = DEFAULT_RETRY,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self.bucket = bucket
         self.object = object
@@ -208,10 +208,9 @@ class GCSObjectUpdateSensor(BaseSensorOperator):
         ts_func: Callable = ts_function,
         google_cloud_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self.bucket = bucket
         self.object = object
@@ -298,7 +297,7 @@ class 
GCSObjectsWithPrefixExistenceSensor(BaseSensorOperator):
         prefix: str,
         google_cloud_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -412,10 +411,9 @@ class GCSUploadSessionCompleteSensor(BaseSensorOperator):
         allow_delete: bool = True,
         google_cloud_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
 
         self.bucket = bucket
diff --git a/airflow/providers/google/cloud/sensors/pubsub.py 
b/airflow/providers/google/cloud/sensors/pubsub.py
index 2e03b3669d..db9f39b19b 100644
--- a/airflow/providers/google/cloud/sensors/pubsub.py
+++ b/airflow/providers/google/cloud/sensors/pubsub.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence
 
 from google.cloud.pubsub_v1.types import ReceivedMessage
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
 from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger
@@ -103,10 +104,9 @@ class PubSubPullSensor(BaseSensorOperator):
         messages_callback: Callable[[list[ReceivedMessage], Context], Any] | 
None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         poke_interval: float = 10.0,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self.gcp_conn_id = gcp_conn_id
         self.project_id = project_id
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py 
b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 7ec62db9bf..8836e3ee35 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -25,6 +25,7 @@ from google.api_core.retry import Retry
 from google.cloud.bigquery import DEFAULT_RETRY, UnknownJob
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, 
BigQueryJob
 from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
@@ -114,7 +115,7 @@ class BigQueryToGCSOperator(BaseOperator):
         job_id: str | None = None,
         force_rerun: bool = False,
         reattach_states: set[str] | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py 
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 88b6d09323..da462eae74 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -36,6 +36,7 @@ from google.cloud.bigquery import (
 from google.cloud.bigquery.table import EncryptionConfiguration, Table, 
TableReference
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, 
BigQueryJob
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -218,7 +219,7 @@ class GCSToBigQueryOperator(BaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         labels=None,
         description=None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         result_retry: Retry = DEFAULT_RETRY,
         result_timeout: float | None = None,
         cancel_on_kill: bool = True,
@@ -228,7 +229,6 @@ class GCSToBigQueryOperator(BaseOperator):
         project_id: str | None = None,
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self.hook: BigQueryHook | None = None
         self.configuration: dict[str, Any] = {}
@@ -718,7 +718,6 @@ class GCSToBigQueryOperator(BaseOperator):
     def _cleanse_time_partitioning(
         self, destination_dataset_table: str | None, time_partitioning_in: 
dict | None
     ) -> dict:  # if it is a partitioned table ($ is in the table name) add 
partition load option
-
         if time_partitioning_in is None:
             time_partitioning_in = {}
 
diff --git a/airflow/providers/microsoft/azure/operators/data_factory.py 
b/airflow/providers/microsoft/azure/operators/data_factory.py
index 8906c02ae1..a2b2c528bf 100644
--- a/airflow/providers/microsoft/azure/operators/data_factory.py
+++ b/airflow/providers/microsoft/azure/operators/data_factory.py
@@ -20,6 +20,7 @@ import time
 import warnings
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 from airflow.models import BaseOperator, BaseOperatorLink, XCom
@@ -140,7 +141,7 @@ class AzureDataFactoryRunPipelineOperator(BaseOperator):
         parameters: dict[str, Any] | None = None,
         timeout: int = 60 * 60 * 24 * 7,
         check_interval: int = 60,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/microsoft/azure/sensors/data_factory.py 
b/airflow/providers/microsoft/azure/sensors/data_factory.py
index 70ae1f69dc..b4ebedce69 100644
--- a/airflow/providers/microsoft/azure/sensors/data_factory.py
+++ b/airflow/providers/microsoft/azure/sensors/data_factory.py
@@ -20,6 +20,7 @@ import warnings
 from datetime import timedelta
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.microsoft.azure.hooks.data_factory import (
     AzureDataFactoryHook,
@@ -60,7 +61,7 @@ class 
AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
         azure_data_factory_conn_id: str = 
AzureDataFactoryHook.default_conn_name,
         resource_group_name: str | None = None,
         factory_name: str | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/airflow/providers/microsoft/azure/sensors/wasb.py 
b/airflow/providers/microsoft/azure/sensors/wasb.py
index 4e2ec2d502..0c227f2ea3 100644
--- a/airflow/providers/microsoft/azure/sensors/wasb.py
+++ b/airflow/providers/microsoft/azure/sensors/wasb.py
@@ -21,6 +21,7 @@ import warnings
 from datetime import timedelta
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 from airflow.providers.microsoft.azure.triggers.wasb import 
WasbBlobSensorTrigger, WasbPrefixSensorTrigger
@@ -53,7 +54,7 @@ class WasbBlobSensor(BaseSensorOperator):
         wasb_conn_id: str = "wasb_default",
         check_options: dict | None = None,
         public_read: bool = False,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -151,7 +152,7 @@ class WasbPrefixSensor(BaseSensorOperator):
         prefix: str,
         wasb_conn_id: str = "wasb_default",
         check_options: dict | None = None,
-        deferrable: bool = False,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst 
b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index b1e6c6be98..81526737e8 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -56,6 +56,36 @@ Writing a deferrable operator takes a bit more work. There 
are some main points
 * You can defer multiple times, and you can defer before/after your Operator 
does significant work, or only defer if certain conditions are met (e.g. a 
system does not have an immediate answer). Deferral is entirely under your 
control.
 * Any Operator can defer; no special marking on its class is needed, and it's 
not limited to Sensors.
 * In order for any changes to a Trigger to be reflected, the *triggerer* needs 
to be restarted whenever the Trigger is modified.
+* If you want to add an operator or sensor that supports both deferrable and 
non-deferrable modes, it is suggested to add ``deferrable: bool = 
conf.getboolean("operators", "default_deferrable", fallback=False)`` to the 
``__init__`` method of the operator and use it to decide whether to run the 
operator in deferrable mode. The default value of ``deferrable`` for all 
operators and sensors that support switching between deferrable and 
non-deferrable mode can then be configured through ``default_deferrable`` in 
the ``operators`` section of the configuration. For example::
+
+    import time
+    from datetime import timedelta
+
+    from airflow.configuration import conf
+    from airflow.sensors.base import BaseSensorOperator
+    from airflow.triggers.temporal import TimeDeltaTrigger
+
+
+    class WaitOneHourSensor(BaseSensorOperator):
+        def __init__(
+            self,
+            deferrable: bool = conf.getboolean("operators", 
"default_deferrable", fallback=False),
+            **kwargs
+        ):
+            super().__init__(**kwargs)
+            self.deferrable = deferrable
+
+        def execute(self, context):
+            if self.deferrable:
+                self.defer(
+                    trigger=TimeDeltaTrigger(timedelta(hours=1)),
+                    method_name="execute_complete"
+                )
+            else:
+                time.sleep(3600)
+
+        def execute_complete(self, context, event=None):
+            # We have no more work to do here. Mark as complete.
+            return
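+
+  To turn deferrable mode on globally (a minimal sketch; the environment
+  variable name below is not part of this change and simply follows Airflow's
+  standard ``AIRFLOW__{SECTION}__{KEY}`` naming convention), set the option in
+  the ``operators`` section of ``airflow.cfg`` or export
+  ``AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=true``::
+
+    [operators]
+    default_deferrable = true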
 
 
 Triggering Deferral
diff --git a/tests/models/test_baseoperator.py 
b/tests/models/test_baseoperator.py
index 2b11817672..c49e0bb034 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -919,7 +919,6 @@ def test_render_template_fields_logging(
     caplog, monkeypatch, task, context, expected_exception, 
expected_rendering, expected_log, not_expected_log
 ):
     """Verify if operator attributes are correctly templated."""
-
     # Trigger templating and verify results
     def _do_render():
         task.render_template_fields(context=context)
