saulbein opened a new issue, #27657:
URL: https://github.com/apache/airflow/issues/27657

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We're hitting an issue whenever we need to restart Airflow services: some of the tasks that were running during the restart retry properly, as expected, but others do not. I have not managed to pinpoint the cause, but I can provide logs from my investigation (see below).
   
   Discussion #27071 appears to describe a similar issue.
   
   ### What you think should happen instead
   
   All interrupted tasks should have been marked for retry and retried, not only some of them.
   
   ### How to reproduce
   
   I have not managed to reproduce this issue locally; on every attempt the tasks are retried as expected.
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   Airflow version: release:2.3.4+88b274c95b212b541ba19918880ae425856212be
   Provider versions (there are more providers used by DAGs, but they should 
not be relevant):
   apache-airflow-providers-amazon==6.0.0
   apache-airflow-providers-celery==2.1.0
   apache-airflow-providers-postgres==5.2.2
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   Setup with issues (runs on a k8s cluster):
   - Kubernetes Server Version: version.Info{Major:"1", Minor:"23", 
GitVersion:"v1.23.13", GitCommit:"592eca05be27f7d927d0b25cbb4241d75a9574bf", 
GitTreeState:"clean", BuildDate:"2022-10-12T10:50:48Z", GoVersion:"go1.17.13", 
Compiler:"gc", Platform:"linux/amd64"}
   - The Airflow scheduler, webserver and workers each run in separate k8s pods, with the scheduler using CeleryExecutor
   - Database is postgres hosted in an AWS RDS instance, Celery queue broker is 
an AWS ElastiCache Redis cluster
   - DAG folder is synchronized via a shared docker volume backed by AWS EFS
   
   Local test setup (same docker images used as the setup with issues):
   - Docker Compose version 2.11.2
   - Docker version 20.10.18, build b40c2f6b5d
   - The Airflow scheduler, webserver and workers each run in separate containers, with the scheduler using CeleryExecutor
   - Database is a postgres container, Celery queue broker is a redis container
   - DAG folder is synchronized via a shared docker volume backed by the local 
filesystem
   
   ### Anything else
   
   Notes:
   - our workers take an extra 60s to start due to some legacy code, but they do start before zombie task detection runs
   - we are running 2 schedulers with DB locking, but only one handles zombie tasks, so they do not interfere with each other
   - the task that does not get restarted is detected as a zombie multiple times
   - there are more tasks that get interrupted or retried here, but I omitted them for brevity
   - both `dag_1` and `dag_2` have `retries` set to `1` for all tasks, with a retry delay of 15 minutes
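   For reference, the retry settings described above amount to something like the following (a minimal sketch; `retries` and `retry_delay` are the standard Airflow BaseOperator arguments, and the values simply restate the configuration described above — the real DAG definitions are not shown here):

   ```python
   from datetime import timedelta

   # Hypothetical default_args mirroring the configuration described above;
   # applied to every task in dag_1 and dag_2 via each DAG's default_args.
   default_args = {
       "retries": 1,                          # each task may be retried once
       "retry_delay": timedelta(minutes=15),  # wait 15 minutes between attempts
   }
   ```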
   
   Logs of when the issue happened:
   ```
   -- tasks start
   [2022-11-08 08:47:36,352] {scheduler_job.py:354} INFO - 1 tasks up for 
execution:
        <TaskInstance: dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 
[scheduled]>
   [2022-11-08 08:47:36,352] {scheduler_job.py:419} INFO - DAG dag_1 has 0/100 
running and queued tasks
   [2022-11-08 08:47:36,353] {scheduler_job.py:505} INFO - Setting the 
following tasks to queued state:
        <TaskInstance: dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 
[scheduled]>
   [2022-11-08 08:47:36,355] {scheduler_job.py:547} INFO - Sending 
TaskInstanceKey(dag_id='dag_1', task_id='task_a', 
run_id='scheduled__2022-11-08T07:30:00+00:00', try_number=1, map_index=-1) to 
executor with priority 1 and queue default
   [2022-11-08 08:47:36,355] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_1', 'task_a', 
'scheduled__2022-11-08T07:30:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_1.py']
   [2022-11-08 08:47:36,361: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[e8a00dcd-1ddf-4c9d-a3af-c144f73a1eff]
 received
   [2022-11-08 08:47:36,364: INFO/ForkPoolWorker-1] 
[e8a00dcd-1ddf-4c9d-a3af-c144f73a1eff] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_1', 'task_a', 'scheduled__2022-11-08T07:30:00+00:00', 
'--local', '--subdir', 'DAGS_FOLDER/dag_1.py']
   [2022-11-08 08:47:36,365] {scheduler_job.py:633} INFO - Setting external_id 
for <TaskInstance: dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 [queued]> 
to e8a00dcd-1ddf-4c9d-a3af-c144f73a1eff
   [2022-11-08 08:47:42,215: INFO/ForkPoolWorker-1] Running <TaskInstance: 
dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 [queued]> on host 
100.115.222.44
   [2022-11-08 08:48:48,115] {scheduler_job.py:354} INFO - 4 tasks up for 
execution:
        <TaskInstance: dag_2.task_b scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_e scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
   [2022-11-08 08:48:48,115] {scheduler_job.py:419} INFO - DAG dag_2 has 0/100 
running and queued tasks
   [2022-11-08 08:48:48,115] {scheduler_job.py:419} INFO - DAG dag_2 has 1/100 
running and queued tasks
   [2022-11-08 08:48:48,116] {scheduler_job.py:419} INFO - DAG dag_2 has 2/100 
running and queued tasks
   [2022-11-08 08:48:48,116] {scheduler_job.py:419} INFO - DAG dag_2 has 3/100 
running and queued tasks
   [2022-11-08 08:48:48,116] {scheduler_job.py:505} INFO - Setting the 
following tasks to queued state:
        <TaskInstance: dag_2.task_b scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_e scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
   [2022-11-08 08:48:48,119] {scheduler_job.py:547} INFO - Sending 
TaskInstanceKey(dag_id='dag_2', task_id='task_b', 
run_id='scheduled__2022-11-08T06:48:47.892047+00:00', try_number=1, 
map_index=-1) to executor with priority 1 and queue default
   [2022-11-08 08:48:48,119] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_2', 'task_b', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,120] {scheduler_job.py:547} INFO - Sending 
TaskInstanceKey(dag_id='dag_2', task_id='task_c', 
run_id='scheduled__2022-11-08T06:48:47.892047+00:00', try_number=1, 
map_index=-1) to executor with priority 1 and queue default
   [2022-11-08 08:48:48,120] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_2', 'task_c', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,120] {scheduler_job.py:547} INFO - Sending 
TaskInstanceKey(dag_id='dag_2', task_id='task_d', 
run_id='scheduled__2022-11-08T06:48:47.892047+00:00', try_number=1, 
map_index=-1) to executor with priority 1 and queue default
   [2022-11-08 08:48:48,120] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_2', 'task_d', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,121] {scheduler_job.py:547} INFO - Sending 
TaskInstanceKey(dag_id='dag_2', task_id='task_e', 
run_id='scheduled__2022-11-08T06:48:47.892047+00:00', try_number=1, 
map_index=-1) to executor with priority 1 and queue default
   [2022-11-08 08:48:48,121] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_2', 'task_e', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,170: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[6edb1b51-128a-453b-abf6-4bbb47943468]
 received
   [2022-11-08 08:48:48,170: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[643b9c8a-c816-4894-ba16-10346daa2d72]
 received
   [2022-11-08 08:48:48,171: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[5f47d5e9-71c2-4777-bf06-72f596ea9898]
 received
   [2022-11-08 08:48:48,171: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[2dc4f281-9d99-4d70-9f74-70d02d803f08]
 received
   [2022-11-08 08:48:48,173: INFO/ForkPoolWorker-1] 
[6edb1b51-128a-453b-abf6-4bbb47943468] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_e', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,173: INFO/ForkPoolWorker-1] 
[643b9c8a-c816-4894-ba16-10346daa2d72] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_d', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,174: INFO/ForkPoolWorker-1] 
[5f47d5e9-71c2-4777-bf06-72f596ea9898] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_b', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,174: INFO/ForkPoolWorker-3] 
[2dc4f281-9d99-4d70-9f74-70d02d803f08] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_c', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 08:48:48,189] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_b run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status queued for try_number 1
   [2022-11-08 08:48:48,189] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_c run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status queued for try_number 1
   [2022-11-08 08:48:48,189] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_d run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status queued for try_number 1
   [2022-11-08 08:48:48,190] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_e run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status queued for try_number 1
   [2022-11-08 08:48:48,196] {scheduler_job.py:633} INFO - Setting external_id 
for <TaskInstance: dag_2.task_b scheduled__2022-11-08T06:48:47.892047+00:00 
[queued]> to 5f47d5e9-71c2-4777-bf06-72f596ea9898
   [2022-11-08 08:48:48,197] {scheduler_job.py:633} INFO - Setting external_id 
for <TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[queued]> to 2dc4f281-9d99-4d70-9f74-70d02d803f08
   [2022-11-08 08:48:48,197] {scheduler_job.py:633} INFO - Setting external_id 
for <TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[queued]> to 643b9c8a-c816-4894-ba16-10346daa2d72
   [2022-11-08 08:48:48,197] {scheduler_job.py:633} INFO - Setting external_id 
for <TaskInstance: dag_2.task_e scheduled__2022-11-08T06:48:47.892047+00:00 
[queued]> to 6edb1b51-128a-453b-abf6-4bbb47943468
   [2022-11-08 08:48:53,885: INFO/ForkPoolWorker-1] Running <TaskInstance: 
dag_2.task_e scheduled__2022-11-08T06:48:47.892047+00:00 [queued]> on host 
100.115.245.163
   [2022-11-08 08:48:53,992: INFO/ForkPoolWorker-1] Running <TaskInstance: 
dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 [queued]> on host 
100.114.200.229
   [2022-11-08 08:48:54,337: INFO/ForkPoolWorker-1] Running <TaskInstance: 
dag_2.task_b scheduled__2022-11-08T06:48:47.892047+00:00 [queued]> on host 
100.114.216.107
   [2022-11-08 08:48:54,619: INFO/ForkPoolWorker-3] Running <TaskInstance: 
dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 [queued]> on host 
100.116.148.23
   
   -- some finish
   [2022-11-08 08:48:58,341: INFO/ForkPoolWorker-1] Task 
airflow.executors.celery_executor.execute_command[6edb1b51-128a-453b-abf6-4bbb47943468]
 succeeded in 10.169025331037119s: None
   [2022-11-08 08:48:58,430] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_e run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status success for try_number 1
   [2022-11-08 08:48:58,434] {scheduler_job.py:643} INFO - TaskInstance 
Finished: dag_id=dag_2, task_id=task_e, 
run_id=scheduled__2022-11-08T06:48:47.892047+00:00, map_index=-1, 
run_start_date=2022-11-08 08:48:53.972116+00:00, run_end_date=2022-11-08 
08:48:57.542753+00:00, run_duration=3.570637, state=success, 
executor_state=success, try_number=1, max_tries=1, job_id=17526539, 
pool=default_pool, queue=default, priority_weight=1, operator=SomeOperator, 
queued_dttm=2022-11-08 08:48:48.117420+00:00, queued_by_job_id=17519022, 
pid=6513
   [2022-11-08 08:49:07,162: INFO/ForkPoolWorker-1] Task 
airflow.executors.celery_executor.execute_command[5f47d5e9-71c2-4777-bf06-72f596ea9898]
 succeeded in 18.989054591045715s: None
   [2022-11-08 08:49:07,459] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_b run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status success for try_number 1
   [2022-11-08 08:49:07,463] {scheduler_job.py:643} INFO - TaskInstance 
Finished: dag_id=dag_2, task_id=task_b, 
run_id=scheduled__2022-11-08T06:48:47.892047+00:00, map_index=-1, 
run_start_date=2022-11-08 08:48:54.442108+00:00, run_end_date=2022-11-08 
08:49:06.704935+00:00, run_duration=12.262827, state=success, 
executor_state=success, try_number=1, max_tries=1, job_id=17526541, 
pool=default_pool, queue=default, priority_weight=1, operator=SomeOperator, 
queued_dttm=2022-11-08 08:48:48.117420+00:00, queued_by_job_id=17519022, 
pid=5272
   
   -- restart happens
   [2022-11-08 08:49:41,278] <last scheduler log before shutdown>
   [2022-11-08 08:50:13,511] worker: Warm shutdown (MainProcess)
   ...
   [2022-11-08 08:50:13,534] worker: Warm shutdown (MainProcess)
   [2022-11-08 08:50:42,000] <workers start receiving sigkill>
   [2022-11-08 08:50:42,827] {scheduler_job.py:1231} INFO - Resetting orphaned 
tasks for active dag runs
   [2022-11-08 08:50:42,759] {scheduler_job.py:709} INFO - Starting the 
scheduler
   [2022-11-08 08:50:42,963] {celery_executor.py:532} INFO - Adopted the 
following 19 tasks from a dead executor
        <TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[running]> in state STARTED
        <TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[running]> in state STARTED
        <TaskInstance: dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 
[running]> in state STARTED
   [2022-11-08 08:51:54,574: INFO/MainProcess] celery@worker-97j8f ready.
   ...
   [2022-11-08 08:53:30,010: INFO/MainProcess] celery@worker-8sdvl ready.
   
   
   -- zombie detection is run
   [2022-11-08 08:55:38,644] {scheduler_job.py:1362} WARNING - Failing (19) 
jobs without heartbeat after 2022-11-08 08:50:38.636836+00:00
   [2022-11-08 08:55:38,690] {scheduler_job.py:1370} ERROR - Detected zombie 
job: {'full_filepath': '/usr/local/airflow/dags/dag_1.py', 'msg': 'Detected 
<TaskInstance: dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 [running]> as 
zombie', 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x7ff077a47af0>, 
'is_failure_callback': True}
   [2022-11-08 08:55:38,754] {scheduler_job.py:1370} ERROR - Detected zombie 
job: {'full_filepath': '/usr/local/airflow/dags/dag_2.py', 'msg': 'Detected 
<TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[running]> as zombie', 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x7ff077926e50>, 
'is_failure_callback': True}
   [2022-11-08 08:55:38,761] {scheduler_job.py:1370} ERROR - Detected zombie 
job: {'full_filepath': '/usr/local/airflow/dags/dag_2.py', 'msg': 'Detected 
<TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[running]> as zombie', 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x7ff077a47370>, 
'is_failure_callback': True}
   [2022-11-08 08:55:49,021] {scheduler_job.py:1362} WARNING - Failing (14) 
jobs without heartbeat after 2022-11-08 08:50:49.014152+00:00
   [2022-11-08 08:55:49,108] {scheduler_job.py:1370} ERROR - Detected zombie 
job: {'full_filepath': '/usr/local/airflow/dags/dag_1.py', 'msg': 'Detected 
<TaskInstance: dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 [running]> as 
zombie', 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x7ff077a4adc0>, 
'is_failure_callback': True}
   [2022-11-08 08:55:59,283] {scheduler_job.py:1362} WARNING - Failing (9) jobs 
without heartbeat after 2022-11-08 08:50:59.278210+00:00
   [2022-11-08 08:55:59,321] {scheduler_job.py:1370} ERROR - Detected zombie 
job: {'full_filepath': '/usr/local/airflow/dags/dag_1.py', 'msg': 'Detected 
<TaskInstance: dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 [running]> as 
zombie', 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x7ff077965f40>, 
'is_failure_callback': True}
   
   -- one of the tasks gets marked as failed
   [2022-11-08 08:56:00,432] {dagrun.py:552} ERROR - Marking run <DagRun dag_1 
@ 2022-11-08 07:30:00+00:00: scheduled__2022-11-08T07:30:00+00:00, 
state:running, queued_at: 2022-11-08 08:30:00.226664+00:00. externally 
triggered: False> failed
   [2022-11-08 08:56:00,442] {dagrun.py:612} INFO - DagRun Finished: 
dag_id=dag_1, execution_date=2022-11-08 07:30:00+00:00, 
run_id=scheduled__2022-11-08T07:30:00+00:00, run_start_date=2022-11-08 
08:30:00.284197+00:00, run_end_date=2022-11-08 08:56:00.441987+00:00, 
run_duration=1560.15779, state=failed, external_trigger=False, 
run_type=scheduled, data_interval_start=2022-11-08 07:30:00+00:00, 
data_interval_end=2022-11-08 08:30:00+00:00, 
dag_hash=d22f26bcfbd8f9911d8d29891fa42598
   [2022-11-08 08:56:00,452] {dag.py:2977} INFO - Setting next_dagrun for dag_1 
to 2022-11-08T08:30:00+00:00, run_after=2022-11-08T09:30:00+00:00
   
   -- some tasks get scheduled for a second attempt
   [2022-11-08 09:10:46,901] {scheduler_job.py:354} INFO - 2 tasks up for 
execution:
        <TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
   [2022-11-08 09:10:46,902] {scheduler_job.py:419} INFO - DAG dag_2 has 0/100 
running and queued tasks
   [2022-11-08 09:10:46,902] {scheduler_job.py:419} INFO - DAG dag_2 has 1/100 
running and queued tasks
   [2022-11-08 09:10:46,902] {scheduler_job.py:505} INFO - Setting the 
following tasks to queued state:
        <TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
        <TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[scheduled]>
   [2022-11-08 09:10:46,906] {scheduler_job.py:547} INFO - Sending 
TaskInstanceKey(dag_id='dag_2', task_id='task_c', 
run_id='scheduled__2022-11-08T06:48:47.892047+00:00', try_number=2, 
map_index=-1) to executor with priority 1 and queue default
   [2022-11-08 09:10:46,906] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_2', 'task_c', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 09:10:46,907] {scheduler_job.py:547} INFO - Sending 
TaskInstanceKey(dag_id='dag_2', task_id='task_d', 
run_id='scheduled__2022-11-08T06:48:47.892047+00:00', try_number=2, 
map_index=-1) to executor with priority 1 and queue default
   [2022-11-08 09:10:46,907] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_2', 'task_d', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 09:10:46,970: INFO/ForkPoolWorker-1] 
[e54003f4-15cf-4423-8378-95b066dcd659] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_d', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 09:10:46,972: INFO/ForkPoolWorker-3] 
[8540e515-db64-4c3d-be56-59a600ad8c0b] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_c', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 09:10:46,985] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_c run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status queued for try_number 2
   [2022-11-08 09:10:46,985] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_d run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status queued for try_number 2
   [2022-11-08 09:10:46,993] {scheduler_job.py:633} INFO - Setting external_id 
for <TaskInstance: dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 
[queued]> to 8540e515-db64-4c3d-be56-59a600ad8c0b
   [2022-11-08 09:10:46,993] {scheduler_job.py:633} INFO - Setting external_id 
for <TaskInstance: dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 
[queued]> to e54003f4-15cf-4423-8378-95b066dcd659
   [2022-11-08 09:10:52,796: INFO/ForkPoolWorker-1] Running <TaskInstance: 
dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 [queued]> on host 
100.115.245.164
   [2022-11-08 09:10:53,160: INFO/ForkPoolWorker-3] Running <TaskInstance: 
dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 [queued]> on host 
100.115.222.49
   
   -- celery visibility timeout, old attempts get run again but immediately 
fail because of task state in Airflow
   [2022-11-08 09:48:59,865: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[643b9c8a-c816-4894-ba16-10346daa2d72]
 received
   [2022-11-08 09:48:59,865: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[2dc4f281-9d99-4d70-9f74-70d02d803f08]
 received
   [2022-11-08 09:48:59,867: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[e8a00dcd-1ddf-4c9d-a3af-c144f73a1eff]
 received
   [2022-11-08 09:48:59,868: INFO/ForkPoolWorker-1] 
[2dc4f281-9d99-4d70-9f74-70d02d803f08] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_c', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 09:48:59,867: INFO/ForkPoolWorker-3] 
[643b9c8a-c816-4894-ba16-10346daa2d72] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_2', 'task_d', 
'scheduled__2022-11-08T06:48:47.892047+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_2.py']
   [2022-11-08 09:48:59,871: INFO/ForkPoolWorker-1] 
[e8a00dcd-1ddf-4c9d-a3af-c144f73a1eff] Executing command in Celery: ['airflow', 
'tasks', 'run', 'dag_1', 'task_a', 'scheduled__2022-11-08T07:30:00+00:00', 
'--local', '--subdir', 'DAGS_FOLDER/dag_1.py']
   [2022-11-08 09:49:05,955: INFO/ForkPoolWorker-3] Running <TaskInstance: 
dag_2.task_d scheduled__2022-11-08T06:48:47.892047+00:00 [running]> on host 
100.121.39.8
   [2022-11-08 09:49:06,182: INFO/ForkPoolWorker-1] Running <TaskInstance: 
dag_2.task_c scheduled__2022-11-08T06:48:47.892047+00:00 [running]> on host 
100.121.34.8
   [2022-11-08 09:49:06,437: INFO/ForkPoolWorker-3] Task 
airflow.executors.celery_executor.execute_command[643b9c8a-c816-4894-ba16-10346daa2d72]
 succeeded in 6.570589219001704s: None
   [2022-11-08 09:49:06,600: INFO/ForkPoolWorker-1] Task 
airflow.executors.celery_executor.execute_command[2dc4f281-9d99-4d70-9f74-70d02d803f08]
 succeeded in 6.734126636001747s: None
   [2022-11-08 09:49:06,607: INFO/ForkPoolWorker-1] Running <TaskInstance: 
dag_1.task_a scheduled__2022-11-08T07:30:00+00:00 [failed]> on host 
100.97.127.104
   [2022-11-08 09:49:07,022: INFO/ForkPoolWorker-1] Task 
airflow.executors.celery_executor.execute_command[e8a00dcd-1ddf-4c9d-a3af-c144f73a1eff]
 succeeded in 7.152360106818378s: None
   [2022-11-08 09:49:07,131] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_1.task_a run_id=scheduled__2022-11-08T07:30:00+00:00 exited 
with status success for try_number 1   
   [2022-11-08 09:49:07,131] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_c run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status success for try_number 1
   [2022-11-08 09:49:07,131] {scheduler_job.py:600} INFO - Executor reports 
execution of dag_2.task_d run_id=scheduled__2022-11-08T06:48:47.892047+00:00 
exited with status success for try_number 1
   [2022-11-08 09:49:07,137] {scheduler_job.py:643} INFO - TaskInstance 
Finished: dag_id=dag_1, task_id=task_a, 
run_id=scheduled__2022-11-08T07:30:00+00:00, map_index=-1, 
run_start_date=2022-11-08 08:47:42.294484+00:00, run_end_date=2022-11-08 
08:58:28.773400+00:00, run_duration=646.478916, state=failed, 
executor_state=success, try_number=1, max_tries=1, job_id=17526533, 
pool=default_pool, queue=default, priority_weight=1, operator=ApplySQL, 
queued_dttm=2022-11-08 08:47:36.353832+00:00, queued_by_job_id=17526549, 
pid=6062
   [2022-11-08 09:49:07,137] {scheduler_job.py:643} INFO - TaskInstance 
Finished: dag_id=dag_2, task_id=task_d, 
run_id=scheduled__2022-11-08T06:48:47.892047+00:00, map_index=-1, 
run_start_date=2022-11-08 09:10:52.884552+00:00, run_end_date=None, 
run_duration=412.600346, state=running, executor_state=success, try_number=1, 
max_tries=1, job_id=17526813, pool=default_pool, queue=default, 
priority_weight=1, operator=SomeOperator, queued_dttm=2022-11-08 
09:10:46.903526+00:00, queued_by_job_id=17526550, pid=233
   [2022-11-08 09:49:07,138] {scheduler_job.py:643} INFO - TaskInstance 
Finished: dag_id=dag_2, task_id=task_c, 
run_id=scheduled__2022-11-08T06:48:47.892047+00:00, map_index=-1, 
run_start_date=2022-11-08 09:10:53.238023+00:00, run_end_date=None, 
run_duration=412.011327, state=running, executor_state=success, try_number=1, 
max_tries=1, job_id=17526814, pool=default_pool, queue=default, 
priority_weight=1, operator=SomeOperator, queued_dttm=2022-11-08 
09:10:46.903526+00:00, queued_by_job_id=17526550, pid=196
   ```
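   A quick sanity check on the timings in the log above (assuming default Celery and Airflow settings — the 3600 s Redis `visibility_timeout` and the 300 s `scheduler_zombie_task_threshold` are the documented defaults, not values I have verified in this deployment):

   ```python
   from datetime import datetime

   # Timestamps taken from the logs above.
   queued = datetime(2022, 11, 8, 8, 48, 48)       # task_c/task_d first queued
   redelivered = datetime(2022, 11, 8, 9, 48, 59)  # old Celery messages redelivered

   # The ~1 h gap matches Celery's default visibility_timeout for the Redis
   # broker (3600 s): messages delivered to workers that were killed in the
   # restart, and hence never acknowledged, are redelivered once it expires.
   gap_seconds = (redelivered - queued).total_seconds()
   print(gap_seconds)  # 3611.0

   # Likewise, zombie detection at 08:55:38 fails jobs whose last heartbeat
   # is older than 08:50:38, i.e. a 300 s window -- Airflow's default
   # scheduler_zombie_task_threshold.
   window = (datetime(2022, 11, 8, 8, 55, 38)
             - datetime(2022, 11, 8, 8, 50, 38)).total_seconds()
   print(window)  # 300.0
   ```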
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

