pingzh commented on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-717392179
> Are you sure this is still a problem on mainline? Looking at `start_new_processes`:
>
> ```python
> def start_new_processes(self):
>     """Start more processors if we have enough slots and files to process"""
>     while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
>         file_path = self._file_path_queue.pop(0)
>         callback_to_execute_for_file = self._callback_to_execute[file_path]
>         processor = self._processor_factory(
>             file_path,
>             callback_to_execute_for_file,
>             self._dag_ids,
>             self._pickle_dags)
>
>         del self._callback_to_execute[file_path]
>         Stats.incr('dag_processing.processes')
>
>         processor.start()
>         self.log.debug(
>             "Started a process (PID: %s) to generate tasks for %s",
>             processor.pid, file_path
>         )
>         self._processors[file_path] = processor
>         self.waitables[processor.waitable_handle] = processor
> ```
>
> I can't see at first glance how `self._parallelism - len(self._processors) > 0` would ever lead to too many processes.
Yes, it is still an issue; the main logic that launches new dag file processors has not changed much between `1.10.4` and the `master` branch. We also cherry-picked PR https://github.com/apache/airflow/pull/7597 into our `1.10.4` version. The issue does not happen often.
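
For reference, I agree the guard on its own behaves the way you describe: within a single call, the loop cannot start more than `parallelism - len(processors)` processes. A stripped-down sketch with toy names (not the real manager class), just to make that explicit:

```python
# Toy model of the loop's accounting: with unique file paths,
# the parallelism cap holds within one call.
parallelism = 2
processors = {}  # stands in for self._processors
queue = ['a.py', 'b.py', 'c.py']

while parallelism - len(processors) > 0 and queue:
    file_path = queue.pop(0)
    processors[file_path] = object()  # stand-in for a started processor

assert len(processors) == 2  # 'c.py' stays queued for a later scan
```

So whatever goes wrong has to involve state outside this loop, e.g. what ends up in `_file_path_queue` or how entries leave `_processors`.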
This is the incident that led us to find this issue. As you can see, the same dag file is being processed many times (processing this dag file usually takes more than 15 minutes).

I have lost some context about how exactly it got into that state.
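
One thing that is visible in the quoted snippet itself, though (a hypothesis, not a confirmed root cause): `self._processors` is keyed by file path, so if `_file_path_queue` ever contains the same path twice, the second `processor.start()` overwrites the first dict entry while the first process keeps running. The overwritten processor no longer counts toward `len(self._processors)`, so the parallelism check undercounts live processes. A minimal sketch of that interleaving, with a toy `Proc` class standing in for the real processor:

```python
# Toy Proc stands in for the real dag file processor; only the
# dict-based accounting from start_new_processes is modeled here.
class Proc:
    def __init__(self, file_path):
        self.file_path = file_path

parallelism = 2
processors = {}   # keyed by file path, like self._processors
started = []      # every process actually started
queue = ['a.py', 'a.py', 'b.py']  # duplicate entry for a.py

while parallelism - len(processors) > 0 and queue:
    file_path = queue.pop(0)
    proc = Proc(file_path)
    started.append(proc)          # processor.start()
    processors[file_path] = proc  # second 'a.py' silently replaces the first

assert len(processors) == 2  # the guard believes 2 processors are live
assert len(started) == 3     # but 3 processes were actually started
```

Whether the queue could actually contain duplicates on our `1.10.4` build is exactly the context I have lost, so treat this as one possible mechanism rather than the answer.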