ferruzzi commented on code in PR #62688:
URL: https://github.com/apache/airflow/pull/62688#discussion_r2943662296
##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########
@@ -184,6 +170,33 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
          return _fetch_from_db(DagRun.queued_at, session=session, **kwargs)
+ class JobStartDateDeadline(SerializedBaseDeadlineReference):
+     """A deadline that returns a background Job's start date."""
+
+     required_kwargs: set[str] = set()
+
+     def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
+         """Find the baseline timestamp by querying the latest running SchedulerJob."""
+         # 1. Construct a query to look into the 'job' table.
Review Comment:
Claude comments? You can clean these up.
##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -482,6 +478,36 @@ def deserialize_reference(cls, reference_data: dict):
              min_runs=min_runs,
          )
+ class JobStartDateDeadline(BaseDeadlineReference):
+     """A deadline that returns the start date of the latest active SchedulerJob."""
+
+     # By setting this to an empty set, we tell the system this reference
+     # doesn't need an 'id' or any other data from the DAG definition.
+     required_kwargs: set[str] = set()
+
+     def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
+         """Find the start date of the most recently started running SchedulerJob."""
+         from sqlalchemy import desc
+
+         from airflow.jobs.job import Job
+
+         # We query the job table directly for the newest SchedulerJob that is still 'running'.
+         # This ensures we are always measuring against the current active platform engine.
+         stmt = (
+             select(Job.start_date)
+             .where(
+                 Job.job_type == "SchedulerJob",
+                 Job.state == "running",
+             )
+             .order_by(desc(Job.start_date))
+             .limit(1)
+         )
Review Comment:
Here and below, are you able to make use of the existing `_fetch_from_db`
that the other references use?
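
For reference, a minimal sketch of what the statement above selects, written against the standard-library `sqlite3` module and a stand-in `job` table rather than Airflow's models. The table layout, the sample rows, and the names `latest_running_scheduler_start` and `make_demo_connection` are made up for illustration; only the filter, ordering, and limit mirror the diff. Whether `_fetch_from_db` can express the ordering and limit is not visible in this hunk, so this is not a claim about that helper.

```python
import sqlite3
from typing import Optional


def latest_running_scheduler_start(conn: sqlite3.Connection) -> Optional[str]:
    """Return the newest start_date among running SchedulerJob rows, or None."""
    row = conn.execute(
        """
        SELECT start_date
        FROM job
        WHERE job_type = ? AND state = ?
        ORDER BY start_date DESC
        LIMIT 1
        """,
        ("SchedulerJob", "running"),
    ).fetchone()
    return row[0] if row else None


def make_demo_connection() -> sqlite3.Connection:
    """Build an in-memory stand-in for the job table (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE job (job_type TEXT, state TEXT, start_date TEXT)")
    conn.executemany(
        "INSERT INTO job VALUES (?, ?, ?)",
        [
            # Finished scheduler: excluded by the state filter.
            ("SchedulerJob", "success", "2025-01-03T00:00:00"),
            # Two running schedulers: the later start_date should win.
            ("SchedulerJob", "running", "2025-01-01T00:00:00"),
            ("SchedulerJob", "running", "2025-01-02T00:00:00"),
            # Different job type: excluded by the job_type filter.
            ("TriggererJob", "running", "2025-01-04T00:00:00"),
        ],
    )
    return conn
```

When no running scheduler row exists, the sketch returns `None`, which lines up with the `datetime | None` return annotation in the diff.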
##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -482,6 +478,36 @@ def deserialize_reference(cls, reference_data: dict):
              min_runs=min_runs,
          )
+ class JobStartDateDeadline(BaseDeadlineReference):
+     """A deadline that returns the start date of the latest active SchedulerJob."""
+
+     # By setting this to an empty set, we tell the system this reference
+     # doesn't need an 'id' or any other data from the DAG definition.
+     required_kwargs: set[str] = set()
Review Comment:
I don't think we need this if there aren't any required kwargs, do we?
Pretty sure it'll just inherit this from BaseDeadlineReference.
That said, shouldn't this be based off of a job_id like you had it before?
I'm not sure I understand this change.
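
A quick illustration of the inheritance point, using hypothetical stand-in classes. `BaseRef`, `JobStartRef`, and `declares_own_required_kwargs` are not Airflow names, and the base default of an empty set is assumed here rather than confirmed from `BaseDeadlineReference`.

```python
class BaseRef:
    # Assumed default: the base class declares no required kwargs.
    required_kwargs: set[str] = set()


class JobStartRef(BaseRef):
    # No override: attribute lookup falls through to BaseRef.
    pass


def declares_own_required_kwargs(cls: type) -> bool:
    """True only if the class body itself assigns required_kwargs."""
    return "required_kwargs" in vars(cls)
```

Under that assumption, `JobStartRef.required_kwargs` resolves to the very same set object as `BaseRef.required_kwargs`, so redeclaring it in the subclass adds nothing. One caveat: because the object is shared, mutating it in place would affect both classes.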
##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########
@@ -184,6 +170,33 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
          return _fetch_from_db(DagRun.queued_at, session=session, **kwargs)
+ class JobStartDateDeadline(SerializedBaseDeadlineReference):
+     """A deadline that returns a background Job's start date."""
+
+     required_kwargs: set[str] = set()
+
+     def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
+         """Find the baseline timestamp by querying the latest running SchedulerJob."""
+         # 1. Construct a query to look into the 'job' table.
+         # We target 'SchedulerJob' specifically because this is the primary
+         # engine responsible for DAG execution and heartbeat management.
+         from sqlalchemy import desc
+
+         from airflow.jobs.job import Job
+
+         stmt = (
+             select(Job.start_date)
+             .where(Job.job_type == "SchedulerJob")  # Remove state == "running"
Review Comment:
Not sure I understand this comment. Is it leftover from planning, maybe?