This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b3f54e046a Implement get_dagrun on TaskInstancePydantic (#38295)
b3f54e046a is described below

commit b3f54e046a7f663a3a2e1754b8e2b1c153227df4
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Mar 20 01:02:31 2024 -0700

    Implement get_dagrun on TaskInstancePydantic (#38295)
    
    Co-authored-by: Vincent <[email protected]>
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  1 +
 airflow/models/taskinstance.py                     | 13 +++++++++----
 airflow/serialization/pydantic/taskinstance.py     |  6 ++----
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index d0bb10117d..7a8f2edfdb 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -80,6 +80,7 @@ def _initialize_map() -> dict[str, Callable]:
         SerializedDagModel.get_serialized_dag,
         TaskInstance._check_and_change_state_before_execution,
         TaskInstance.get_task_instance,
+        TaskInstance._get_dagrun,
         TaskInstance.fetch_handle_failure_context,
         TaskInstance.save_to_db,
         TaskInstance._schedule_downstream_tasks,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6f8130c4aa..0d27404292 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2115,6 +2115,14 @@ class TaskInstance(Base, LoggingMixin):
         """Check on whether the task instance is in the right state and 
timeframe to be retried."""
         return self.state == TaskInstanceState.UP_FOR_RETRY and 
self.next_retry_datetime() < timezone.utcnow()
 
+    @staticmethod
+    @internal_api_call
+    def _get_dagrun(dag_id, run_id, session) -> DagRun:
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
+        dr = session.query(DagRun).filter(DagRun.dag_id == dag_id, 
DagRun.run_id == run_id).one()
+        return dr
+
     @provide_session
     def get_dagrun(self, session: Session = NEW_SESSION) -> DagRun:
         """
@@ -2131,13 +2139,10 @@ class TaskInstance(Base, LoggingMixin):
                 self.dag_run.dag = self.task.dag
             return self.dag_run
 
-        from airflow.models.dagrun import DagRun  # Avoid circular import
-
-        dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, 
DagRun.run_id == self.run_id).one()
+        dr = self._get_dagrun(self.dag_id, self.run_id, session)
         if getattr(self, "task", None) is not None:
             if TYPE_CHECKING:
                 assert self.task
-
             dr.dag = self.task.dag
         # Record it in the instance for next time. This means that 
`self.execution_date` will work correctly
         set_committed_value(self, "dag_run", dr)
diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index 51379fdf6d..44db550e5b 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -176,11 +176,9 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
 
         :param session: SQLAlchemy ORM Session
 
-        TODO: make it works for AIP-44
-
-        :return: Pydantic serialized version of DaGrun
+        :return: Pydantic serialized version of DagRun
         """
-        raise NotImplementedError()
+        return TaskInstance._get_dagrun(dag_id=self.dag_id, 
run_id=self.run_id, session=session)
 
     def _execute_task(self, context, task_orig):
         """

Reply via email to