ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r501980867



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
             tis_changed = session \
                 .query(models.TaskInstance) \
                 .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date) \
-                .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+                .update({
+                    models.TaskInstance.state: new_state,
+                    models.TaskInstance.start_date: current_time,
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       This doesn't feel right in the case of sensing or up-for-reschedule tasks. Hmm.
   
   Maybe we want:
   
   ```suggestion
                       models.TaskInstance.state: new_state,
                       models.TaskInstance.start_date: func.coalesce(models.TaskInstance.start_date, func.now()),
                       models.TaskInstance.end_date: func.coalesce(models.TaskInstance.end_date, func.now()),
                       models.TaskInstance.duration: models.TaskInstance.end_date - models.TaskInstance.start_date,
   ```
   
   (I'm not sure that last line works right, i.e. whether it uses the pre-update or the post-update values.)
   
   This way we can let the DB give us the time; there's no need to call `utcnow()`.
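A minimal sketch of the pre-update vs post-update question, using Python's built-in `sqlite3` with raw SQL rather than the SQLAlchemy ORM. The table is a hypothetical, stripped-down stand-in for `task_instance`. Times are assumed to be stored as epoch seconds (REAL) so that `end_date - start_date` is a plain number; real TIMESTAMP columns would yield an interval or need conversion before landing in a float `duration`. A bound `:now` parameter stands in for `func.now()` so the output is deterministic. SQLite evaluates every SET expression against the row as it was before the update (PostgreSQL does the same), so the suggested `duration` line sees the old, possibly NULL, dates. MySQL instead evaluates single-table UPDATE assignments left to right, so the same statement can behave differently across backends.

```python
import sqlite3

# Hypothetical, stripped-down stand-in for the task_instance table.
# Times are epoch seconds (REAL) so end_date - start_date is a plain number.
SCHEMA = """
CREATE TABLE task_instance (
    task_id    TEXT PRIMARY KEY,
    state      TEXT,
    start_date REAL,
    end_date   REAL,
    duration   REAL
)
"""

# The suggestion transcribed to SQL: duration refers to the bare columns,
# which SQLite (and PostgreSQL) read from the row *before* the update.
AS_SUGGESTED = """
UPDATE task_instance SET
    state      = :state,
    start_date = COALESCE(start_date, :now),
    end_date   = COALESCE(end_date, :now),
    duration   = end_date - start_date
"""

# Variant that repeats the COALESCE expressions, so duration comes out the
# same whether the backend reads pre-update or post-update values.
REPEATED_COALESCE = """
UPDATE task_instance SET
    state      = :state,
    start_date = COALESCE(start_date, :now),
    end_date   = COALESCE(end_date, :now),
    duration   = COALESCE(end_date, :now) - COALESCE(start_date, :now)
"""


def run(update_sql, now=1000.0):
    """Apply update_sql to two sample rows and return the resulting rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
        [
            ("sensing", "sensing", 400.0, None, None),  # started, not finished
            ("queued", "queued", None, None, None),     # never started
        ],
    )
    # :now is a stand-in for the DB-side func.now() in the suggestion.
    conn.execute(update_sql, {"state": "failed", "now": now})
    rows = conn.execute(
        "SELECT task_id, state, start_date, end_date, duration "
        "FROM task_instance ORDER BY task_id"
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(run(AS_SUGGESTED))
    print(run(REPEATED_COALESCE))
```

With `AS_SUGGESTED`, both rows keep a NULL `duration` because `NULL - x` is NULL. Repeating the `COALESCE` expressions gives 0.0 for the never-started task and 600.0 for the sensing one, while the sensing task's original `start_date` is preserved in both cases.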

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0

Review comment:
       Shouldn't this be in ti.duration?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]