This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new de94c067486 Remove AIP-44 from airflow/sensors/base.py (#44518)
de94c067486 is described below
commit de94c067486c5df68648a069796e06137608a73e
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat Nov 30 22:49:27 2024 +0100
Remove AIP-44 from airflow/sensors/base.py (#44518)
---
airflow/sensors/base.py | 50 +++++++++++++++----------------------------------
1 file changed, 15 insertions(+), 35 deletions(-)
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 1c56aa42005..c0cf255cad8 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -29,7 +29,6 @@ from typing import TYPE_CHECKING, Any, Callable
from sqlalchemy import select
from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
@@ -49,8 +48,6 @@ from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
if TYPE_CHECKING:
- from sqlalchemy.orm.session import Session
-
from airflow.utils.context import Context
# As documented in https://dev.mysql.com/doc/refman/5.7/en/datetime.html.
@@ -83,31 +80,6 @@ class PokeReturnValue:
return self.is_done
-@internal_api_call
-@provide_session
-def _orig_start_date(
- dag_id: str, task_id: str, run_id: str, map_index: int, try_number: int, session: Session = NEW_SESSION
-):
- """
- Get the original start_date for a rescheduled task.
-
- :meta private:
- """
- return session.scalar(
- select(TaskReschedule)
- .where(
- TaskReschedule.dag_id == dag_id,
- TaskReschedule.task_id == task_id,
- TaskReschedule.run_id == run_id,
- TaskReschedule.map_index == map_index,
- TaskReschedule.try_number == try_number,
- )
- .order_by(TaskReschedule.id.asc())
- .with_only_columns(TaskReschedule.start_date)
- .limit(1)
- )
-
-
class BaseSensorOperator(BaseOperator, SkipMixin):
"""
Sensor operators are derived from this class and inherit these attributes.
@@ -239,7 +211,8 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
"""Override when deriving this class."""
raise AirflowException("Override me.")
- def execute(self, context: Context) -> Any:
+ @provide_session
+ def execute(self, context: Context, session=NEW_SESSION) -> Any:
started_at: datetime.datetime | float
if self.reschedule:
@@ -249,12 +222,19 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
# If reschedule, use the start date of the first try (first try can be either the very
# first execution of the task, or the first execution after the task was cleared.)
first_try_number = max_tries - retries + 1
- start_date = _orig_start_date(
- dag_id=ti.dag_id,
- task_id=ti.task_id,
- run_id=ti.run_id,
- map_index=ti.map_index,
- try_number=first_try_number,
+
+ start_date = session.scalar(
+ select(TaskReschedule)
+ .where(
+ TaskReschedule.dag_id == ti.dag_id,
+ TaskReschedule.task_id == ti.task_id,
+ TaskReschedule.run_id == ti.run_id,
+ TaskReschedule.map_index == ti.map_index,
+ TaskReschedule.try_number == first_try_number,
+ )
+ .order_by(TaskReschedule.id.asc())
+ .with_only_columns(TaskReschedule.start_date)
+ .limit(1)
)
if not start_date:
start_date = timezone.utcnow()