Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1306689074
##########
tests/ti_deps/deps/test_ready_to_reschedule_dep.py:
##########
@@ -64,118 +76,127 @@ def _get_mapped_task_reschedule(self, reschedule_date):
)
return reschedule
- def test_should_pass_if_ignore_in_reschedule_period_is_set(self):
+ def test_should_pass_if_ignore_in_reschedule_period_is_set(self,
mocked_find_last_for_task_instance):
+ mocked_find_last_for_task_instance.side_effect = NotExpectedCall
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
dep_context = DepContext(ignore_in_reschedule_period=True)
assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
- def test_should_pass_if_not_reschedule_mode(self):
+ def test_should_pass_if_not_reschedule_mode(self,
mocked_find_last_for_task_instance):
+ mocked_find_last_for_task_instance.side_effect = NotExpectedCall
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
del ti.task.reschedule
assert ReadyToRescheduleDep().is_met(ti=ti)
- def test_should_pass_if_not_in_none_state(self):
+ def test_should_pass_if_not_in_none_state(self,
mocked_find_last_for_task_instance):
+ mocked_find_last_for_task_instance.side_effect = NotExpectedCall
ti = self._get_task_instance(State.UP_FOR_RETRY)
assert ReadyToRescheduleDep().is_met(ti=ti)
-
@patch("airflow.models.taskreschedule.TaskReschedule.query_for_task_instance")
- def test_should_pass_if_no_reschedule_record_exists(self,
mock_query_for_task_instance):
-
mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value
= []
+ def test_should_pass_if_no_reschedule_record_exists(self,
mocked_find_last_for_task_instance):
+ mocked_find_last_for_task_instance.return_value = None
ti = self._get_task_instance(State.NONE)
assert ReadyToRescheduleDep().is_met(ti=ti)
-
@patch("airflow.models.taskreschedule.TaskReschedule.query_for_task_instance")
- def test_should_pass_after_reschedule_date_one(self,
mock_query_for_task_instance):
-
mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value
= (
- self._get_task_reschedule(utcnow() - timedelta(minutes=1))
+ def test_should_pass_after_reschedule_date_one(self,
mocked_find_last_for_task_instance):
+ mocked_find_last_for_task_instance.return_value =
self._get_task_reschedule(
+ utcnow() - timedelta(minutes=1)
)
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
assert ReadyToRescheduleDep().is_met(ti=ti)
-
@patch("airflow.models.taskreschedule.TaskReschedule.query_for_task_instance")
- def test_should_pass_after_reschedule_date_multiple(self,
mock_query_for_task_instance):
-
mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value
= [
+ def test_should_pass_after_reschedule_date_multiple(self,
mocked_find_last_for_task_instance):
+ mocked_find_last_for_task_instance.side_effect = [
self._get_task_reschedule(utcnow() - timedelta(minutes=21)),
self._get_task_reschedule(utcnow() - timedelta(minutes=11)),
self._get_task_reschedule(utcnow() - timedelta(minutes=1)),
- ][-1]
+ ]
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
+ # All TaskReschedules meet requirements
+ assert ReadyToRescheduleDep().is_met(ti=ti)
+ assert ReadyToRescheduleDep().is_met(ti=ti)
assert ReadyToRescheduleDep().is_met(ti=ti)
Review Comment:
I'm not sure what behaviour was expected here before, so I've changed the
test to emulate sequential queries over time.
##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
:param try_number: Look for TaskReschedule of the given try_number.
Default is None which
looks for the same try_number of the given task_instance.
"""
- return TaskReschedule.query_for_task_instance(
- task_instance, session=session, try_number=try_number
- ).all()
+ return session.scalars(cls.stmt_for_task_instance(task_instance,
try_number=try_number)).all()
+
+ @classmethod
+ @provide_session
+ def find_last_for_task_instance(
+ cls,
+ task_instance: TaskInstance,
+ session: Session = NEW_SESSION,
+ try_number: int | None = None,
+ ) -> TaskReschedule | None:
+ """
+ Return last task reschedule for the task instance and try number.
+
+ :param session: the database session object
+ :param task_instance: the task instance to find task reschedules for
+ :param try_number: Look for TaskReschedule of the given try_number.
Default is None which
+ looks for the same try_number of the given task_instance.
+
+ :meta private:
+ """
+ return session.scalar(cls.stmt_for_task_instance(task_instance,
try_number=try_number, limit=1))
+
+ @classmethod
+ @provide_session
+ def find_date_for_task_instance(
+ cls,
+ task_instance: TaskInstance,
+ *,
+ try_number: int | None = None,
+ find_end_date: bool = False,
+ session: Session = NEW_SESSION,
+ ):
+ """
+ Return date for task reschedule for the task instance and try number.
+
+ :param task_instance: the task instance to find task reschedules for
+ :param try_number: Look for TaskReschedule of the given try_number.
Default is None which
+ looks for the same try_number of the given task_instance.
+ :param find_end_date: Should return `end_date` or `start_date`?
+ :param session: the database session object
+
+ :meta private:
+ """
+ return session.scalar(
+ cls.stmt_for_task_instance(
+ task_instance, try_number=try_number,
descending=find_end_date, limit=1
+ ).with_only_columns(cls.start_date if not find_end_date else
cls.start_date)
+ )
+
+ @staticmethod
+ @provide_session
+ def query_for_task_instance(
+ task_instance: TaskInstance,
+ descending: bool = False,
+ session: Session = NEW_SESSION,
+ try_number: int | None = None,
+ ) -> Query:
+ """
+ Return query for task reschedules for a given the task instance
(deprecated).
+
+ :param session: the database session object
+ :param task_instance: the task instance to find task reschedules for
+ :param descending: If True then records are returned in descending
order
+ :param try_number: Look for TaskReschedule of the given try_number.
Default is None which
+ looks for the same try_number of the given task_instance.
+ """
+ warnings.warn(
+ "`query_for_task_instance` use SQLAlchemy's Legacy Query API.",
+ category=RemovedInAirflow3Warning,
+ stacklevel=2,
+ )
+
+ if try_number is None:
+ try_number = task_instance.try_number
+
+ TR = TaskReschedule
+ qry = session.query(TR).filter(
+ TR.dag_id == task_instance.dag_id,
+ TR.task_id == task_instance.task_id,
+ TR.run_id == task_instance.run_id,
+ TR.map_index == task_instance.map_index,
+ TR.try_number == try_number,
+ )
+ if descending:
+ return qry.order_by(desc(TR.id))
+ else:
+ return qry.order_by(asc(TR.id))
Review Comment:
I've decided to keep `query_for_task_instance` as is, because anyone who
used it before expects this method to return a `sqlalchemy.orm.Query`, and
there is no direct replacement object that implements the same methods.
If we consider that this method was never part of the public interface, we
could remove it; this PR also removes the usages of this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]