This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5cc5f43 Use original task's `start_date` if a task continues after
deferral (#20062)
5cc5f43 is described below
commit 5cc5f434d745452651bca76ba6d2406167b7e2b9
Author: Eugene Karimov <[email protected]>
AuthorDate: Wed Jan 5 19:19:44 2022 +0100
Use original task's `start_date` if a task continues after deferral (#20062)
---
airflow/models/taskinstance.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index fba12c1..ceead7f 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1207,7 +1207,8 @@ class TaskInstance(Base, LoggingMixin):
# Attempt 0 for the first attempt).
# Set the task start date. In case it was re-scheduled use the
initial
# start date that is recorded in task_reschedule table
- self.start_date = timezone.utcnow()
+ # If the task continues after being deferred (next_method is set),
use the original start_date
+ self.start_date = self.start_date if self.next_method else
timezone.utcnow()
if self.state == State.UP_FOR_RESCHEDULE:
task_reschedule: TR = TR.query_for_task_instance(self,
session=session).first()
if task_reschedule:
@@ -1533,6 +1534,8 @@ class TaskInstance(Base, LoggingMixin):
session.flush()
# Then, update ourselves so it matches the deferral request
+ # Keep an eye on the logic in
`check_and_change_state_before_execution()`
+ # depending on self.next_method semantics
self.state = State.DEFERRED
self.trigger_id = trigger_row.id
self.next_method = defer.method_name