kaxil commented on a change in pull request #15389:
URL: https://github.com/apache/airflow/pull/15389#discussion_r638314079
##########
File path: airflow/exceptions.py
##########
@@ -225,3 +226,30 @@ def __str__(self):
class ConnectionNotUnique(AirflowException):
"""Raise when multiple values are found for the same conn_id"""
+
+
+class TaskDeferred(BaseException):
+ """
+ Special exception raised to signal that the operator it was raised from
+ wishes to defer until a trigger fires.
+ """
+
+ def __init__(
+ self,
+ *,
+ trigger,
+ method_name: str,
+ kwargs: Optional[Dict[str, Any]] = None,
+ timeout: Optional[datetime.timedelta] = None,
+ ):
+ super().__init__()
+ self.trigger = trigger
+ self.method_name = method_name
+ self.kwargs = kwargs
+ self.timeout = timeout
+ # Check timeout type at runtime
+ if self.timeout is not None and not hasattr(self.timeout, "total_seconds"):
+ raise ValueError("Timeout value must be a timedelta")
Review comment:
Curious why not `isinstance(self.timeout, datetime.timedelta)`?
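A minimal sketch of the difference, using hypothetical helper names (`check_timeout_hasattr`, `check_timeout_isinstance`, `FakeDuration`, `accepts`) that are not part of the PR. The `hasattr` check in the diff is duck-typed and lets through any object with a `total_seconds` attribute, while `isinstance` only accepts `datetime.timedelta` and its subclasses:

```python
import datetime
from typing import Callable, Optional


def check_timeout_hasattr(timeout: Optional[object]) -> None:
    # Duck-typed check, as in the diff: any object with a total_seconds attribute passes.
    if timeout is not None and not hasattr(timeout, "total_seconds"):
        raise ValueError("Timeout value must be a timedelta")


def check_timeout_isinstance(timeout: Optional[object]) -> None:
    # Strict check suggested in the comment: only datetime.timedelta (or a subclass) passes.
    if timeout is not None and not isinstance(timeout, datetime.timedelta):
        raise ValueError("Timeout value must be a timedelta")


class FakeDuration:
    """Hypothetical object that has total_seconds() but is not a timedelta."""

    def total_seconds(self) -> float:
        return 30.0


def accepts(check: Callable[[Optional[object]], None], value: Optional[object]) -> bool:
    # True if the check lets the value through, False if it raises ValueError.
    try:
        check(value)
    except ValueError:
        return False
    return True


for value in (None, datetime.timedelta(minutes=5), FakeDuration(), 300):
    print(type(value).__name__, accepts(check_timeout_hasattr, value), accepts(check_timeout_isinstance, value))
# NoneType True True
# timedelta True True
# FakeDuration True False
# int False False
```

Both checks reject a plain integer such as `300`; they only disagree on objects that merely look like a timedelta.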
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1899,3 +1910,17 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None):
raise
return len(to_reset)
+
+ @provide_session
+ def check_trigger_timeouts(self, session: Session = None):
+ """
+ Looks at all tasks that are in the "deferred" state and whose trigger
+ timeout has passed, so they can be marked as failed.
+ """
+ timed_out_tasks = session.query(TaskInstance).filter(
TaskInstance.state == State.DEFERRED, TaskInstance.trigger_timeout < timezone.utcnow()
+ )
+ num_tasks = timed_out_tasks.count()
Review comment:
Should we change this to use `func.count`? It would take an extra line
or two and add some duplication, but it is more reliable:
https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.count
https://stackoverflow.com/questions/14754994/why-is-sqlalchemy-count-much-slower-than-the-raw-query
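A minimal sketch of the two options, assuming SQLAlchemy 1.4 and an in-memory SQLite database; `FakeTaskInstance`, `timed_out_filters`, `demo`, and the two `count_with_*` helpers are hypothetical names, not Airflow code. Per the linked SQLAlchemy docs, `Query.count()` wraps the query in a subquery, whereas `func.count()` counts directly in a single SELECT. Keeping the filter criteria in one helper is one way to limit the duplication mentioned in the comment:

```python
from datetime import datetime, timedelta
from typing import Tuple

from sqlalchemy import Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class FakeTaskInstance(Base):
    # Hypothetical stand-in for Airflow's TaskInstance with only the columns the query needs.
    __tablename__ = "fake_task_instance"
    id = Column(Integer, primary_key=True)
    state = Column(String(20))
    trigger_timeout = Column(DateTime)


def timed_out_filters(now: datetime) -> tuple:
    # Shared filter criteria, so the count query and any follow-up query do not repeat them.
    return (
        FakeTaskInstance.state == "deferred",
        FakeTaskInstance.trigger_timeout < now,
    )


def count_with_query_count(session: Session, now: datetime) -> int:
    # Query.count() wraps the full SELECT in a subquery and counts its rows.
    return session.query(FakeTaskInstance).filter(*timed_out_filters(now)).count()


def count_with_func_count(session: Session, now: datetime) -> int:
    # func.count() emits a flat SELECT count(fake_task_instance.id) FROM ... WHERE ...
    return session.query(func.count(FakeTaskInstance.id)).filter(*timed_out_filters(now)).scalar()


def demo() -> Tuple[int, int]:
    engine = create_engine("sqlite://")  # in-memory database
    Base.metadata.create_all(engine)
    now = datetime(2021, 5, 24, 12, 0)
    with Session(engine) as session:
        session.add_all(
            [
                FakeTaskInstance(state="deferred", trigger_timeout=now - timedelta(minutes=5)),  # timed out
                FakeTaskInstance(state="deferred", trigger_timeout=now + timedelta(minutes=5)),  # still waiting
                FakeTaskInstance(state="running", trigger_timeout=now - timedelta(minutes=5)),  # not deferred
            ]
        )
        session.commit()
        return count_with_query_count(session, now), count_with_func_count(session, now)


if __name__ == "__main__":
    print(demo())  # (1, 1)
```

Both helpers return the same count here; the difference is in the emitted SQL, which can be inspected by passing `echo=True` to `create_engine`.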
--
This is an automated message from the Apache Git Service.