potiuk commented on issue #5200: [AIRFLOW-4401] Use managers for Queue 
synchronization
URL: https://github.com/apache/airflow/pull/5200#issuecomment-487823276
 
 
   @BasPH @Fokko @ashb -> I made some ammendments now as I realised that 
Manager().Queue() is always a JoinableQueue (unlike multiprocessing.Queue). I 
updated the pattern everywhere to follow this general pattern below (also 
described it in the commit message). 
   
   Since the managed queue is anyhow proxied to the shared one, get_nowait() 
will always return the  message if it already has been put to the queue by 
another process.
   
   BTW. the get_nowait() message in case of managed queue is not really  
"instant" as in case of standard multiprocessing.Queue - it will poll the 
shared process, but that's exactly why it is good for us :).
   
   It also seems that in KubernetesExecutor there was a mistake. See the 
comment here 
https://github.com/apache/airflow/pull/5200/commits/6ae1ac6f25b53e8a726b463461c96b50e114d7e7#r279615263
 - if we've heard about inability to gracefully shutdown the KubernetesExecutor 
- that could be it.
   
   ```
   self.task_queue.join()
   ```
   
   
   ```
   while True:
      try:
          res = queue.get_nowait()
          try:
             .... do some processing
          finally:
              queue.task_done()
      except Empty:
          break
   
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to