ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r501980867
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
tis_to_change: List[TI] = query.with_for_update().all()
for ti in tis_to_change:
ti.set_state(new_state, session=session)
+ ti.duration = 0
tis_changed += 1
else:
subq = query.subquery()
+ current_time = timezone.utcnow()
tis_changed = session \
.query(models.TaskInstance) \
.filter(
models.TaskInstance.dag_id == subq.c.dag_id,
models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
subq.c.execution_date) \
- .update({models.TaskInstance.state: new_state},
synchronize_session=False)
+ .update({
+ models.TaskInstance.state: new_state,
+ models.TaskInstance.start_date: current_time,
+ models.TaskInstance.end_date: current_time,
+ models.TaskInstance.duration: 0,
Review comment:
This doesn't feel right in the case of Sensing or Up-for-reschedule
tasks. Hmmm
Maybe we want:
```suggestion
models.TaskInstance.state: new_state,
models.TaskInstance.start_date:
func.coalesce(models.TaskInstance.start_date, current_time),
models.TaskInstance.end_date:
func.coalesce(models.TaskInstance.end_date, current_time),
models.TaskInstance.duration:
models.TaskInstance.end_date - models.TaskInstance.start_date,
```
(I'm not sure that last line works as intended, i.e. whether it uses the
pre-update or the post-update values.)
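On whether the `duration` expression sees pre-update or post-update values: in standard SQL, and in PostgreSQL and SQLite, every right-hand side in a `SET` clause is evaluated against the row as it was before the UPDATE. MySQL is the notable exception, because it evaluates single-table assignments left to right. The sketch below uses the stdlib `sqlite3` module to show what that means for the suggestion. The table `ti`, its columns, and storing times as plain numbers are assumptions for the demo, not Airflow's schema.

```python
import sqlite3

# Hypothetical stand-in for the task_instance table; times are plain
# numbers (seconds) to keep the demo simple.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ti (id INTEGER PRIMARY KEY, state TEXT,"
    " start_date REAL, end_date REAL, duration REAL)"
)
conn.executemany(
    "INSERT INTO ti (id, state, start_date, end_date) VALUES (?, ?, ?, ?)",
    [
        (1, "queued", None, None),     # never started
        (2, "sensing", 100.0, None),   # started, still running
        (3, "success", 100.0, 130.0),  # already finished
    ],
)

now = 150.0
# Mirrors the suggestion: COALESCE for the dates, then end - start for
# duration. SQLite evaluates every right-hand side against the OLD row.
conn.execute(
    "UPDATE ti SET state = ?,"
    " start_date = COALESCE(start_date, ?),"
    " end_date = COALESCE(end_date, ?),"
    " duration = end_date - start_date",
    ("failed", now, now),
)

rows = {
    row[0]: row[1:]
    for row in conn.execute("SELECT id, start_date, end_date, duration FROM ti")
}
for ti_id, values in sorted(rows.items()):
    print(ti_id, values)
# duration is NULL for rows 1 and 2: it was computed from the old end_date.
```

So on backends that read the old row, the last line of the suggestion leaves `duration` NULL for any task that had no `end_date` before the update.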
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
tis_to_change: List[TI] = query.with_for_update().all()
for ti in tis_to_change:
ti.set_state(new_state, session=session)
+ ti.duration = 0
tis_changed += 1
else:
subq = query.subquery()
+ current_time = timezone.utcnow()
tis_changed = session \
.query(models.TaskInstance) \
.filter(
models.TaskInstance.dag_id == subq.c.dag_id,
models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
subq.c.execution_date) \
- .update({models.TaskInstance.state: new_state},
synchronize_session=False)
+ .update({
+ models.TaskInstance.state: new_state,
+ models.TaskInstance.start_date: current_time,
+ models.TaskInstance.end_date: current_time,
+ models.TaskInstance.duration: 0,
Review comment:
This doesn't feel right in the case of Sensing or Up-for-reschedule
tasks. Hmmm
Maybe we want:
```suggestion
models.TaskInstance.state: new_state,
models.TaskInstance.start_date:
func.coalesce(models.TaskInstance.start_date, func.now()),
models.TaskInstance.end_date:
func.coalesce(models.TaskInstance.end_date, func.now()),
models.TaskInstance.duration:
models.TaskInstance.end_date - models.TaskInstance.start_date,
```
(I'm not sure that last line works as intended, i.e. whether it uses the
pre-update or the post-update values.)
This way we can let the DB give us the time; there's no need to call `utcnow()`.
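One way to sidestep the pre/post-update question entirely is to repeat the `COALESCE` expressions inside the `duration` assignment. The result is then the same under either evaluation order, because `COALESCE(COALESCE(x, now), now)` equals `COALESCE(x, now)`. The sketch below does this in SQLite. In it, `julianday('now')` stands in for `func.now()` and times are stored as Julian-day numbers so that plain subtraction works. Both are demo assumptions. On PostgreSQL, subtracting timestamps yields an interval rather than a number.

```python
import sqlite3

# Hypothetical table as before, but times stored as SQLite Julian-day
# numbers so the database itself can supply "now" via julianday('now').
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ti (id INTEGER PRIMARY KEY, state TEXT,"
    " start_date REAL, end_date REAL, duration REAL)"
)
conn.execute("INSERT INTO ti (id, state) VALUES (1, 'queued')")
conn.execute(
    "INSERT INTO ti (id, state, start_date)"
    " VALUES (2, 'sensing', julianday('now', '-1 hour'))"
)

# Repeat the COALESCE expressions inside duration, so the result is the
# same whether the database reads pre- or post-update column values.
conn.execute(
    "UPDATE ti SET state = ?,"
    " start_date = COALESCE(start_date, julianday('now')),"
    " end_date = COALESCE(end_date, julianday('now')),"
    " duration = (COALESCE(end_date, julianday('now'))"
    " - COALESCE(start_date, julianday('now'))) * 86400.0",
    ("failed",),
)

durations = dict(conn.execute("SELECT id, duration FROM ti"))
print(durations)  # row 1 about 0 seconds, row 2 about 3600 seconds
```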
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
tis_to_change: List[TI] = query.with_for_update().all()
for ti in tis_to_change:
ti.set_state(new_state, session=session)
+ ti.duration = 0
Review comment:
Shouldn't this be in ti.duration?
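The diff sets `ti.duration = 0` at the call site, right after `ti.set_state(...)`. One reading of this question is that the timing bookkeeping belongs with the state change itself. The following is a minimal sketch of that idea. It uses a hypothetical `TaskInstanceSketch` dataclass rather than Airflow's `TaskInstance`, and it assumes that "finished" means success, failed or skipped. Note that an existing `start_date` is kept, which also addresses the sensing case.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical stand-ins; Airflow's real State and TaskInstance differ.
FINISHED_STATES = frozenset({"success", "failed", "skipped"})


@dataclass
class TaskInstanceSketch:
    state: Optional[str] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    duration: Optional[float] = None  # seconds

    def set_state(self, new_state: str, now: Optional[datetime] = None) -> None:
        """Change state and keep start/end/duration consistent in one place."""
        now = now or datetime.now(timezone.utc)
        self.state = new_state
        # Keep an existing start_date (e.g. a sensing task that already ran).
        self.start_date = self.start_date or now
        if new_state in FINISHED_STATES:
            self.end_date = self.end_date or now
            self.duration = (self.end_date - self.start_date).total_seconds()


# Usage: a never-started task versus one that has been sensing for 5 minutes.
t0 = datetime(2020, 10, 9, 12, 0, tzinfo=timezone.utc)
queued = TaskInstanceSketch(state="queued")
queued.set_state("failed", now=t0)
sensing = TaskInstanceSketch(state="sensing", start_date=t0 - timedelta(minutes=5))
sensing.set_state("failed", now=t0)
print(queued.duration, sensing.duration)
```

With the logic in one method, the loop branch and any other caller get the same start/end/duration behaviour without repeating it.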
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
tis_changed += 1
else:
subq = query.subquery()
+ current_time = timezone.utcnow()
+ ti_prop_update = {
+ models.TaskInstance.state: new_state,
+ models.TaskInstance.start_date: current_time,
+ }
+
+ # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+ if new_state in State.finished():
+ ti_prop_update.update({
+ models.TaskInstance.end_date: current_time,
+ models.TaskInstance.duration: 0,
Review comment:
```suggestion
models.TaskInstance.duration:
models.TaskInstance.end_date - models.TaskInstance.start_date,
```
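A caveat on the suggested `end_date - start_date`: its meaning depends on the backend. On PostgreSQL it produces an interval, not a number of seconds. On SQLite, datetimes stored as text are coerced to their leading numeric prefix (the year), so the subtraction silently returns the wrong answer. The sketch below shows the SQLite case next to the Python-side calculation. The text storage format is an assumption for the demo.

```python
import sqlite3
from datetime import datetime

# Timestamps as ISO-8601 text, a common way SQLite holds datetimes
# (an assumption for this demo).
start = datetime(2020, 10, 9, 11, 0, 0)
end = datetime(2020, 10, 9, 12, 0, 0)

conn = sqlite3.connect(":memory:")
(naive,) = conn.execute(
    "SELECT ? - ?", (end.isoformat(" "), start.isoformat(" "))
).fetchone()
print(naive)  # zero, not 3600: each text value is coerced to 2020

# In Python the subtraction gives a timedelta; a float column needs seconds.
python_side = (end - start).total_seconds()
print(python_side)
```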
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
tis_changed += 1
else:
subq = query.subquery()
+ current_time = timezone.utcnow()
+ ti_prop_update = {
+ models.TaskInstance.state: new_state,
+ models.TaskInstance.start_date: current_time,
+ }
+
+ # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+ if new_state in State.finished():
+ ti_prop_update.update({
+ models.TaskInstance.end_date: current_time,
+ models.TaskInstance.duration: 0,
Review comment:
My thought is that if the task is in the sensing state, the duration won't be 0.
But it turns out this is not trivial to do portably: on Postgres it is
`EXTRACT(EPOCH FROM end_date - start_date)`, so it's likely not worth it.
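This example illustrates why a portable expression is awkward: each backend needs its own seconds-valued expression. In SQLAlchemy, such a mapping could live in a custom construct compiled per dialect with `sqlalchemy.ext.compiler.compiles`. The sketch below keeps it to plain SQL strings in a hypothetical `duration_seconds_sql` helper. Only the SQLite branch is executed here. The PostgreSQL and MySQL strings use `EXTRACT(EPOCH FROM ...)` and `TIMESTAMPDIFF(MICROSECOND, ...)`, respectively, and have not been tested against those databases.

```python
import sqlite3

# Hypothetical helper: raw SQL for "end_date - start_date in seconds" per
# backend. Only the SQLite branch is exercised below.
DURATION_SQL = {
    "postgresql": "EXTRACT(EPOCH FROM end_date - start_date)",
    "mysql": "TIMESTAMPDIFF(MICROSECOND, start_date, end_date) / 1000000",
    "sqlite": "(julianday(end_date) - julianday(start_date)) * 86400.0",
}


def duration_seconds_sql(dialect: str) -> str:
    """Return a seconds-valued duration expression for the given backend."""
    try:
        return DURATION_SQL[dialect]
    except KeyError:
        raise ValueError(f"no duration expression for {dialect!r}") from None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (start_date TEXT, end_date TEXT)")
conn.execute("INSERT INTO ti VALUES ('2020-10-09 11:00:00', '2020-10-09 12:30:00')")
(seconds,) = conn.execute(
    f"SELECT {duration_seconds_sql('sqlite')} FROM ti"
).fetchone()
print(round(seconds, 3))
```

Three backends already need three different expressions, which supports the conclusion that this may not be worth it in this code path.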
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
tis_changed += 1
else:
subq = query.subquery()
+ current_time = timezone.utcnow()
+ ti_prop_update = {
+ models.TaskInstance.state: new_state,
+ models.TaskInstance.start_date: current_time,
+ }
+
+ # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+ if new_state in State.finished():
+ ti_prop_update.update({
+ models.TaskInstance.end_date: current_time,
+ models.TaskInstance.duration: 0,
Review comment:
If it's finished, then won't it already have a duration?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.