KevinYang21 commented on a change in pull request #5908: Revert "[AIRFLOW-4797] Improve performance and behaviour of zombie de…
URL: https://github.com/apache/airflow/pull/5908#discussion_r321986407
##########
File path: airflow/utils/dag_processing.py
##########
```diff
@@ -1193,13 +1195,11 @@ def heartbeat(self):
         self._file_path_queue.extend(files_paths_to_queue)

-        zombies = self._find_zombies()
-
         # Start more processors if we have enough slots and files to process
         while (self._parallelism - len(self._processors) > 0 and
                len(self._file_path_queue) > 0):
             file_path = self._file_path_queue.pop(0)
-            processor = self._processor_factory(file_path, zombies)
+            processor = self._processor_factory(file_path, self._zombies)
```
Review comment:
Since we're using multiprocessing, the list is always copied into each child process; the processes cannot share the object. Even if it were shared, that would still be fine, since we only read from it.
A quick demo of the no-share part:
```
root@7949bc1f5c5c:/opt/airflow# python test.py
['123']
root@7949bc1f5c5c:/opt/airflow# cat test.py
import multiprocessing
from pprint import pprint


def worker(the_list):
    """Append this process's name to its (copied) list."""
    the_list.append(multiprocessing.current_process().name)


if __name__ == '__main__':
    jobs = []
    the_list = ['123']
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(the_list,),
                                    name="subprocess-{}".format(i))
        jobs.append(p)
        p.start()
    # Wait for the workers so the demo isn't racy; the parent's list
    # is still ['123'] because each child appended to its own copy.
    for p in jobs:
        p.join()
    pprint(the_list)
```
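For contrast, if genuine sharing were ever needed (it isn't here, since the zombie list is read-only in the children), a `multiprocessing.Manager` list would propagate appends back to the parent. A minimal sketch, not part of the PR:

```python
import multiprocessing


def worker(shared_list):
    """Append this process's name to a Manager-backed list."""
    shared_list.append(multiprocessing.current_process().name)


def run_demo():
    # A Manager proxies the list through a server process, so appends
    # made in child processes are visible back in the parent.
    with multiprocessing.Manager() as manager:
        shared_list = manager.list(['123'])
        jobs = []
        for i in range(3):
            p = multiprocessing.Process(target=worker, args=(shared_list,),
                                        name="subprocess-{}".format(i))
            jobs.append(p)
            p.start()
        for p in jobs:
            p.join()
        # Unlike the plain-list demo above, the appends survive.
        return list(shared_list)


if __name__ == '__main__':
    print(run_demo())
```

The proxy round-trips every access through the manager process, which is exactly the overhead the read-only copy approach in this PR avoids.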