KevinYang21 commented on a change in pull request #5908: Revert "[AIRFLOW-4797] Improve performance and behaviour of zombie de…
URL: https://github.com/apache/airflow/pull/5908#discussion_r321986407
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -1193,13 +1195,11 @@ def heartbeat(self):
 
             self._file_path_queue.extend(files_paths_to_queue)
 
-        zombies = self._find_zombies()
-
         # Start more processors if we have enough slots and files to process
         while (self._parallelism - len(self._processors) > 0 and
                len(self._file_path_queue) > 0):
             file_path = self._file_path_queue.pop(0)
-            processor = self._processor_factory(file_path, zombies)
+            processor = self._processor_factory(file_path, self._zombies)
 
 Review comment:
  Since we're using multiprocessing, the list will always be copied: the parent and child processes can't share the object. Even if it were shared, that would still be fine, since we only ever read from it.
  
  A quick demo of the no-sharing part (with `join()` added so the parent prints only after the children have finished):
  
  ```
  root@7949bc1f5c5c:/opt/airflow# python test.py
  ['123']
  root@7949bc1f5c5c:/opt/airflow# cat test.py
  import multiprocessing
  from pprint import pprint

  def worker(the_list):
      """Append this process's name to its own copy of the list."""
      the_list.append(multiprocessing.current_process().name)

  if __name__ == '__main__':
      jobs = []
      the_list = ['123']
      for i in range(3):
          p = multiprocessing.Process(target=worker, args=(the_list,),
                                      name="subprocess-{}".format(i))
          jobs.append(p)
          p.start()
      for p in jobs:
          p.join()
      # Each child appended to its own copy; the parent's list is unchanged.
      pprint(the_list)
  ```
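  For contrast (not part of the original comment), here is a sketch of how a list *can* be made visible across processes, using `multiprocessing.Manager`; the `worker`/`shared_list` names are illustrative. The Airflow code doesn't need this, since the processors only read the zombies list, but it shows what actual sharing would look like:

  ```python
  import multiprocessing
  from pprint import pprint

  def worker(shared_list):
      """Append this process's name to a manager-hosted (shared) list."""
      shared_list.append(multiprocessing.current_process().name)

  if __name__ == '__main__':
      with multiprocessing.Manager() as manager:
          # The list lives in the manager's server process; children get proxies.
          shared_list = manager.list(['123'])
          jobs = [multiprocessing.Process(target=worker, args=(shared_list,),
                                          name="subprocess-{}".format(i))
                  for i in range(3)]
          for p in jobs:
              p.start()
          for p in jobs:
              p.join()
          result = sorted(shared_list)
          # Unlike the plain-list demo, the children's appends are visible here.
          pprint(result)
  ```

  The trade-off is that every access goes through IPC to the manager process, which is exactly the overhead a read-only, copy-on-fork list avoids.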
