peay edited a comment on issue #16573:
URL: https://github.com/apache/airflow/issues/16573#issuecomment-886830337
@eladkal not sure if it is the same issue, but I am seeing the same symptom
on 2.1.1, and can reproduce easily by letting my DAGs run for an hour or two.
This affects tasks randomly in my DAGs, usually after running for a few
minutes. I have a single Celery worker pod which was healthy whenever this has
occured so far.
Here's a sample from a task log, where I have activated SQLAlchemy query
logging.
```
# Task has been running for a few minutes OK
[2021-07-23 11:15:51,821] {base.py:769} INFO - COMMIT
[2021-07-23 11:15:51,838] {base.py:727} INFO - BEGIN (implicit)
# This apparently correctly says task instance state is running, since we
don't stop here.
[2021-07-23 11:15:51,839] {base.py:1234} INFO - SELECT
task_instance.try_number AS task_instance_try_number, task_instance.task_id AS
task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
task_instance.execution_date AS task_instance_execution_date,
task_instance.start_date AS task_instance_start_date, task_instance.end_date AS
task_instance_end_date, task_instance.duration AS task_instance_duration,
task_instance.state AS task_instance_state, task_instance.max_tries AS
task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
task_instance_job_id, task_instance.pool AS task_instance_pool,
task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
task_instance_queue, task_instance.priority_weight AS
task_instance_priority_weight, task_instance.operator AS
task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm,
task_instance.queued_by_
job_id AS task_instance_queued_by_job_id, task_instance.pid AS
task_instance_pid, task_instance.executor_config AS
task_instance_executor_config, task_instance.external_executor_id AS
task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id =
%(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s
LIMIT %(param_1)s
[2021-07-23 11:15:51,839] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1',
'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0,
tzinfo=Timezone('UTC')), 'param_1': 1}
[2021-07-23 11:15:51,842] {base.py:769} INFO - COMMIT
# Task-specific logs
[2021-07-23 11:15:53,022] {yarn.py:94} INFO -
application_1624488826511_18729: RUNNING
[2021-07-23 11:15:58,030] {yarn.py:94} INFO -
application_1624488826511_18729: RUNNING
[2021-07-23 11:16:03,037] {yarn.py:94} INFO -
application_1624488826511_18729: RUNNING
[2021-07-23 11:16:08,044] {yarn.py:94} INFO -
application_1624488826511_18729: RUNNING
[2021-07-23 11:16:13,054] {yarn.py:94} INFO -
application_1624488826511_18729: RUNNING
[2021-07-23 11:16:18,062] {yarn.py:94} INFO -
application_1624488826511_18729: RUNNING
[2021-07-23 11:16:21,863] {base.py:727} INFO - BEGIN (implicit)
# Still OK
[2021-07-23 11:16:21,863] {base.py:1234} INFO - SELECT job.id AS job_id,
job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type,
job.start_date AS job_start_date, job.end_date AS job_end_date,
job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS
job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname
FROM job
WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
[2021-07-23 11:16:21,864] {base.py:1239} INFO - {'param_1': 773,
'job_type_1': 'LocalTaskJob'}
[2021-07-23 11:16:21,866] {base.py:1234} INFO - UPDATE job SET
latest_heartbeat=%(latest_heartbeat)s WHERE job.id = %(job_id)s
[2021-07-23 11:16:21,866] {base.py:1239} INFO - {'latest_heartbeat':
datetime.datetime(2021, 7, 23, 11, 15, 51, 821303, tzinfo=Timezone('UTC')),
'job_id': 773}
[2021-07-23 11:16:21,867] {base.py:769} INFO - COMMIT
[2021-07-23 11:16:21,879] {base.py:727} INFO - BEGIN (implicit)
[2021-07-23 11:16:21,880] {base.py:1234} INFO - SELECT job.id AS job_id,
job.dag_id AS job_dag_id, job.state AS job_state, job.job_type AS job_job_type,
job.start_date AS job_start_date, job.end_date AS job_end_date,
job.latest_heartbeat AS job_latest_heartbeat, job.executor_class AS
job_executor_class, job.hostname AS job_hostname, job.unixname AS job_unixname
FROM job
WHERE job.id = %(param_1)s AND job.job_type IN (%(job_type_1)s)
[2021-07-23 11:16:21,880] {base.py:1239} INFO - {'param_1': 773,
'job_type_1': 'LocalTaskJob'}
[2021-07-23 11:16:21,882] {base.py:769} INFO - COMMIT
[2021-07-23 11:16:21,894] {base.py:727} INFO - BEGIN (implicit)
# This time, 30s later, it seems this says the task instance is
`up_for_retry`, but why?
# I believe this query is
https://github.com/apache/airflow/blob/2.1.1/airflow/jobs/local_task_job.py#L181
[2021-07-23 11:16:21,895] {base.py:1234} INFO - SELECT
task_instance.try_number AS task_instance_try_number, task_instance.task_id AS
task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
task_instance.execution_date AS task_instance_execution_date,
task_instance.start_date AS task_instance_start_date, task_instance.end_date AS
task_instance_end_date, task_instance.duration AS task_instance_duration,
task_instance.state AS task_instance_state, task_instance.max_tries AS
task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
task_instance_job_id, task_instance.pool AS task_instance_pool,
task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
task_instance_queue, task_instance.priority_weight AS
task_instance_priority_weight, task_instance.operator AS
task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm,
task_instance.queued_by_
job_id AS task_instance_queued_by_job_id, task_instance.pid AS
task_instance_pid, task_instance.executor_config AS
task_instance_executor_config, task_instance.external_executor_id AS
task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id =
%(task_id_1)s AND task_instance.execution_date = %(execution_date_1)s
LIMIT %(param_1)s
[2021-07-23 11:16:21,895] {base.py:1239} INFO - {'dag_id_1': 'dag-name-1',
'task_id_1': 'task-name-1', 'execution_date_1': DateTime(2021, 7, 13, 19, 0, 0,
tzinfo=Timezone('UTC')), 'param_1': 1}
[2021-07-23 11:16:21,898] {base.py:769} INFO - COMMIT
# Boom
[2021-07-23 11:16:21,899] {local_task_job.py:199} WARNING - State of this
instance has been externally set to up_for_retry. Terminating instance.
[2021-07-23 11:16:21,900] {process_utils.py:100} INFO - Sending
Signals.SIGTERM to GPID 9303
[2021-07-23 11:16:21,900] {taskinstance.py:1284} ERROR - Received SIGTERM.
Terminating subprocesses.
```
I have reviewed the logs for the scheduler, web server and worker with
SQLAlchemy query logging as well in order to try and determine where the state
is being altered, and found nothing... there is no `UPDATE` that sets the state
to `up_for_retry`, whether for this DAG/task, or in general over all task
instances -- there's just never a query parameter `state` set to `up_for_retry`
for `UPDATE`s that I could see. I can provide those logs if needed.
At this stage, I am rather puzzled as to what is going on here. Are there
more logs/checks I could enable to understand what's going on?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]