ferruzzi commented on code in PR #50677: URL: https://github.com/apache/airflow/pull/50677#discussion_r2103372721
########## airflow-core/src/airflow/models/deadline.py: ########## @@ -183,3 +156,36 @@ def serialize_deadline_alert(self): "callback_kwargs": self.callback_kwargs, } ) + + +@provide_session +def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime: + """ + Fetch a datetime stored in the database. + + :param model_reference: SQLAlchemy Column reference (e.g., DagRun.logical_date, TaskInstance.queued_dttm, etc.) + :param conditions: Key-value pairs which are passed to the WHERE clause. + + :param session: SQLAlchemy session (provided by decorator) + """ + query = select(model_reference) + + for key, value in conditions.items(): + query = query.where(getattr(model_reference.class_, key) == value) + + # This should build a query similar to: + # session.scalar(select(DagRun.logical_date).where(DagRun.dag_id == dag_id)) + logger.debug("db query: session.scalar(%s)", query) + + try: + result = session.scalar(query) + except SQLAlchemyError as e: + logger.error("Database query failed: (%s)", str(e)) + raise + + if result is None: + message = "No matching record found in the database." + logger.error(message) Review Comment: I've updated all messaging and the docstring for this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org