This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b3f54e046a Implement get_dagrun on TaskInstancePydantic (#38295)
b3f54e046a is described below
commit b3f54e046a7f663a3a2e1754b8e2b1c153227df4
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Mar 20 01:02:31 2024 -0700
Implement get_dagrun on TaskInstancePydantic (#38295)
Co-authored-by: Vincent <[email protected]>
---
airflow/api_internal/endpoints/rpc_api_endpoint.py | 1 +
airflow/models/taskinstance.py | 13 +++++++++----
airflow/serialization/pydantic/taskinstance.py | 6 ++----
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index d0bb10117d..7a8f2edfdb 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -80,6 +80,7 @@ def _initialize_map() -> dict[str, Callable]:
SerializedDagModel.get_serialized_dag,
TaskInstance._check_and_change_state_before_execution,
TaskInstance.get_task_instance,
+ TaskInstance._get_dagrun,
TaskInstance.fetch_handle_failure_context,
TaskInstance.save_to_db,
TaskInstance._schedule_downstream_tasks,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6f8130c4aa..0d27404292 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2115,6 +2115,14 @@ class TaskInstance(Base, LoggingMixin):
"""Check on whether the task instance is in the right state and
timeframe to be retried."""
return self.state == TaskInstanceState.UP_FOR_RETRY and self.next_retry_datetime() < timezone.utcnow()
+ @staticmethod
+ @internal_api_call
+ def _get_dagrun(dag_id, run_id, session) -> DagRun:
+ from airflow.models.dagrun import DagRun # Avoid circular import
+
+ dr = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id).one()
+ return dr
+
@provide_session
def get_dagrun(self, session: Session = NEW_SESSION) -> DagRun:
"""
@@ -2131,13 +2139,10 @@ class TaskInstance(Base, LoggingMixin):
self.dag_run.dag = self.task.dag
return self.dag_run
- from airflow.models.dagrun import DagRun # Avoid circular import
-
- dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
+ dr = self._get_dagrun(self.dag_id, self.run_id, session)
if getattr(self, "task", None) is not None:
if TYPE_CHECKING:
assert self.task
-
dr.dag = self.task.dag
# Record it in the instance for next time. This means that `self.execution_date` will work correctly
set_committed_value(self, "dag_run", dr)
diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py
index 51379fdf6d..44db550e5b 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -176,11 +176,9 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
:param session: SQLAlchemy ORM Session
- TODO: make it works for AIP-44
-
- :return: Pydantic serialized version of DaGrun
+ :return: Pydantic serialized version of DagRun
"""
- raise NotImplementedError()
+ return TaskInstance._get_dagrun(dag_id=self.dag_id, run_id=self.run_id, session=session)
def _execute_task(self, context, task_orig):
"""