This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bb89d488b3 Remove provide_session decorator from TaskInstancePydantic
methods (#37853)
bb89d488b3 is described below
commit bb89d488b335ada906b516217c66dffdfa732ac4
Author: Daniel Standish <[email protected]>
AuthorDate: Sat Mar 2 22:33:07 2024 -0800
Remove provide_session decorator from TaskInstancePydantic methods (#37853)
If we decorate these methods, then the worker will try to create a session.
But there's no reason to do this. Sessions should be created in the static
methods invoked by the API server, right?
---
airflow/serialization/pydantic/taskinstance.py | 26 ++++++++------------------
1 file changed, 8 insertions(+), 18 deletions(-)
diff --git a/airflow/serialization/pydantic/taskinstance.py
b/airflow/serialization/pydantic/taskinstance.py
index a818e3b3b3..01d9417ed6 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -35,7 +35,6 @@ from airflow.utils.pydantic import (
PlainValidator,
is_pydantic_2_installed,
)
-from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.xcom import XCOM_RETURN_KEY
if TYPE_CHECKING:
@@ -148,13 +147,12 @@ class TaskInstancePydantic(BaseModelPydantic,
LoggingMixin):
"""
return None
- @provide_session
def xcom_push(
self,
key: str,
value: Any,
execution_date: datetime | None = None,
- session: Session = NEW_SESSION,
+ session: Session | None = None,
) -> None:
"""
Push an XCom value for this task instance.
@@ -166,8 +164,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
"""
pass
- @provide_session
- def get_dagrun(self, session: Session = NEW_SESSION) -> DagRunPydantic:
+ def get_dagrun(self, session: Session | None = None) -> DagRunPydantic:
"""
Return the DagRun for this TaskInstance.
@@ -190,8 +187,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
return _execute_task(task_instance=self, context=context,
task_orig=task_orig)
- @provide_session
- def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update:
bool = False) -> None:
+ def refresh_from_db(self, session: Session | None = None, lock_for_update:
bool = False) -> None:
"""
Refresh the task instance from the database based on the primary key.
@@ -248,14 +244,13 @@ class TaskInstancePydantic(BaseModelPydantic,
LoggingMixin):
return _is_eligible_to_retry(task_instance=self)
- @provide_session
def handle_failure(
self,
error: None | str | Exception | KeyboardInterrupt,
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
- session: Session = NEW_SESSION,
+ session: Session | None = None,
) -> None:
"""
Handle Failure for a task instance.
@@ -288,7 +283,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
_refresh_from_task(task_instance=self, task=task,
pool_override=pool_override)
- @provide_session
def get_previous_dagrun(
self,
state: DagRunState | None = None,
@@ -304,11 +298,10 @@ class TaskInstancePydantic(BaseModelPydantic,
LoggingMixin):
return _get_previous_dagrun(task_instance=self, state=state,
session=session)
- @provide_session
def get_previous_execution_date(
self,
state: DagRunState | None = None,
- session: Session = NEW_SESSION,
+ session: Session | None = None,
) -> pendulum.DateTime | None:
"""
Return the execution date from property previous_ti_success.
@@ -344,11 +337,10 @@ class TaskInstancePydantic(BaseModelPydantic,
LoggingMixin):
return _get_email_subject_content(task_instance=self,
exception=exception, task=task)
- @provide_session
def get_previous_ti(
self,
state: DagRunState | None = None,
- session: Session = NEW_SESSION,
+ session: Session | None = None,
) -> TaskInstance | TaskInstancePydantic | None:
"""
Return the task instance for the task that ran before this task
instance.
@@ -360,7 +352,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
return _get_previous_ti(task_instance=self, state=state,
session=session)
- @provide_session
def check_and_change_state_before_execution(
self,
verbose: bool = True,
@@ -374,7 +365,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
job_id: str | None = None,
pool: str | None = None,
external_executor_id: str | None = None,
- session: Session = NEW_SESSION,
+ session: Session | None = None,
) -> bool:
return TaskInstance._check_and_change_state_before_execution(
task_instance=self,
@@ -393,8 +384,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
session=session,
)
- @provide_session
- def schedule_downstream_tasks(self, session: Session = NEW_SESSION,
max_tis_per_query: int | None = None):
+ def schedule_downstream_tasks(self, session: Session | None = None,
max_tis_per_query: int | None = None):
"""
Schedule downstream tasks of this task instance.