kaxil commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502006285



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0

Review comment:
       You mean in `ti.set_state`, right? Fixed in
https://github.com/apache/airflow/pull/11358/commits/3b0d977d413b38e6b394287a6cbc85a6bc25b414
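   A minimal sketch of what moving the reset into `set_state` could look like, so that callers such as the loop above would no longer need `ti.duration = 0`. `SketchTaskInstance` and `FINISHED_STATES` are hypothetical stand-ins (plain Python, no database), not Airflow's `TaskInstance` or `State`. The timestamps mirror the bulk-update branch of this PR, which writes one `current_time` to both `start_date` and `end_date`.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# Hypothetical stand-in for State.finished(): the terminal states named in this review.
FINISHED_STATES = frozenset({"success", "failed", "skipped"})


@dataclass
class SketchTaskInstance:
    """Hypothetical, simplified task instance (not Airflow's TaskInstance)."""

    state: Optional[str] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    duration: Optional[float] = None

    def set_state(self, new_state: Optional[str]) -> None:
        # One timestamp per call, like `current_time` in the bulk-update branch.
        current_time = datetime.now(timezone.utc)
        self.state = new_state
        self.start_date = current_time
        if new_state in FINISHED_STATES:
            # Only terminal states get an end_date and a duration (in seconds).
            self.end_date = current_time
            self.duration = (self.end_date - self.start_date).total_seconds()


ti = SketchTaskInstance(state="up_for_retry")
ti.set_state("failed")
print(ti.duration)  # 0.0, because start_date == end_date
```

   Keeping the logic in one method means the ORM path and any future caller get the same `end_date`/`duration` handling without repeating it.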

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
             tis_changed = session \
                 .query(models.TaskInstance) \
                 .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date) \
-                .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+                .update({
+                    models.TaskInstance.state: new_state,
+                    models.TaskInstance.start_date: current_time,
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       >This doesn't feel right in the case of Sensing or Up-for-reschedule tasks. Hmmm
   
   That is true: we should only set `end_date` and `duration` when the new TI state is one of "failed", "success" or "skipped". Fixed in
https://github.com/apache/airflow/pull/11358/commits/3b0d977d413b38e6b394287a6cbc85a6bc25b414
   
   >We can let the DB give us the time, no need to do a utcnow() this way.
   
   I did that for parity between MySQL, SQLite and Postgres: for MySQL and SQLite, `ti.set_state` is called, and it also uses `timezone.utcnow()`.
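   To illustrate the parity argument, here is a small self-contained sketch using the stdlib `sqlite3` module, a hypothetical three-column `task_instance` table, and `datetime.now(timezone.utc)` standing in for Airflow's `timezone.utcnow()`. A time chosen by the database need not match the format or precision of the Python-side timestamp used by the per-row `ti.set_state` path, whereas binding one Python-side `current_time` writes exactly that value to every updated row.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical, stripped-down table; the real task_instance has many more columns.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT, start_date TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, 'up_for_retry', NULL)",
    [("t1",), ("t2",)],
)

# Option A: let the database pick the time. SQLite's CURRENT_TIMESTAMP is UTC
# text in 'YYYY-MM-DD HH:MM:SS' form: no fractional seconds, no UTC offset.
db_now = conn.execute("SELECT CURRENT_TIMESTAMP").fetchone()[0]

# Option B (the approach in this PR): compute the time once in Python and bind it,
# so both code paths take their timestamps from the same Python clock.
current_time = datetime.now(timezone.utc).isoformat()
conn.execute(
    "UPDATE task_instance SET state = ?, start_date = ? WHERE state = ?",
    ("failed", current_time, "up_for_retry"),
)
start_dates = [row[0] for row in conn.execute("SELECT start_date FROM task_instance")]

print(db_now)       # format: YYYY-MM-DD HH:MM:SS
print(start_dates)  # both rows hold the identical Python-side timestamp
```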
   

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       Computing `duration` as `end_date - start_date` errors with the following. Since it will always be 0 here anyway, I think setting it to 0 explicitly is better than casting it to float:
   
   ```
       cursor.execute(statement, parameters)
   sqlalchemy.exc.ProgrammingError: (psycopg2.errors.DatatypeMismatch) column "duration" is of type double precision but expression is of type interval
   LINE 1: ...09T09:37:10.897145+00:00'::timestamptz, duration=(task_insta...
                                                                ^
   HINT:  You will need to rewrite or cast the expression.
   
   [SQL: UPDATE task_instance SET start_date=%(start_date)s, end_date=%(end_date)s, duration=(task_instance.end_date - task_instance.start_date), state=%(state)s FROM (SELECT task_instance.try_number AS try_number, task_instance.task_id AS task_id, task_instance.dag_id AS dag_id, task_instance.execution_date AS execution_date, task_instance.start_date AS start_date, task_instance.end_date AS end_date, task_instance.duration AS duration, task_instance.state AS state, task_instance.max_tries AS max_tries, task_instance.hostname AS hostname, task_instance.unixname AS unixname, task_instance.job_id AS job_id, task_instance.pool AS pool, task_instance.pool_slots AS pool_slots, task_instance.queue AS queue, task_instance.priority_weight AS priority_weight, task_instance.operator AS operator, task_instance.queued_dttm AS queued_dttm, task_instance.queued_by_job_id AS queued_by_job_id, task_instance.pid AS pid, task_instance.executor_config AS executor_config, task_instance.external_executor_id AS external_executor_id
   FROM task_instance LEFT OUTER JOIN dag_run ON task_instance.dag_id = dag_run.dag_id AND task_instance.execution_date = dag_run.execution_date
   WHERE task_instance.dag_id IN (%(dag_id_1)s) AND task_instance.state IN (%(state_1)s) AND (dag_run.state != %(state_2)s OR dag_run.state IS NULL)) AS anon_1 WHERE task_instance.dag_id = anon_1.dag_id AND task_instance.task_id = anon_1.task_id AND task_instance.execution_date = anon_1.execution_date]
   [parameters: {'start_date': datetime.datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=Timezone('UTC')), 'end_date': datetime.datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=Timezone('UTC')), 'state': 'failed', 'dag_id_1': 'test_execute_helper_should_change_state_for_tis_without_dagrun', 'state_1': 'up_for_retry', 'state_2': 'running'}]
   (Background on this error at: http://sqlalche.me/e/13/f405)
   ```
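   The type mismatch has a direct analogue in plain Python: subtracting two timestamps yields a `timedelta` (Postgres likewise yields an `interval`), while `duration` stores a float number of seconds. A minimal sketch, assuming the bulk update writes one `current_time` to both columns as in the diff above, showing why the explicit `0` is equivalent; the helper name `duration_seconds` is hypothetical. On the database side the alternative would be converting the interval to seconds (for example with Postgres's `EXTRACT(EPOCH FROM ...)`), which is the extra casting the comment avoids.

```python
from datetime import datetime, timedelta, timezone


def duration_seconds(start_date: datetime, end_date: datetime) -> float:
    """Hypothetical helper: duration as float seconds, the type the column expects."""
    return (end_date - start_date).total_seconds()


# The bulk update writes the same timestamp to start_date and end_date.
current_time = datetime.now(timezone.utc)

raw_difference: timedelta = current_time - current_time  # interval-like, not a float
duration = duration_seconds(current_time, current_time)

print(type(raw_difference).__name__, duration)  # timedelta 0.0
```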

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       We won't be setting it to 0 for Sensing, only for TIs whose new state is in `State.finished()`, i.e. failed, success or skipped.
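   A plain-Python sketch of the conditional in the diff, showing which columns end up in the bulk `UPDATE` for different target states. String keys stand in for the SQLAlchemy column attributes (`models.TaskInstance.state`, etc.), `FINISHED_STATES` is a hypothetical stand-in for `State.finished()` built from the three states named in this thread, and `build_ti_update` is a hypothetical helper; "sensing" and "up_for_reschedule" are used only as examples of non-terminal states.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Hypothetical stand-in for State.finished(): failed, success or skipped.
FINISHED_STATES = frozenset({"success", "failed", "skipped"})


def build_ti_update(new_state: Optional[str], current_time: datetime) -> Dict[str, Any]:
    """Hypothetical helper mirroring the diff: which columns the bulk UPDATE sets."""
    ti_prop_update: Dict[str, Any] = {
        "state": new_state,
        "start_date": current_time,
    }
    # Only add end_date and duration if the new state is a finished one.
    if new_state in FINISHED_STATES:
        ti_prop_update.update({"end_date": current_time, "duration": 0})
    return ti_prop_update


now = datetime.now(timezone.utc)
for state in ("failed", "sensing", "up_for_reschedule", None):
    print(state, sorted(build_ti_update(state, now)))
```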

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       I was not clear before: it is the `new_state`, i.e. the state we want to set it to. Check the following code block.
   
   
https://github.com/apache/airflow/blob/3b0d977d413b38e6b394287a6cbc85a6bc25b414/airflow/jobs/scheduler_job.py#L1787-L1802
   
   So we only want to set `end_date` and `duration` when we are setting `new_state` to Failed, and not when we are setting it to `None`, since that task will be run again.
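   A small in-memory sketch of the two kinds of calls described here. The "up_for_retry" to "failed" transition is the one visible in the error log above; "queued" to `None` is an assumed example of a TI being reset so it can run again (the exact state lists are in the linked block). `change_state_for_tis` and the dict-based rows are hypothetical, not the scheduler's code; only the transition to a finished state fills in `end_date` and `duration`.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional

# Hypothetical stand-in for State.finished().
FINISHED_STATES = frozenset({"success", "failed", "skipped"})


def change_state_for_tis(
    tis: List[Dict], old_states: Iterable[str], new_state: Optional[str]
) -> int:
    """Hypothetical helper: move TIs whose state is in `old_states` to `new_state`.

    end_date and duration are only filled in when `new_state` is finished;
    a TI reset to None will be run again, so it gets no end_date yet.
    """
    current_time = datetime.now(timezone.utc)
    old = set(old_states)
    changed = 0
    for ti in tis:
        if ti["state"] not in old:
            continue
        ti["state"] = new_state
        ti["start_date"] = current_time
        if new_state in FINISHED_STATES:
            ti["end_date"] = current_time
            ti["duration"] = 0
        changed += 1
    return changed


tis = [
    {"task_id": "retrying", "state": "up_for_retry", "end_date": None, "duration": None},
    {"task_id": "waiting", "state": "queued", "end_date": None, "duration": None},
]
change_state_for_tis(tis, ["up_for_retry"], "failed")
change_state_for_tis(tis, ["queued"], None)
for ti in tis:
    print(ti["task_id"], ti["state"], ti["duration"])
```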




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

