This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f8593503cb Add default_deferrable config (#31712)
f8593503cb is described below
commit f8593503cbe252c2f4dc5ff48a3f292c9e13baad
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jul 5 13:55:26 2023 +0800
Add default_deferrable config (#31712)
---
airflow/config_templates/config.yml | 7 +++++
airflow/config_templates/default_airflow.cfg | 3 +++
airflow/operators/trigger_dagrun.py | 6 ++---
airflow/providers/amazon/aws/operators/athena.py | 3 ++-
airflow/providers/amazon/aws/operators/batch.py | 5 ++--
airflow/providers/amazon/aws/operators/ecs.py | 19 +++++---------
airflow/providers/amazon/aws/operators/eks.py | 11 ++++----
airflow/providers/amazon/aws/operators/emr.py | 9 ++++---
airflow/providers/amazon/aws/operators/glue.py | 3 ++-
.../providers/amazon/aws/operators/glue_crawler.py | 3 ++-
airflow/providers/amazon/aws/operators/rds.py | 5 ++--
.../amazon/aws/operators/redshift_cluster.py | 12 ++++-----
.../providers/amazon/aws/operators/sagemaker.py | 11 ++++----
airflow/providers/amazon/aws/sensors/batch.py | 3 ++-
airflow/providers/amazon/aws/sensors/ec2.py | 3 ++-
airflow/providers/amazon/aws/sensors/emr.py | 7 ++---
airflow/providers/amazon/aws/sensors/s3.py | 7 ++---
airflow/providers/apache/livy/operators/livy.py | 4 +--
airflow/providers/cncf/kubernetes/operators/pod.py | 3 ++-
.../providers/databricks/operators/databricks.py | 5 ++--
airflow/providers/dbt/cloud/operators/dbt.py | 3 ++-
airflow/providers/dbt/cloud/sensors/dbt.py | 3 ++-
.../providers/google/cloud/operators/bigquery.py | 12 ++++-----
.../google/cloud/operators/bigquery_dts.py | 3 ++-
.../google/cloud/operators/cloud_build.py | 3 ++-
.../google/cloud/operators/cloud_composer.py | 7 ++---
.../providers/google/cloud/operators/cloud_sql.py | 3 ++-
.../providers/google/cloud/operators/dataflow.py | 6 ++---
.../providers/google/cloud/operators/dataproc.py | 17 ++++++------
.../google/cloud/operators/kubernetes_engine.py | 6 +++--
.../providers/google/cloud/operators/mlengine.py | 8 ++----
airflow/providers/google/cloud/sensors/bigquery.py | 5 ++--
airflow/providers/google/cloud/sensors/gcs.py | 12 ++++-----
airflow/providers/google/cloud/sensors/pubsub.py | 4 +--
.../google/cloud/transfers/bigquery_to_gcs.py | 3 ++-
.../google/cloud/transfers/gcs_to_bigquery.py | 5 ++--
.../microsoft/azure/operators/data_factory.py | 3 ++-
.../microsoft/azure/sensors/data_factory.py | 3 ++-
airflow/providers/microsoft/azure/sensors/wasb.py | 5 ++--
.../authoring-and-scheduling/deferring.rst | 30 ++++++++++++++++++++++
tests/models/test_baseoperator.py | 1 -
41 files changed, 160 insertions(+), 111 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 1ad8705a70..d588a7f26e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1305,6 +1305,13 @@ operators:
type: string
example: ~
default: "airflow"
+ default_deferrable:
+ description: |
+ The default value of attribute "deferrable" in operators and sensors.
+ version_added: ~
+ type: boolean
+ example: ~
+ default: "false"
default_cpus:
description: ~
version_added: ~
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index 0cc99a3f4e..ae6bdec085 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -703,6 +703,9 @@ password =
# The default owner assigned to each new operator, unless
# provided explicitly or passed via ``default_args``
default_owner = airflow
+
+# The default value of attribute "deferrable" in operators and sensors.
+default_deferrable = false
default_cpus = 1
default_ram = 512
default_disk = 512
diff --git a/airflow/operators/trigger_dagrun.py
b/airflow/operators/trigger_dagrun.py
index 0165bb470e..548ef91894 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Sequence, cast
from sqlalchemy.orm.exc import NoResultFound
from airflow.api.common.trigger_dag import trigger_dag
+from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagNotFound,
DagRunAlreadyExists
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.models.dag import DagModel
@@ -113,7 +114,7 @@ class TriggerDagRunOperator(BaseOperator):
poke_interval: int = 60,
allowed_states: list | None = None,
failed_states: list | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -135,7 +136,6 @@ class TriggerDagRunOperator(BaseOperator):
self.execution_date = execution_date
def execute(self, context: Context):
-
if isinstance(self.execution_date, datetime.datetime):
parsed_execution_date = self.execution_date
elif isinstance(self.execution_date, str):
@@ -187,7 +187,6 @@ class TriggerDagRunOperator(BaseOperator):
ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
if self.wait_for_completion:
-
# Kick off the deferral process
if self._defer:
self.defer(
@@ -219,7 +218,6 @@ class TriggerDagRunOperator(BaseOperator):
@provide_session
def execute_complete(self, context: Context, session: Session, event:
tuple[str, dict[str, Any]]):
-
# This execution date is parsed from the return trigger event
provided_execution_date = event[1]["execution_dates"][0]
try:
diff --git a/airflow/providers/amazon/aws/operators/athena.py
b/airflow/providers/amazon/aws/operators/athena.py
index 0467fe6d11..6dd1432ea4 100644
--- a/airflow/providers/amazon/aws/operators/athena.py
+++ b/airflow/providers/amazon/aws/operators/athena.py
@@ -21,6 +21,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
from airflow.providers.amazon.aws.triggers.athena import AthenaTrigger
@@ -74,7 +75,7 @@ class AthenaOperator(BaseOperator):
sleep_time: int = 30,
max_polling_attempts: int | None = None,
log_query: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/batch.py
b/airflow/providers/amazon/aws/operators/batch.py
index b9b3322c49..9dd954d05c 100644
--- a/airflow/providers/amazon/aws/operators/batch.py
+++ b/airflow/providers/amazon/aws/operators/batch.py
@@ -29,6 +29,7 @@ from datetime import timedelta
from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
@@ -154,7 +155,7 @@ class BatchOperator(BaseOperator):
region_name: str | None = None,
tags: dict | None = None,
wait_for_completion: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: int = 30,
awslogs_enabled: bool = False,
awslogs_fetch_interval: timedelta = timedelta(seconds=30),
@@ -437,7 +438,7 @@ class BatchCreateComputeEnvironmentOperator(BaseOperator):
max_retries: int | None = None,
aws_conn_id: str | None = None,
region_name: str | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
if "status_retries" in kwargs:
diff --git a/airflow/providers/amazon/aws/operators/ecs.py
b/airflow/providers/amazon/aws/operators/ecs.py
index 91533cfa62..e5833bf4c3 100644
--- a/airflow/providers/amazon/aws/operators/ecs.py
+++ b/airflow/providers/amazon/aws/operators/ecs.py
@@ -26,20 +26,14 @@ from typing import TYPE_CHECKING, Sequence
import boto3
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator, XCom
from airflow.providers.amazon.aws.exceptions import EcsOperatorError,
EcsTaskFailToStart
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.aws.hooks.ecs import (
- EcsClusterStates,
- EcsHook,
- should_retry_eni,
-)
+from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsHook,
should_retry_eni
from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-from airflow.providers.amazon.aws.triggers.ecs import (
- ClusterWaiterTrigger,
- TaskDoneTrigger,
-)
+from airflow.providers.amazon.aws.triggers.ecs import ClusterWaiterTrigger,
TaskDoneTrigger
from airflow.providers.amazon.aws.utils.task_log_fetcher import
AwsTaskLogFetcher
from airflow.utils.helpers import prune_dict
from airflow.utils.session import provide_session
@@ -118,7 +112,7 @@ class EcsCreateClusterOperator(EcsBaseOperator):
wait_for_completion: bool = True,
waiter_delay: int = 15,
waiter_max_attempts: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -201,7 +195,7 @@ class EcsDeleteClusterOperator(EcsBaseOperator):
wait_for_completion: bool = True,
waiter_delay: int = 15,
waiter_max_attempts: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -482,7 +476,7 @@ class EcsRunTaskOperator(EcsBaseOperator):
wait_for_completion: bool = True,
waiter_delay: int = 6,
waiter_max_attempts: int = 100,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
@@ -727,7 +721,6 @@ class EcsRunTaskOperator(EcsBaseOperator):
raise AirflowException(response)
for task in response["tasks"]:
-
if task.get("stopCode", "") == "TaskFailedToStart":
# Reset task arn here otherwise the retry run will not start
# a new task but keep polling the old dead one
diff --git a/airflow/providers/amazon/aws/operators/eks.py
b/airflow/providers/amazon/aws/operators/eks.py
index bea4223987..56e9269f88 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, List, Sequence, cast
from botocore.exceptions import ClientError, WaiterError
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.eks import EksHook
@@ -83,7 +84,6 @@ def _create_compute(
log = logging.getLogger(__name__)
eks_hook = EksHook(aws_conn_id=aws_conn_id, region_name=region)
if compute == "nodegroup" and nodegroup_name:
-
# this is to satisfy mypy
subnets = subnets or []
create_nodegroup_kwargs = create_nodegroup_kwargs or {}
@@ -107,7 +107,6 @@ def _create_compute(
status_args=["nodegroup.status"],
)
elif compute == "fargate" and fargate_profile_name:
-
# this is to satisfy mypy
create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
fargate_selectors = fargate_selectors or []
@@ -366,7 +365,7 @@ class EksCreateNodegroupOperator(BaseOperator):
region: str | None = None,
waiter_delay: int = 30,
waiter_max_attempts: int = 80,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
self.nodegroup_subnets = nodegroup_subnets
@@ -489,7 +488,7 @@ class EksCreateFargateProfileOperator(BaseOperator):
region: str | None = None,
waiter_delay: int = 10,
waiter_max_attempts: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
self.cluster_name = cluster_name
@@ -690,7 +689,7 @@ class EksDeleteNodegroupOperator(BaseOperator):
region: str | None = None,
waiter_delay: int = 30,
waiter_max_attempts: int = 40,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
self.cluster_name = cluster_name
@@ -780,7 +779,7 @@ class EksDeleteFargateProfileOperator(BaseOperator):
region: str | None = None,
waiter_delay: int = 30,
waiter_max_attempts: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/emr.py
b/airflow/providers/amazon/aws/operators/emr.py
index f9eacdb79d..8330a586e4 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -24,6 +24,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence
from uuid import uuid4
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook,
EmrServerlessHook
@@ -96,7 +97,7 @@ class EmrAddStepsOperator(BaseOperator):
waiter_delay: int | None = None,
waiter_max_attempts: int | None = None,
execution_role_arn: str | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
if not exactly_one(job_flow_id is None, job_flow_name is None):
@@ -510,7 +511,7 @@ class EmrContainerOperator(BaseOperator):
max_tries: int | None = None,
tags: dict | None = None,
max_polling_attempts: int | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
@@ -695,7 +696,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
waiter_delay: int | None | ArgNotSet = NOTSET,
waiter_countdown: int | None = None,
waiter_check_interval_seconds: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
):
if waiter_max_attempts is NOTSET:
@@ -900,7 +901,7 @@ class EmrTerminateJobFlowOperator(BaseOperator):
aws_conn_id: str = "aws_default",
waiter_delay: int = 60,
waiter_max_attempts: int = 20,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/glue.py
b/airflow/providers/amazon/aws/operators/glue.py
index 060ac358a4..265d057de5 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -23,6 +23,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Sequence
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -98,7 +99,7 @@ class GlueJobOperator(BaseOperator):
create_job_kwargs: dict | None = None,
run_job_kwargs: dict | None = None,
wait_for_completion: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
verbose: bool = False,
update_config: bool = False,
job_poll_interval: int | float = 6,
diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py
b/airflow/providers/amazon/aws/operators/glue_crawler.py
index a7efb9f5c0..71e2607039 100644
--- a/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ b/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -21,6 +21,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Sequence
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.providers.amazon.aws.triggers.glue_crawler import
GlueCrawlerCompleteTrigger
if TYPE_CHECKING:
@@ -61,7 +62,7 @@ class GlueCrawlerOperator(BaseOperator):
region_name: str | None = None,
poll_interval: int = 5,
wait_for_completion: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/rds.py
b/airflow/providers/amazon/aws/operators/rds.py
index 9aef670166..c58961db2e 100644
--- a/airflow/providers/amazon/aws/operators/rds.py
+++ b/airflow/providers/amazon/aws/operators/rds.py
@@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Sequence
from mypy_boto3_rds.type_defs import TagTypeDef
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.rds import RdsHook
@@ -554,7 +555,7 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator):
rds_kwargs: dict | None = None,
aws_conn_id: str = "aws_default",
wait_for_completion: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
waiter_delay: int = 30,
waiter_max_attempts: int = 60,
**kwargs,
@@ -645,7 +646,7 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator):
rds_kwargs: dict | None = None,
aws_conn_id: str = "aws_default",
wait_for_completion: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
waiter_delay: int = 30,
waiter_max_attempts: int = 60,
**kwargs,
diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py
b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 905c34ff3a..cde4a32226 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -20,6 +20,7 @@ import time
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
@@ -148,7 +149,7 @@ class RedshiftCreateClusterOperator(BaseOperator):
wait_for_completion: bool = False,
max_attempt: int = 5,
poll_interval: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
@@ -327,7 +328,7 @@ class RedshiftCreateClusterSnapshotOperator(BaseOperator):
poll_interval: int = 15,
max_attempt: int = 20,
aws_conn_id: str = "aws_default",
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
@@ -470,7 +471,7 @@ class RedshiftResumeClusterOperator(BaseOperator):
cluster_identifier: str,
aws_conn_id: str = "aws_default",
wait_for_completion: bool = False,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: int = 10,
max_attempts: int = 10,
**kwargs,
@@ -560,7 +561,7 @@ class RedshiftPauseClusterOperator(BaseOperator):
*,
cluster_identifier: str,
aws_conn_id: str = "aws_default",
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: int = 10,
max_attempts: int = 15,
**kwargs,
@@ -647,7 +648,7 @@ class RedshiftDeleteClusterOperator(BaseOperator):
wait_for_completion: bool = True,
aws_conn_id: str = "aws_default",
poll_interval: int = 30,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
max_attempts: int = 30,
**kwargs,
):
@@ -668,7 +669,6 @@ class RedshiftDeleteClusterOperator(BaseOperator):
self.max_attempts = max_attempts
def execute(self, context: Context):
-
while self._attempts >= 1:
try:
self.redshift_hook.delete_cluster(
diff --git a/airflow/providers/amazon/aws/operators/sagemaker.py
b/airflow/providers/amazon/aws/operators/sagemaker.py
index 4dac7df007..ac1b7a73d2 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence
from botocore.exceptions import ClientError
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -198,7 +199,7 @@ class SageMakerProcessingOperator(SageMakerBaseOperator):
max_attempts: int | None = None,
max_ingestion_time: int | None = None,
action_if_job_exists: str = "timestamp",
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -392,7 +393,7 @@ class SageMakerEndpointOperator(SageMakerBaseOperator):
check_interval: int = CHECK_INTERVAL_SECOND,
max_ingestion_time: int | None = None,
operation: str = "create",
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -551,7 +552,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
max_ingestion_time: int | None = None,
check_if_job_exists: bool = True,
action_if_job_exists: str = "timestamp",
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -700,7 +701,7 @@ class SageMakerTuningOperator(SageMakerBaseOperator):
wait_for_completion: bool = True,
check_interval: int = CHECK_INTERVAL_SECOND,
max_ingestion_time: int | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
@@ -862,7 +863,7 @@ class SageMakerTrainingOperator(SageMakerBaseOperator):
max_ingestion_time: int | None = None,
check_if_job_exists: bool = True,
action_if_job_exists: str = "timestamp",
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(config=config, aws_conn_id=aws_conn_id, **kwargs)
diff --git a/airflow/providers/amazon/aws/sensors/batch.py
b/airflow/providers/amazon/aws/sensors/batch.py
index 2033d1e86b..32da5b4cf2 100644
--- a/airflow/providers/amazon/aws/sensors/batch.py
+++ b/airflow/providers/amazon/aws/sensors/batch.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any, Sequence
from deprecated import deprecated
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.triggers.batch import BatchSensorTrigger
@@ -58,7 +59,7 @@ class BatchSensor(BaseSensorOperator):
job_id: str,
aws_conn_id: str = "aws_default",
region_name: str | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poke_interval: float = 5,
max_retries: int = 5,
**kwargs,
diff --git a/airflow/providers/amazon/aws/sensors/ec2.py
b/airflow/providers/amazon/aws/sensors/ec2.py
index c5d7761031..2b7b63f7e6 100644
--- a/airflow/providers/amazon/aws/sensors/ec2.py
+++ b/airflow/providers/amazon/aws/sensors/ec2.py
@@ -20,6 +20,7 @@ from __future__ import annotations
from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
from airflow.providers.amazon.aws.triggers.ec2 import EC2StateSensorTrigger
@@ -55,7 +56,7 @@ class EC2InstanceStateSensor(BaseSensorOperator):
instance_id: str,
aws_conn_id: str = "aws_default",
region_name: str | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
if target_state not in self.valid_states:
diff --git a/airflow/providers/amazon/aws/sensors/emr.py
b/airflow/providers/amazon/aws/sensors/emr.py
index 2f44caab06..9953dfa782 100644
--- a/airflow/providers/amazon/aws/sensors/emr.py
+++ b/airflow/providers/amazon/aws/sensors/emr.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Iterable, Sequence
from deprecated import deprecated
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook,
EmrServerlessHook
from airflow.providers.amazon.aws.links.emr import EmrClusterLink,
EmrLogsLink, get_log_uri
@@ -271,7 +272,7 @@ class EmrContainerSensor(BaseSensorOperator):
max_retries: int | None = None,
aws_conn_id: str = "aws_default",
poll_interval: int = 10,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
@@ -425,7 +426,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
target_states: Iterable[str] | None = None,
failed_states: Iterable[str] | None = None,
max_attempts: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
@@ -549,7 +550,7 @@ class EmrStepSensor(EmrBaseSensor):
target_states: Iterable[str] | None = None,
failed_states: Iterable[str] | None = None,
max_attempts: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
diff --git a/airflow/providers/amazon/aws/sensors/s3.py
b/airflow/providers/amazon/aws/sensors/s3.py
index 7192585afd..4d15cdc212 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -26,6 +26,8 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence,
cast
from deprecated import deprecated
+from airflow.configuration import conf
+
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -87,7 +89,7 @@ class S3KeySensor(BaseSensorOperator):
check_fn: Callable[..., bool] | None = None,
aws_conn_id: str = "aws_default",
verify: str | bool | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
super().__init__(**kwargs)
@@ -238,10 +240,9 @@ class S3KeysUnchangedSensor(BaseSensorOperator):
min_objects: int = 1,
previous_objects: set[str] | None = None,
allow_delete: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self.bucket_name = bucket_name
diff --git a/airflow/providers/apache/livy/operators/livy.py
b/airflow/providers/apache/livy/operators/livy.py
index fa4f357343..f5e519315f 100644
--- a/airflow/providers/apache/livy/operators/livy.py
+++ b/airflow/providers/apache/livy/operators/livy.py
@@ -20,6 +20,7 @@ from __future__ import annotations
from time import sleep
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.livy.hooks.livy import BatchState, LivyHook
@@ -88,10 +89,9 @@ class LivyOperator(BaseOperator):
extra_options: dict[str, Any] | None = None,
extra_headers: dict[str, Any] | None = None,
retry_args: dict[str, Any] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
-
super().__init__(**kwargs)
self.spark_params = {
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 696611c6c2..e3ac7708e7 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -33,6 +33,7 @@ from kubernetes.client import CoreV1Api, models as k8s
from slugify import slugify
from urllib3.exceptions import HTTPError
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
@@ -305,7 +306,7 @@ class KubernetesPodOperator(BaseOperator):
configmaps: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
base_container_name: str | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
poll_interval: float = 2,
log_pod_spec_on_failure: bool = True,
on_finish_action: str = "delete_pod",
diff --git a/airflow/providers/databricks/operators/databricks.py
b/airflow/providers/databricks/operators/databricks.py
index ab93d8f49b..fb27f0c01a 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -24,6 +24,7 @@ from functools import cached_property
from logging import Logger
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator, BaseOperatorLink, XCom
from airflow.providers.databricks.hooks.databricks import DatabricksHook,
RunState
@@ -315,7 +316,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
access_control_list: list[dict[str, str]] | None = None,
wait_for_termination: bool = True,
git_source: dict[str, str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
"""Creates a new ``DatabricksSubmitRunOperator``."""
@@ -605,7 +606,7 @@ class DatabricksRunNowOperator(BaseOperator):
databricks_retry_args: dict[Any, Any] | None = None,
do_xcom_push: bool = True,
wait_for_termination: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
"""Creates a new ``DatabricksRunNowOperator``."""
diff --git a/airflow/providers/dbt/cloud/operators/dbt.py
b/airflow/providers/dbt/cloud/operators/dbt.py
index f316c47f3d..c977539afb 100644
--- a/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/airflow/providers/dbt/cloud/operators/dbt.py
@@ -22,6 +22,7 @@ import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink, XCom
from airflow.providers.dbt.cloud.hooks.dbt import (
@@ -99,7 +100,7 @@ class DbtCloudRunJobOperator(BaseOperator):
timeout: int = 60 * 60 * 24 * 7,
check_interval: int = 60,
additional_run_config: dict[str, Any] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/dbt/cloud/sensors/dbt.py
b/airflow/providers/dbt/cloud/sensors/dbt.py
index 5838f6d624..3b5ae549a3 100644
--- a/airflow/providers/dbt/cloud/sensors/dbt.py
+++ b/airflow/providers/dbt/cloud/sensors/dbt.py
@@ -20,6 +20,7 @@ import time
import warnings
from typing import TYPE_CHECKING, Any
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook,
DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
@@ -50,7 +51,7 @@ class DbtCloudJobRunSensor(BaseSensorOperator):
dbt_cloud_conn_id: str = DbtCloudHook.default_conn_name,
run_id: int,
account_id: int | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
if deferrable:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py
b/airflow/providers/google/cloud/operators/bigquery.py
index 98929b6e6d..970e7813ed 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -29,6 +29,7 @@ from google.api_core.retry import Retry
from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob,
QueryJob
from google.cloud.bigquery.table import RowIterator
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.models.xcom import XCom
@@ -200,7 +201,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin,
SQLCheckOperator):
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poll_interval: float = 4.0,
**kwargs,
) -> None:
@@ -320,7 +321,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin,
SQLValueCheckOperator):
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poll_interval: float = 4.0,
**kwargs,
) -> None:
@@ -460,7 +461,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin,
SQLIntervalCheckOperat
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poll_interval: float = 4.0,
**kwargs,
) -> None:
@@ -854,7 +855,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
gcp_conn_id: str = "google_cloud_default",
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poll_interval: float = 4.0,
as_dict: bool = False,
use_legacy_sql: bool = True,
@@ -1876,7 +1877,6 @@ class
BigQueryCreateEmptyDatasetOperator(GoogleCloudBaseOperator):
exists_ok: bool | None = None,
**kwargs,
) -> None:
-
self.dataset_id = dataset_id
self.project_id = project_id
self.location = location
@@ -2623,7 +2623,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
cancel_on_kill: bool = True,
result_retry: Retry = DEFAULT_RETRY,
result_timeout: float | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poll_interval: float = 4.0,
**kwargs,
) -> None:
diff --git a/airflow/providers/google/cloud/operators/bigquery_dts.py
b/airflow/providers/google/cloud/operators/bigquery_dts.py
index d9e013afa6..e10618bc39 100644
--- a/airflow/providers/google/cloud/operators/bigquery_dts.py
+++ b/airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -32,6 +32,7 @@ from google.cloud.bigquery_datatransfer_v1 import (
)
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.providers.google.cloud.hooks.bigquery_dts import
BiqQueryDataTransferServiceHook, get_object_id
from airflow.providers.google.cloud.links.bigquery_dts import
BigQueryDataTransferConfigLink
from airflow.providers.google.cloud.operators.cloud_base import
GoogleCloudBaseOperator
@@ -279,7 +280,7 @@ class
BigQueryDataTransferServiceStartTransferRunsOperator(GoogleCloudBaseOperat
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id="google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py
b/airflow/providers/google/cloud/operators/cloud_build.py
index 4242f561c1..14fed55a3a 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -28,6 +28,7 @@ from google.api_core.gapic_v1.method import DEFAULT,
_MethodDefault
from google.api_core.retry import Retry
from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger,
RepoSource
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
from airflow.providers.google.cloud.links.cloud_build import (
@@ -176,7 +177,7 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
poll_interval: float = 4.0,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
location: str = "global",
**kwargs,
) -> None:
diff --git a/airflow/providers/google/cloud/operators/cloud_composer.py
b/airflow/providers/google/cloud/operators/cloud_composer.py
index d04a1606fc..c9b52d8559 100644
--- a/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -27,6 +27,7 @@ from google.cloud.orchestration.airflow.service_v1.types
import Environment
from google.protobuf.field_mask_pb2 import FieldMask
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.providers.google.cloud.hooks.cloud_composer import
CloudComposerHook
from airflow.providers.google.cloud.links.base import BaseGoogleLink
from airflow.providers.google.cloud.operators.cloud_base import
GoogleCloudBaseOperator
@@ -135,7 +136,7 @@ class
CloudComposerCreateEnvironmentOperator(GoogleCloudBaseOperator):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
pooling_period_seconds: int = 30,
**kwargs,
) -> None:
@@ -264,7 +265,7 @@ class
CloudComposerDeleteEnvironmentOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
pooling_period_seconds: int = 30,
**kwargs,
) -> None:
@@ -509,7 +510,7 @@ class
CloudComposerUpdateEnvironmentOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
pooling_period_seconds: int = 30,
**kwargs,
) -> None:
diff --git a/airflow/providers/google/cloud/operators/cloud_sql.py
b/airflow/providers/google/cloud/operators/cloud_sql.py
index 5c77cbd86c..b1144663c7 100644
--- a/airflow/providers/google/cloud/operators/cloud_sql.py
+++ b/airflow/providers/google/cloud/operators/cloud_sql.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Iterable, Mapping, Sequence
from googleapiclient.errors import HttpError
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import Connection
@@ -955,7 +956,7 @@ class CloudSQLExportInstanceOperator(CloudSQLBaseOperator):
api_version: str = "v1beta4",
validate_body: bool = True,
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poke_interval: int = 10,
**kwargs,
) -> None:
diff --git a/airflow/providers/google/cloud/operators/dataflow.py
b/airflow/providers/google/cloud/operators/dataflow.py
index 5ae1115a34..a5e9588214 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -28,6 +28,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType
from airflow.providers.google.cloud.hooks.dataflow import (
@@ -419,7 +420,6 @@ class
DataflowCreateJavaJobOperator(GoogleCloudBaseOperator):
variables=pipeline_options,
)
while is_running and self.check_if_running ==
CheckJobRunning.WaitForRun:
-
is_running = self.dataflow_hook.is_job_dataflow_running(
name=self.job_name,
variables=pipeline_options,
@@ -611,7 +611,7 @@ class
DataflowTemplatedJobStartOperator(GoogleCloudBaseOperator):
cancel_timeout: int | None = 10 * 60,
wait_until_finished: bool | None = None,
append_job_name: bool = True,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -801,7 +801,7 @@ class
DataflowStartFlexTemplateOperator(GoogleCloudBaseOperator):
cancel_timeout: int | None = 10 * 60,
wait_until_finished: bool | None = None,
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
append_job_name: bool = True,
*args,
**kwargs,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py
b/airflow/providers/google/cloud/operators/dataproc.py
index db7d785347..d14a495bc0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -36,6 +36,7 @@ from google.cloud.dataproc_v1 import Batch, Cluster,
ClusterStatus, JobStatus
from google.protobuf.duration_pb2 import Duration
from google.protobuf.field_mask_pb2 import FieldMask
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook,
DataProcJobBuilder
from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -484,7 +485,7 @@ class
DataprocCreateClusterOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 10,
**kwargs,
) -> None:
@@ -849,7 +850,7 @@ class
DataprocDeleteClusterOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 10,
**kwargs,
):
@@ -981,7 +982,7 @@ class DataprocJobBaseOperator(GoogleCloudBaseOperator):
job_error_states: set[str] | None = None,
impersonation_chain: str | Sequence[str] | None = None,
asynchronous: bool = False,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 10,
**kwargs,
) -> None:
@@ -1731,7 +1732,7 @@ class
DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 10,
**kwargs,
) -> None:
@@ -1859,7 +1860,7 @@ class
DataprocInstantiateInlineWorkflowTemplateOperator(GoogleCloudBaseOperator)
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 10,
**kwargs,
) -> None:
@@ -1979,7 +1980,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
asynchronous: bool = False,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 10,
cancel_on_kill: bool = True,
wait_timeout: int | None = None,
@@ -2139,7 +2140,7 @@ class
DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 10,
**kwargs,
):
@@ -2270,7 +2271,7 @@ class
DataprocCreateBatchOperator(GoogleCloudBaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
result_retry: Retry | _MethodDefault = DEFAULT,
asynchronous: bool = False,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 5,
**kwargs,
):
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index bf14828d87..086a7d99b7 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -26,6 +26,7 @@ from google.api_core.exceptions import AlreadyExists
from google.cloud.container_v1.types import Cluster
from kubernetes.client.models import V1Pod
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
@@ -34,6 +35,7 @@ try:
except ImportError:
# preserve backward compatibility for older versions of cncf.kubernetes
provider
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import
KubernetesPodOperator
+
from airflow.providers.google.cloud.hooks.kubernetes_engine import GKEHook,
GKEPodHook
from airflow.providers.google.cloud.links.kubernetes_engine import (
KubernetesEngineClusterLink,
@@ -108,7 +110,7 @@ class GKEDeleteClusterOperator(GoogleCloudBaseOperator):
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v2",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poll_interval: int = 10,
**kwargs,
) -> None:
@@ -255,7 +257,7 @@ class GKECreateClusterOperator(GoogleCloudBaseOperator):
api_version: str = "v2",
impersonation_chain: str | Sequence[str] | None = None,
poll_interval: int = 10,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/google/cloud/operators/mlengine.py
b/airflow/providers/google/cloud/operators/mlengine.py
index b776d20dff..20d9d19886 100644
--- a/airflow/providers/google/cloud/operators/mlengine.py
+++ b/airflow/providers/google/cloud/operators/mlengine.py
@@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any, Sequence
from googleapiclient.errors import HttpError
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
from airflow.providers.google.cloud.links.mlengine import (
@@ -722,7 +723,6 @@ class
MLEngineCreateVersionOperator(GoogleCloudBaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self._project_id = project_id
self._model_name = model_name
@@ -804,7 +804,6 @@ class
MLEngineSetDefaultVersionOperator(GoogleCloudBaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self._project_id = project_id
self._model_name = model_name
@@ -883,7 +882,6 @@ class MLEngineListVersionsOperator(GoogleCloudBaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self._project_id = project_id
self._model_name = model_name
@@ -961,7 +959,6 @@ class
MLEngineDeleteVersionOperator(GoogleCloudBaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self._project_id = project_id
self._model_name = model_name
@@ -1098,7 +1095,7 @@ class
MLEngineStartTrainingJobOperator(GoogleCloudBaseOperator):
labels: dict[str, str] | None = None,
impersonation_chain: str | Sequence[str] | None = None,
hyperparameters: dict | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
cancel_on_kill: bool = True,
**kwargs,
) -> None:
@@ -1370,7 +1367,6 @@ class
MLEngineTrainingCancelJobOperator(GoogleCloudBaseOperator):
raise AirflowException("Google Cloud project id is required.")
def execute(self, context: Context):
-
hook = MLEngineHook(
gcp_conn_id=self._gcp_conn_id,
impersonation_chain=self._impersonation_chain,
diff --git a/airflow/providers/google/cloud/sensors/bigquery.py
b/airflow/providers/google/cloud/sensors/bigquery.py
index db109bf2c1..e4e1819ef1 100644
--- a/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/airflow/providers/google/cloud/sensors/bigquery.py
@@ -22,6 +22,7 @@ import warnings
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.triggers.bigquery import (
@@ -71,7 +72,7 @@ class BigQueryTableExistenceSensor(BaseSensorOperator):
table_id: str,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
if deferrable and "poke_interval" not in kwargs:
@@ -184,7 +185,7 @@ class
BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
partition_id: str,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
if deferrable and "poke_interval" not in kwargs:
diff --git a/airflow/providers/google/cloud/sensors/gcs.py
b/airflow/providers/google/cloud/sensors/gcs.py
index 08fd37022d..7048789601 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence
from google.api_core.retry import Retry
from google.cloud.storage.retry import DEFAULT_RETRY
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.triggers.gcs import (
@@ -76,10 +77,9 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
google_cloud_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
retry: Retry = DEFAULT_RETRY,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self.bucket = bucket
self.object = object
@@ -208,10 +208,9 @@ class GCSObjectUpdateSensor(BaseSensorOperator):
ts_func: Callable = ts_function,
google_cloud_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self.bucket = bucket
self.object = object
@@ -298,7 +297,7 @@ class
GCSObjectsWithPrefixExistenceSensor(BaseSensorOperator):
prefix: str,
google_cloud_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -412,10 +411,9 @@ class GCSUploadSessionCompleteSensor(BaseSensorOperator):
allow_delete: bool = True,
google_cloud_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self.bucket = bucket
diff --git a/airflow/providers/google/cloud/sensors/pubsub.py
b/airflow/providers/google/cloud/sensors/pubsub.py
index 2e03b3669d..db9f39b19b 100644
--- a/airflow/providers/google/cloud/sensors/pubsub.py
+++ b/airflow/providers/google/cloud/sensors/pubsub.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Callable, Sequence
from google.cloud.pubsub_v1.types import ReceivedMessage
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger
@@ -103,10 +104,9 @@ class PubSubPullSensor(BaseSensorOperator):
messages_callback: Callable[[list[ReceivedMessage], Context], Any] |
None = None,
impersonation_chain: str | Sequence[str] | None = None,
poke_interval: float = 10.0,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self.gcp_conn_id = gcp_conn_id
self.project_id = project_id
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 7ec62db9bf..8836e3ee35 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -25,6 +25,7 @@ from google.api_core.retry import Retry
from google.cloud.bigquery import DEFAULT_RETRY, UnknownJob
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook,
BigQueryJob
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
@@ -114,7 +115,7 @@ class BigQueryToGCSOperator(BaseOperator):
job_id: str | None = None,
force_rerun: bool = False,
reattach_states: set[str] | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 88b6d09323..da462eae74 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -36,6 +36,7 @@ from google.cloud.bigquery import (
from google.cloud.bigquery.table import EncryptionConfiguration, Table,
TableReference
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook,
BigQueryJob
from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -218,7 +219,7 @@ class GCSToBigQueryOperator(BaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
labels=None,
description=None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
result_retry: Retry = DEFAULT_RETRY,
result_timeout: float | None = None,
cancel_on_kill: bool = True,
@@ -228,7 +229,6 @@ class GCSToBigQueryOperator(BaseOperator):
project_id: str | None = None,
**kwargs,
) -> None:
-
super().__init__(**kwargs)
self.hook: BigQueryHook | None = None
self.configuration: dict[str, Any] = {}
@@ -718,7 +718,6 @@ class GCSToBigQueryOperator(BaseOperator):
def _cleanse_time_partitioning(
self, destination_dataset_table: str | None, time_partitioning_in:
dict | None
) -> dict: # if it is a partitioned table ($ is in the table name) add
partition load option
-
if time_partitioning_in is None:
time_partitioning_in = {}
diff --git a/airflow/providers/microsoft/azure/operators/data_factory.py
b/airflow/providers/microsoft/azure/operators/data_factory.py
index 8906c02ae1..a2b2c528bf 100644
--- a/airflow/providers/microsoft/azure/operators/data_factory.py
+++ b/airflow/providers/microsoft/azure/operators/data_factory.py
@@ -20,6 +20,7 @@ import time
import warnings
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator, BaseOperatorLink, XCom
@@ -140,7 +141,7 @@ class AzureDataFactoryRunPipelineOperator(BaseOperator):
parameters: dict[str, Any] | None = None,
timeout: int = 60 * 60 * 24 * 7,
check_interval: int = 60,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/microsoft/azure/sensors/data_factory.py
b/airflow/providers/microsoft/azure/sensors/data_factory.py
index 70ae1f69dc..b4ebedce69 100644
--- a/airflow/providers/microsoft/azure/sensors/data_factory.py
+++ b/airflow/providers/microsoft/azure/sensors/data_factory.py
@@ -20,6 +20,7 @@ import warnings
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.microsoft.azure.hooks.data_factory import (
AzureDataFactoryHook,
@@ -60,7 +61,7 @@ class
AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
azure_data_factory_conn_id: str =
AzureDataFactoryHook.default_conn_name,
resource_group_name: str | None = None,
factory_name: str | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/airflow/providers/microsoft/azure/sensors/wasb.py
b/airflow/providers/microsoft/azure/sensors/wasb.py
index 4e2ec2d502..0c227f2ea3 100644
--- a/airflow/providers/microsoft/azure/sensors/wasb.py
+++ b/airflow/providers/microsoft/azure/sensors/wasb.py
@@ -21,6 +21,7 @@ import warnings
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.configuration import conf
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.microsoft.azure.triggers.wasb import
WasbBlobSensorTrigger, WasbPrefixSensorTrigger
@@ -53,7 +54,7 @@ class WasbBlobSensor(BaseSensorOperator):
wasb_conn_id: str = "wasb_default",
check_options: dict | None = None,
public_read: bool = False,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -151,7 +152,7 @@ class WasbPrefixSensor(BaseSensorOperator):
prefix: str,
wasb_conn_id: str = "wasb_default",
check_options: dict | None = None,
- deferrable: bool = False,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index b1e6c6be98..81526737e8 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -56,6 +56,36 @@ Writing a deferrable operator takes a bit more work. There
are some main points
* You can defer multiple times, and you can defer before/after your Operator
does significant work, or only defer if certain conditions are met (e.g. a
system does not have an immediate answer). Deferral is entirely under your
control.
* Any Operator can defer; no special marking on its class is needed, and it's
not limited to Sensors.
* In order for any changes to a Trigger to be reflected, the *triggerer* needs
to be restarted whenever the Trigger is modified.
+* If you want to add an operator or sensor that supports both deferrable and
non-deferrable modes, it's suggested to add ``deferrable: bool =
conf.getboolean("operators", "default_deferrable", fallback=False)`` to the
``__init__`` method of the operator and use it to decide whether to run the
operator in deferrable mode. You'll be able to configure the default value of
``deferrable`` for all the operators and sensors that support switching between
deferrable and non-deferrable mode through ``default_deferrable`` [...]
+
+     import time
+     from datetime import timedelta
+
+     from airflow.configuration import conf
+     from airflow.sensors.base import BaseSensorOperator
+     from airflow.triggers.temporal import TimeDeltaTrigger
+
+
+ class WaitOneHourSensor(BaseSensorOperator):
+         def __init__(
+             self,
+             deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+             **kwargs
+         ):
+             super().__init__(**kwargs)
+             self.deferrable = deferrable
+
+ def execute(self, context):
+             if self.deferrable:
+ self.defer(
+ trigger=TimeDeltaTrigger(timedelta(hours=1)),
+ method_name="execute_complete"
+ )
+ else:
+ time.sleep(3600)
+
+ def execute_complete(self, context, event=None):
+ # We have no more work to do here. Mark as complete.
+ return
Triggering Deferral
diff --git a/tests/models/test_baseoperator.py
b/tests/models/test_baseoperator.py
index 2b11817672..c49e0bb034 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -919,7 +919,6 @@ def test_render_template_fields_logging(
caplog, monkeypatch, task, context, expected_exception,
expected_rendering, expected_log, not_expected_log
):
"""Verify if operator attributes are correctly templated."""
-
# Trigger templating and verify results
def _do_render():
task.render_template_fields(context=context)