peay edited a comment on issue #16573:
URL: https://github.com/apache/airflow/issues/16573#issuecomment-886830337


   @eladkal not sure if it is the same issue, but I am seeing the same symptom on 2.1.1, and can reproduce it easily by letting my DAGs run for an hour or two.
   
   This affects tasks randomly in my DAGs, usually after they have been running for a few minutes. I have a single Celery worker pod, which has been healthy every time this has occurred so far. The Airflow audit log table has no indication that the task state was changed.
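
   For reference, by the audit log table I mean the `log` table; the check amounts to something like this minimal sketch (DAG/task IDs anonymized as elsewhere in this comment):

   ```python
   # Minimal sketch: list audit log ("log" table) entries for the affected
   # task, to see whether any state-changing event was recorded there.
   from airflow.models.log import Log
   from airflow.utils.session import create_session

   with create_session() as session:
       entries = (
           session.query(Log)
           .filter(Log.dag_id == "dag-name-1", Log.task_id == "task-name-1")
           .order_by(Log.dttm)
           .all()
       )
       for entry in entries:
           print(entry.dttm, entry.event, entry.owner, entry.extra)
   ```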
   
   Here's a sample from a task log, where I have activated SQLAlchemy query 
logging.
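
   For reference, enabling the query logging is nothing Airflow-specific; it amounts to raising the standard `sqlalchemy.engine` logger to INFO, roughly as in this minimal sketch (where to hook it in, e.g. a local settings module, is my own setup, not an Airflow requirement):

   ```python
   # Minimal sketch: SQLAlchemy's engine logs every statement and its bound
   # parameters at INFO level -- these are the {base.py:1234} and
   # {base.py:1239} lines in the log below.
   import logging

   logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
   ```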
   
   ```
   # Task starting up
   
--------------------------------------------------------------------------------
   [2021-07-23 11:11:51,133] {taskinstance.py:1088} INFO - Starting attempt 1 of 4
   [2021-07-23 11:11:51,133] {taskinstance.py:1089} INFO - 
   
--------------------------------------------------------------------------------
   [2021-07-23 11:11:51,143] {base.py:727} INFO - BEGIN (implicit)
   [2021-07-23 11:11:51,144] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
   FROM task_instance 
   WHERE task_instance.task_id = %(param_1)s AND task_instance.dag_id = %(param_2)s AND task_instance.execution_date = %(param_3)s
   [2021-07-23 11:11:51,144] {base.py:1239} INFO - {'param_1': 'task-name-1', 'param_2': 'dag-name-1', 'param_3': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC'))}
   [2021-07-23 11:11:51,147] {base.py:1234} INFO - INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%(dttm)s, %(dag_id)s, %(task_id)s, %(event)s, %(execution_date)s, %(owner)s, %(extra)s) RETURNING log.id
   [2021-07-23 11:11:51,148] {base.py:1239} INFO - {'dttm': datetime.datetime(2021, 7, 23, 11, 11, 51, 133209, tzinfo=Timezone('UTC')), 'dag_id': 'dag-name-1', 'task_id': 'task-name-1', 'event': 'running', 'execution_date': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'owner': 'airflow', 'extra': None}
   [2021-07-23 11:11:51,150] {base.py:1234} INFO - UPDATE task_instance SET start_date=%(start_date)s, state=%(state)s, try_number=%(try_number)s, hostname=%(hostname)s, job_id=%(job_id)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.execution_date = %(task_instance_execution_date)s
   [2021-07-23 11:11:51,150] {base.py:1239} INFO - {'start_date': datetime.datetime(2021, 7, 23, 11, 11, 51, 122723, tzinfo=Timezone('UTC')), 'state': 'running', 'try_number': 1, 'hostname': 'airflow-worker-general-b899585b8-s5lx9', 'job_id': 773, 'task_instance_task_id': 'task-name-1', 'task_instance_dag_id': 'dag-name-1', 'task_instance_execution_date': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC'))}
   [2021-07-23 11:11:51,151] {base.py:769} INFO - COMMIT
   
   # At this point, the task state has been set to `running` and committed
   # Omitting a few minutes of logs
   
   ...
   
   # Task has been running for a few minutes OK
   [2021-07-23 11:15:51,821] {base.py:769} INFO - COMMIT
   [2021-07-23 11:15:51,838] {base.py:727} INFO - BEGIN (implicit)
   
   # This query apparently still returns state = 'running', since the task does not stop here.
   [2021-07-23 11:15:51,839] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
   FROM task_instance 
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s 
    LIMIT %(param_1)s
   [2021-07-23 11:15:51,839] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1', 'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}
   [2021-07-23 11:15:51,842] {base.py:769} INFO - COMMIT
   
   # Task-specific logs
   [2021-07-23 11:15:53,022] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:15:58,030] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:03,037] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:08,044] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:13,054] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   [2021-07-23 11:16:18,062] {yarn.py:94} INFO - application_1624488826511_18729: RUNNING
   
   [2021-07-23 11:16:21,863] {base.py:727} INFO - BEGIN (implicit)
   
   # Still OK
   [2021-07-23 11:16:21,863] {base.py:1234} INFO - SELECT job.id AS job_id, job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type, job.start_date AS job_start_date, job.end_date AS job_end_date, job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname 
   FROM job 
   WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
   [2021-07-23 11:16:21,864] {base.py:1239} INFO - {'param_1': 773, 'job_type_1': 'LocalTaskJob'}
   [2021-07-23 11:16:21,866] {base.py:1234} INFO - UPDATE job SET latest_heartbeat=%(latest_heartbeat)s WHERE job.id = %(job_id)s
   [2021-07-23 11:16:21,866] {base.py:1239} INFO - {'latest_heartbeat': datetime.datetime(2021, 7, 23, 11, 15, 51, 821303, tzinfo=Timezone('UTC')), 'job_id': 773}
   [2021-07-23 11:16:21,867] {base.py:769} INFO - COMMIT
   [2021-07-23 11:16:21,879] {base.py:727} INFO - BEGIN (implicit)
   [2021-07-23 11:16:21,880] {base.py:1234} INFO - SELECT job.id AS job_id, job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type, job.start_date AS job_start_date, job.end_date AS job_end_date, job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname 
   FROM job 
   WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
   [2021-07-23 11:16:21,880] {base.py:1239} INFO - {'param_1': 773, 'job_type_1': 'LocalTaskJob'}
   [2021-07-23 11:16:21,882] {base.py:769} INFO - COMMIT
   [2021-07-23 11:16:21,894] {base.py:727} INFO - BEGIN (implicit)
   
   # This time, 30s later, the same query apparently returns `up_for_retry` -- but why?
   # I believe this query comes from https://github.com/apache/airflow/blob/2.1.1/airflow/jobs/local_task_job.py#L181
   [2021-07-23 11:16:21,895] {base.py:1234} INFO - SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
   FROM task_instance 
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s 
    LIMIT %(param_1)s
   [2021-07-23 11:16:21,895] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1', 'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0, tzinfo=Timezone('UTC')), 'param_1': 1}
   [2021-07-23 11:16:21,898] {base.py:769} INFO - COMMIT
   
   # Boom
   [2021-07-23 11:16:21,899] {local_task_job.py:199} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
   [2021-07-23 11:16:21,900] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 9303
   [2021-07-23 11:16:21,900] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.
   ```
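
   For context, I believe the check that fires at the end is `LocalTaskJob.heartbeat_callback` (the file linked in my annotation above). Here is an abridged paraphrase of the 2.1.1 source, not a verbatim copy:

   ```python
   # Abridged paraphrase of airflow/jobs/local_task_job.py (2.1.1), imports
   # and surrounding class omitted: each heartbeat re-reads the task instance
   # from the DB and treats any state other than RUNNING as an external change.
   def heartbeat_callback(self, session=None):
       self.task_instance.refresh_from_db()  # the SELECT ... LIMIT 1 above
       ti = self.task_instance
       if ti.state == State.RUNNING:
           ...  # also cross-checks hostname and pid against this process
       else:
           self.log.warning(
               "State of this instance has been externally set to %s. "
               "Terminating instance.",
               ti.state,
           )
           self.task_runner.terminate()  # the SIGTERM seen in the log
   ```

   So the termination itself does not require any `UPDATE` issued from the worker side: any writer that flips the row to a non-running state between two heartbeats would produce exactly this output.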
   
   I have also reviewed the scheduler, web server and worker logs, again with SQLAlchemy query logging enabled, to try to determine where the state is being altered -- and found nothing: there is no `UPDATE` that sets the state to `up_for_retry`, neither for this DAG/task nor for any other task instance. I could not find a single `UPDATE` whose `state` parameter was bound to `up_for_retry`. I can provide those logs if needed.
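
   Concretely, that review boiled down to something like this rough sketch (file names illustrative):

   ```python
   # Rough sketch of the log review: flag any SQLAlchemy bound-parameter
   # line ({base.py:1239}) that binds 'state' to 'up_for_retry'. Run over
   # the scheduler, web server and worker log files.
   import sys

   NEEDLE = "'state': 'up_for_retry'"

   for path in sys.argv[1:]:
       with open(path, errors="replace") as f:
           for lineno, line in enumerate(f, start=1):
               if "{base.py:1239}" in line and NEEDLE in line:
                   print(f"{path}:{lineno}: {line.rstrip()}")
   ```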
   
   At this stage, I am rather puzzled about what is happening here. Are there more logs or checks I could enable to narrow it down?

