ferruzzi commented on code in PR #55088:
URL: https://github.com/apache/airflow/pull/55088#discussion_r2350244092


##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -366,6 +366,62 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime:
 
             return _fetch_from_db(DagRun.queued_at, session=session, **kwargs)
 
+    @dataclass
+    class AverageRuntimeDeadline(BaseDeadlineReference):
+        """A deadline that calculates the average runtime from past DAG 
runs."""
+
+        DEFAULT_LIMIT = 10
+        limit: int
+        required_kwargs = {"dag_id"}
+
+        @provide_session
+        def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime:
+            from airflow.models import DagRun
+
+            dag_id = kwargs["dag_id"]
+
+            # Query for completed DAG runs with both start and end dates
+            # Order by logical_date descending to get most recent runs first
+            query = (
+                select(func.extract("epoch", DagRun.end_date - DagRun.start_date))
+                .filter(DagRun.dag_id == dag_id, DagRun.start_date.isnot(None), DagRun.end_date.isnot(None))
+                .order_by(DagRun.logical_date.desc())
+            )
+
+            # Apply limit
+            query = query.limit(self.limit)
+
+            # Get all durations and calculate average
+            durations = session.execute(query).scalars().all()
+
+            if len(durations) < self.limit:
+                logger.warning(
+                    "In the AverageRuntimeDeadline: Only %d completed DAG runs 
found for dag_id: %s (need %d), using 48 hour default",
+                    len(durations),
+                    dag_id,
+                    self.limit,
+                )
+                avg_seconds = 48 * 3600  # 48 hours as default

Review Comment:
   This is in `_evaluate_with`, which returns the calculated deadline timestamp that then gets written to the `deadline` table, right?  Then, later, the scheduler checks this table to see if any of these calculated values are in the past and handles any that are.  An easy answer then could be to return a specific datetime which the scheduler never considers a miss, maybe `datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)`, but this feels like a hack... I think the better way would be to add a mechanism to skip the "write the calculated deadline to the DB" step if there is no historical data.  That could maybe be as simple as making `_evaluate_with` return `datetime | None` and then checking for `None` before adding it to the deadline table?
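The `datetime | None` idea above could be sketched roughly like this. This is a standalone illustration, not the actual `deadline.py` code: the helper names, the in-memory stand-in for the deadline table, and the "now + average" deadline formula are all hypothetical; only the "return `None` instead of a sentinel, and skip the write" pattern is the point.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def average_runtime_or_none(durations: list[float], limit: int) -> datetime | None:
    """Hypothetical helper: compute a deadline from past run durations (in
    seconds), or return None when there is not enough history.

    Returning None, rather than a sentinel like datetime(MINYEAR, 1, 1),
    lets the caller simply skip the "write to the deadline table" step.
    """
    if len(durations) < limit:
        # Not enough completed runs: signal "no deadline" to the caller.
        return None
    avg_seconds = sum(durations) / len(durations)
    return datetime.now(timezone.utc) + timedelta(seconds=avg_seconds)


def maybe_record_deadline(durations: list[float], limit: int, table: list[datetime]) -> None:
    """Caller-side check: only append a row to the (stand-in) deadline
    table when a deadline was actually calculated."""
    deadline = average_runtime_or_none(durations, limit)
    if deadline is not None:
        table.append(deadline)
```

With this shape, a DAG with too few completed runs produces no deadline row at all, so the scheduler never has to special-case a sentinel value.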



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
