pingzh edited a comment on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-717392179


   > Are you sure this is still a problem on mainline. Looking at 
start_new_processes:
   > 
   > ```python
   >     def start_new_processes(self):
   >         """Start more processors if we have enough slots and files to 
process"""
   >         while self._parallelism - len(self._processors) > 0 and 
self._file_path_queue:
   >             file_path = self._file_path_queue.pop(0)
   >             callback_to_execute_for_file = 
self._callback_to_execute[file_path]
   >             processor = self._processor_factory(
   >                 file_path,
   >                 callback_to_execute_for_file,
   >                 self._dag_ids,
   >                 self._pickle_dags)
   > 
   >             del self._callback_to_execute[file_path]
   >             Stats.incr('dag_processing.processes')
   > 
   >             processor.start()
   >             self.log.debug(
   >                 "Started a process (PID: %s) to generate tasks for %s",
   >                 processor.pid, file_path
   >             )
   >             self._processors[file_path] = processor
   >             self.waitables[processor.waitable_handle] = processor
   > ```
   > 
   > I can't see at first glance how `self._parallelism - len(self._processors) 
> 0` would ever lead to too many processes.
   
   Yes, it is still an issue; the main logic that launches new dag file processors has not changed much between `1.10.4` and the `master` branch. We also cherry-picked PR https://github.com/apache/airflow/pull/7597 into our `1.10.4` version.
   
   The issue does not happen often.
   
   This is the incident that led us to find this issue. As you can see, the same dag file is being processed many times (the processor for this dag file usually takes more than 15 min).
   
   
![image](https://user-images.githubusercontent.com/8662365/97335898-9cef5600-183b-11eb-99fa-6e21e6ef78a2.png)
   
   When the manager adds the callback to the `_file_path_queue`, it does not check whether that dag file is currently being processed or is in its cool-down period, which leads to multiple dag processes working on the same dag file at once.
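   To illustrate the kind of guard that would avoid this, here is a minimal, standalone sketch (not Airflow's real `DagFileProcessorManager`; the class and attribute names only mirror the quoted code) where enqueueing skips files that are already queued or already have a running processor:

   ```python
   class QueueGuard:
       """Toy model of the manager's queue bookkeeping (illustration only)."""

       def __init__(self):
           self._file_path_queue = []  # files waiting for a processor slot
           self._processors = {}       # file_path -> running processor (stubbed)

       def enqueue(self, file_path):
           # Skip files already queued or currently being processed, so the
           # same dag file is never handed to two processors at once.
           if file_path in self._file_path_queue or file_path in self._processors:
               return False
           self._file_path_queue.append(file_path)
           return True


   guard = QueueGuard()
   guard.enqueue("dags/a.py")
   guard._processors["dags/b.py"] = object()  # simulate b.py mid-processing
   print(guard.enqueue("dags/a.py"))  # duplicate queue entry -> False
   print(guard.enqueue("dags/b.py"))  # currently processing -> False
   print(guard.enqueue("dags/c.py"))  # new file -> True
   ```

   A cool-down check would work the same way, comparing the file's last-finish time against the configured interval before appending.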
   
   As for exceeding `_parallelism`, I have lost some of the context about how exactly it got into that state :(
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to