Ashish0253 commented on code in PR #62688:
URL: https://github.com/apache/airflow/pull/62688#discussion_r2955159236


##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########
@@ -184,6 +170,33 @@ def _evaluate_with(self, *, session: Session, **kwargs: 
Any) -> datetime | None:
 
             return _fetch_from_db(DagRun.queued_at, session=session, **kwargs)
 
+    class JobStartDateDeadline(SerializedBaseDeadlineReference):
+        """A deadline that returns a background Job's start date."""
+
+        required_kwargs: set[str] = set()
+
+        def _evaluate_with(self, *, session: Session, **kwargs: Any) -> 
datetime | None:
+            """Find the baseline timestamp by querying the latest running 
SchedulerJob."""
+            # 1. Construct a query to look into the 'job' table.
+            # We target 'SchedulerJob' specifically because this is the primary
+            # engine responsible for DAG execution and heartbeat management.
+            from sqlalchemy import desc
+
+            from airflow.jobs.job import Job
+
+            stmt = (
+                select(Job.start_date)
+                .where(Job.job_type == "SchedulerJob")  # Remove state == 
"running"

Review Comment:
   there are multiple records in job table with running and failed, so if the 
latest record for any kind of job is "failed", in the next commit I will log 
error for this scenario.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to