potiuk commented on issue #5200: [AIRFLOW-4401] Use managers for Queue synchronization
URL: https://github.com/apache/airflow/pull/5200#issuecomment-487823276

@BasPH @Fokko @ashb -> I made some amendments now, as I realised that Manager().Queue() is always a JoinableQueue (unlike multiprocessing.Queue). I updated the code everywhere to follow the general pattern below (I also described it in the commit message).

Since the managed queue is proxied to the shared one anyway, get_nowait() will always return the message if another process has already put it on the queue. BTW, get_nowait() on a managed queue is not really "instant" as it is with a standard multiprocessing.Queue: it polls the shared process, but that's exactly why it is good for us :).

It also seems that there was a mistake in KubernetesExecutor. See the comment here https://github.com/apache/airflow/pull/5200/commits/6ae1ac6f25b53e8a726b463461c96b50e114d7e7#r279615263 - if we've heard reports of KubernetesExecutor being unable to shut down gracefully, that could be the cause.

```
self.task_queue.join()
```

```
while True:
    try:
        res = queue.get_nowait()
        try:
            # .... do some processing
        finally:
            # Always acknowledge the item so that join() can return.
            queue.task_done()
    except Empty:
        break
```
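For readers who want to try the drain pattern in isolation, here is a minimal, self-contained sketch. It is not the Airflow code: `drain_queue`, `handler`, `_produce`, and `demo` are hypothetical names introduced for illustration, and the sketch assumes Python 3, where `Empty` comes from the standard `queue` module.

```python
import multiprocessing
from queue import Empty


def drain_queue(q, handler):
    """Process every item currently on q; return how many were handled.

    Hypothetical helper: q only needs get_nowait() and task_done(),
    which both queue.Queue and a Manager().Queue() proxy provide.
    """
    handled = 0
    while True:
        try:
            item = q.get_nowait()
        except Empty:
            # Nothing left on the queue right now.
            break
        try:
            handler(item)
            handled += 1
        finally:
            # Acknowledge the item even if handler() raised,
            # otherwise a later q.join() would block forever.
            q.task_done()
    return handled


def _produce(q, count):
    """Hypothetical producer run in a separate process."""
    for i in range(count):
        q.put(i)


def demo():
    """Put items from another process, then drain and join in this one."""
    with multiprocessing.Manager() as manager:
        shared = manager.Queue()
        producer = multiprocessing.Process(target=_produce, args=(shared, 3))
        producer.start()
        producer.join()

        results = []
        drain_queue(shared, results.append)
        # Returns immediately because every get was matched by task_done().
        shared.join()
        return results
```

Calling `demo()` under an `if __name__ == "__main__":` guard exercises the managed queue across two processes. The `finally` block is the important part: without it, an exception during processing would leave an item unacknowledged and `join()` would never return.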
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
