This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bb89d488b3 Remove provide_session decorator from TaskInstancePydantic 
methods (#37853)
bb89d488b3 is described below

commit bb89d488b335ada906b516217c66dffdfa732ac4
Author: Daniel Standish <[email protected]>
AuthorDate: Sat Mar 2 22:33:07 2024 -0800

    Remove provide_session decorator from TaskInstancePydantic methods (#37853)
    
    If we decorate these methods then the worker will try to create a session.  
But there's no reason to do this.  Sessions should be created on the static 
methods invoked by the API server right?
---
 airflow/serialization/pydantic/taskinstance.py | 26 ++++++++------------------
 1 file changed, 8 insertions(+), 18 deletions(-)

diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index a818e3b3b3..01d9417ed6 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -35,7 +35,6 @@ from airflow.utils.pydantic import (
     PlainValidator,
     is_pydantic_2_installed,
 )
-from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.xcom import XCOM_RETURN_KEY
 
 if TYPE_CHECKING:
@@ -148,13 +147,12 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
         """
         return None
 
-    @provide_session
     def xcom_push(
         self,
         key: str,
         value: Any,
         execution_date: datetime | None = None,
-        session: Session = NEW_SESSION,
+        session: Session | None = None,
     ) -> None:
         """
         Push an XCom value for this task instance.
@@ -166,8 +164,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
         """
         pass
 
-    @provide_session
-    def get_dagrun(self, session: Session = NEW_SESSION) -> DagRunPydantic:
+    def get_dagrun(self, session: Session | None = None) -> DagRunPydantic:
         """
         Return the DagRun for this TaskInstance.
 
@@ -190,8 +187,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
 
         return _execute_task(task_instance=self, context=context, 
task_orig=task_orig)
 
-    @provide_session
-    def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: 
bool = False) -> None:
+    def refresh_from_db(self, session: Session | None = None, lock_for_update: 
bool = False) -> None:
         """
         Refresh the task instance from the database based on the primary key.
 
@@ -248,14 +244,13 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
 
         return _is_eligible_to_retry(task_instance=self)
 
-    @provide_session
     def handle_failure(
         self,
         error: None | str | Exception | KeyboardInterrupt,
         test_mode: bool | None = None,
         context: Context | None = None,
         force_fail: bool = False,
-        session: Session = NEW_SESSION,
+        session: Session | None = None,
     ) -> None:
         """
         Handle Failure for a task instance.
@@ -288,7 +283,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
 
         _refresh_from_task(task_instance=self, task=task, 
pool_override=pool_override)
 
-    @provide_session
     def get_previous_dagrun(
         self,
         state: DagRunState | None = None,
@@ -304,11 +298,10 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
 
         return _get_previous_dagrun(task_instance=self, state=state, 
session=session)
 
-    @provide_session
     def get_previous_execution_date(
         self,
         state: DagRunState | None = None,
-        session: Session = NEW_SESSION,
+        session: Session | None = None,
     ) -> pendulum.DateTime | None:
         """
         Return the execution date from property previous_ti_success.
@@ -344,11 +337,10 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
 
         return _get_email_subject_content(task_instance=self, 
exception=exception, task=task)
 
-    @provide_session
     def get_previous_ti(
         self,
         state: DagRunState | None = None,
-        session: Session = NEW_SESSION,
+        session: Session | None = None,
     ) -> TaskInstance | TaskInstancePydantic | None:
         """
         Return the task instance for the task that ran before this task 
instance.
@@ -360,7 +352,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
 
         return _get_previous_ti(task_instance=self, state=state, 
session=session)
 
-    @provide_session
     def check_and_change_state_before_execution(
         self,
         verbose: bool = True,
@@ -374,7 +365,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
         job_id: str | None = None,
         pool: str | None = None,
         external_executor_id: str | None = None,
-        session: Session = NEW_SESSION,
+        session: Session | None = None,
     ) -> bool:
         return TaskInstance._check_and_change_state_before_execution(
             task_instance=self,
@@ -393,8 +384,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
             session=session,
         )
 
-    @provide_session
-    def schedule_downstream_tasks(self, session: Session = NEW_SESSION, 
max_tis_per_query: int | None = None):
+    def schedule_downstream_tasks(self, session: Session | None = None, 
max_tis_per_query: int | None = None):
         """
         Schedule downstream tasks of this task instance.
 

Reply via email to