vincbeck commented on code in PR #28900:
URL: https://github.com/apache/airflow/pull/28900#discussion_r1247177551
##########
airflow/serialization/pydantic/dag_run.py:
##########
@@ -14,38 +14,80 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
from datetime import datetime
-from typing import List, Optional
+from typing import Iterable
from pendulum import DateTime
from pydantic import BaseModel as BaseModelPydantic
+from sqlalchemy import PickleType
+from sqlalchemy.orm import Session
-from airflow.serialization.pydantic.dataset import DatasetEventPydantic
+from airflow import DAG
+from airflow.jobs.scheduler_job_runner import TI
+from airflow.models.dagrun import DagRun, _get_previous_dagrun, _get_previous_scheduled_dagrun
+from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import DagRunState, TaskInstanceState
class DagRunPydantic(BaseModelPydantic):
"""Serializable representation of the DagRun ORM SqlAlchemyModel used by
internal API."""
id: int
dag_id: str
- queued_at: Optional[datetime]
+ queued_at: datetime | None
execution_date: DateTime
- start_date: Optional[datetime]
- end_date: Optional[datetime]
+ start_date: datetime | None
+ end_date: datetime | None
state: str
run_id: str
- creating_job_id: Optional[int]
+ creating_job_id: int | None
external_trigger: bool
run_type: str
- data_interval_start: Optional[datetime]
- data_interval_end: Optional[datetime]
- last_scheduling_decision: Optional[datetime]
- dag_hash: Optional[str]
+ conf: PickleType
+ data_interval_start: datetime | None
+ data_interval_end: datetime | None
+ last_scheduling_decision: datetime | None
+ dag_hash: str | None
updated_at: datetime
- consumed_dataset_events: List[DatasetEventPydantic]
+ dag: DAG | None
class Config:
"""Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
+
+ @provide_session
+ def get_task_instances(
+ self,
+ state: Iterable[TaskInstanceState | None] | None = None,
+ session: Session = NEW_SESSION,
+ ) -> list[TI]:
+ """
+ Returns the task instances for this dag run
+
+ TODO: make it work for AIP-44
+ """
+ raise NotImplementedError()
+
+ @provide_session
+ def get_previous_scheduled_dagrun(self, session: Session = NEW_SESSION) -> DagRun | None:
+ """
+ The previous, SCHEDULED DagRun, if there is one.
+
+ :param session: SQLAlchemy ORM Session
+ """
+ return _get_previous_scheduled_dagrun(self, session)
Review Comment:
Good catch! I converted the method then
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]