This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 73b0ea1  Update BaseOperator type hints for retry_delay, max_retry_delay, dag. (#19142)
73b0ea1 is described below

commit 73b0ea18edb2bf8df79f11c7a7c746b2dc510861
Author: Eugene Nikolaiev <[email protected]>
AuthorDate: Fri Oct 29 10:33:07 2021 +0300

    Update BaseOperator type hints for retry_delay, max_retry_delay, dag. (#19142)
---
 airflow/models/baseoperator.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 281e8de..197d681 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -77,6 +77,7 @@ from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.weight_rule import WeightRule
 
 if TYPE_CHECKING:
+    from airflow.models.dag import DAG
     from airflow.models.xcom_arg import XComArg
     from airflow.utils.task_group import TaskGroup
 
@@ -241,14 +242,17 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
     :param retries: the number of retries that should be performed before
         failing the task
     :type retries: int
-    :param retry_delay: delay between retries
-    :type retry_delay: datetime.timedelta
-    :param retry_exponential_backoff: allow progressive longer waits between
+    :param retry_delay: delay between retries, can be set as ``timedelta`` or
+        ``float`` seconds, which will be converted into ``timedelta``,
+        the default is ``timedelta(seconds=300)``.
+    :type retry_delay: datetime.timedelta or float
+    :param retry_exponential_backoff: allow progressively longer waits between
         retries by using exponential backoff algorithm on retry delay (delay
         will be converted into seconds)
     :type retry_exponential_backoff: bool
-    :param max_retry_delay: maximum delay interval between retries
-    :type max_retry_delay: datetime.timedelta
+    :param max_retry_delay: maximum delay interval between retries, can be set as
+        ``timedelta`` or ``float`` seconds, which will be converted into ``timedelta``.
+    :type max_retry_delay: datetime.timedelta or float
     :param start_date: The ``start_date`` for the task, determines
         the ``execution_date`` for the first task instance. The best practice
         is to have the start_date rounded
@@ -486,14 +490,14 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         email_on_retry: bool = conf.getboolean('email', 'default_email_on_retry', fallback=True),
         email_on_failure: bool = conf.getboolean('email', 'default_email_on_failure', fallback=True),
         retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0),
-        retry_delay: timedelta = timedelta(seconds=300),
+        retry_delay: Union[timedelta, float] = timedelta(seconds=300),
         retry_exponential_backoff: bool = False,
-        max_retry_delay: Optional[timedelta] = None,
+        max_retry_delay: Optional[Union[timedelta, float]] = None,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         depends_on_past: bool = False,
         wait_for_downstream: bool = False,
-        dag=None,
+        dag: Optional['DAG'] = None,
         params: Optional[Dict] = None,
         default_args: Optional[Dict] = None,
         priority_weight: int = 1,
@@ -804,7 +808,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         return self._outlets
 
     @property
-    def dag(self) -> Any:
+    def dag(self) -> 'DAG':
         """Returns the Operator's DAG if set, otherwise raises an error"""
         if self.has_dag():
             return self._dag
@@ -812,7 +816,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
             raise AirflowException(f'Operator {self} has not been assigned to a DAG yet')
 
     @dag.setter
-    def dag(self, dag: Any):
+    def dag(self, dag: Optional['DAG']):
         """
         Operators can be assigned to one DAG, one time. Repeat assignments to
         that same DAG are ok.

Reply via email to