V0lantis commented on issue #21225:
URL: https://github.com/apache/airflow/issues/21225#issuecomment-1236151304

   I have been trying to fix this issue in 
[celery/kombu](https://github.com/celery/kombu), where I believe(d) the issue 
we are talking about here lay (namely, that a SIGTERM sent while the redis 
queue was being consumed leads to a message that is never acknowledged and is 
lost in the wild). 
   
   The problem is, I have not been able to replicate the issue in kombu. I did 
manage to reproduce it directly in Airflow by `RPOP`-ing a message before 
starting the celery worker, and it behaved exactly as described at the 
beginning of this issue. But with the latest version (Celery **v5.2.7**), 
which is the one we run in production, I cannot reproduce the behavior by 
sending a SIGTERM just after kombu has `BRPOP`-ed the message: the message is 
rightfully acked, and rightfully put back in the queue after the SIGTERM has 
been caught by Python. 
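   To make the expected behavior concrete, here is a toy sketch of the 
restore-on-shutdown semantics I observed (an illustration only, not kombu's 
actual implementation; the queue contents, function names, and the explicit 
handler call are all made up for the example):

   ```python
   import signal

   # Toy model: BRPOP moves a message from the queue into an "unacked"
   # buffer; a warm-shutdown SIGTERM handler restores every unacked
   # message to the queue instead of losing it.
   queue = ["task-msg-1"]
   unacked = {}

   def brpop():
       """Pop a message and track it as delivered-but-unacknowledged."""
       msg = queue.pop()
       unacked[id(msg)] = msg
       return msg

   def on_sigterm(signum, frame):
       """Warm shutdown: put unacked messages back on the queue."""
       while unacked:
           _, msg = unacked.popitem()
           queue.append(msg)

   signal.signal(signal.SIGTERM, on_sigterm)

   msg = brpop()                     # message is in flight, not yet acked
   on_sigterm(signal.SIGTERM, None)  # simulate SIGTERM arriving right now
   print(queue)                      # the message is back on the queue
   ```

   In recent Celery/kombu this is what I see happening, which is why I could 
not reproduce the loss there.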
   
   I don't really know where to go from here now. 
   
   I am attaching the log messages we have been seeing in our prod 
environment. Maybe someone else will have a better idea: 
   
   **Scheduler logs**
   <details>
   
   ```
        <TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 
[scheduled]>
        <TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 
[scheduled]>
   [2022-08-24 16:23:50,060] {scheduler_job.py:546} INFO - Sending 
TaskInstanceKey(dag_id='dag_id', task_id='task_id', 
run_id='scheduled__2022-08-24T15:00:00+00:00', try_number=1, map_index=-1) to 
executor with priority 5 and queue default
   [2022-08-24 16:23:50,060] {base_executor.py:91} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'dag_id', 'task_id', 
'scheduled__2022-08-24T15:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/dag_id.py']
   [2022-08-24 16:23:50,264] {scheduler_job.py:605} INFO - Executor reports 
execution of dag_id.task_id run_id=scheduled__2022-08-24T15:00:00+00:00 exited 
with status queued for try_number 1
   [2022-08-24 16:23:50,276] {scheduler_job.py:632} INFO - Setting external_id 
for <TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 
[queued]> to 139960c0-263d-4891-b1b0-712b077a0d2b
        <TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 
[scheduled]>
        <TaskInstance: dag_id.task_id scheduled__2022-08-24T15:00:00+00:00 
[scheduled]>
   ```
   
   </details>
   
   There were two workers that were downscaled, with similar logs: 
   
   **First worker**
   
   <details>
   
   ```
   [2022-08-24 16:23:49,597: WARNING/ForkPoolWorker-254] unknown id type, added 
to user and store ids columns: create_account_step.fingerprint
   
   worker: Warm shutdown (MainProcess)
   [2022-08-24 16:23:49,777: WARNING/ForkPoolWorker-254] unknown id type, added 
to user and store ids columns: login.fingerprint
   ```
   
   </details>
   
   The warning lines above are not relevant in themselves. The relevant 
information is that the worker received the SIGTERM shutdown signal roughly 
0.3 seconds before the task was actually sent to it.
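
   That window can be read straight off the timestamps above (the warm 
shutdown was logged between 16:23:49,597 and 16:23:49,777 on the worker, and 
the scheduler sent the task at 16:23:50,060; taking the later worker 
timestamp is my assumption about when the shutdown actually started):

   ```python
   from datetime import datetime

   # Difference between the last worker log line before shutdown and the
   # scheduler's "Sending TaskInstanceKey(...)" line.
   fmt = "%Y-%m-%d %H:%M:%S,%f"
   shutdown = datetime.strptime("2022-08-24 16:23:49,777", fmt)
   sent = datetime.strptime("2022-08-24 16:23:50,060", fmt)
   print((sent - shutdown).total_seconds())  # 0.283
   ```

   So the task was handed to a worker that was already in warm shutdown, 
which looks like a race between the scheduler/executor and the downscaling.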

