huozhanfeng edited a comment on issue #17437: URL: https://github.com/apache/airflow/issues/17437#issuecomment-893444227
> I think, before you start requesting new features, it is great to check if the existing features are not working well for you:
>
> 1. Did you try to configure multiple schedulers? Airflow 2 has been specifically designed to be able to scale its operations with multiple schedulers. Please try to increase the number of schedulers you have and see if that can improve your experience.
> 2. There are a number of settings that you can configure to prioritize the scheduler and improve its speed. Did you try to fine-tune them? For example, "file-parsing-sort-mode" should be able to control the sequence of parsing the files: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#file-parsing-sort-mode
> 3. There are also other parameters that can control the behaviour of the parsers (see the "scheduler" section in the config).
>
> There are also plenty of materials that you can learn from and try to fine-tune the behaviour of the scheduler:
>
> * Official documentation: https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html
> * Astronomer's blog detailing the new features, tunables and scalability of the scheduler: https://www.astronomer.io/blog/airflow-2-scheduler#:~:text=As%20part%20of%20Apache%20Airflow,once%20their%20dependencies%20are%20met.
> * This fantastic talk from @ashb about the scheduler in Airflow 2 from this year's Airflow Summit, about how it works and how it can be tuned: https://airflowsummit.org/sessions/2021/deep-dive-in-to-the-airflow-scheduler/
>
> Please take a look at those resources and try to fine-tune your scheduler accordingly. Come back please with your findings and some more data detailing what you have done and how you tried to fine-tune your configuration.
>
> Ideally, it would be great if you can report both: if you manage to improve your configuration, let us know what worked and why; if you try all of that and it does not work, please also report back all the observations you had during your trials - CPU, memory used, I/O usage, what kind of storage you have for DAGs, whether you tried to fine-tune the storage options (for example, we know that you need to buy extra I/O when you use EFS as DAG storage, otherwise you are limited by the efficiency of the storage). You need to tell us where you saw the bottlenecks and how you tried to overcome them.
>
> That will help us to see if there are still some bottlenecks and fine-tuning parameters that we were not able to foresee when we designed fine-tuning possibilities for the scheduler (but we need more data from you).
>
> I am closing it for now, until you can provide this data for us to investigate.

Thanks for your suggestions - I learned a lot from them.

In our environment, each scheduler machine has 16 CPUs, 32 GB of memory, and an SSD disk. There are two schedulers in production, both on the same type of machine. Here are the key configuration values of our production environment:

<pre>
parsing_processes = 15
min_file_process_interval = 60
dag_dir_list_interval = 60
num_runs = -1
processor_poll_interval = 1
schedule_after_task_execution = True
file_parsing_sort_mode = modified_time
</pre>
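Since environment variables can override values in airflow.cfg, one quick way to confirm that these are the settings the schedulers actually resolve is a small check like the one below. It is only a sketch; it assumes the standard `airflow.configuration.conf` accessor and the `[scheduler]` keys listed above.

<pre>
# Print the effective [scheduler] settings as Airflow resolves them
# (airflow.cfg plus any AIRFLOW__SCHEDULER__* environment overrides).
from airflow.configuration import conf

for key in (
    "parsing_processes",
    "min_file_process_interval",
    "dag_dir_list_interval",
    "num_runs",
    "processor_poll_interval",
    "schedule_after_task_execution",
    "file_parsing_sort_mode",
):
    print(f"{key} = {conf.get('scheduler', key)}")
</pre>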
Here is the machine state of one of the scheduler machines:

<pre>
top - 20:10:52 up 198 days,  2:28,  2 users,  load average: 30.46, 30.87, 31.93
Tasks: 205 total,  17 running, 188 sleeping,   0 stopped,   0 zombie
%Cpu0  : 51.0 us, 47.0 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu1  : 59.2 us, 39.8 sy,  0.0 ni,  1.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu2  : 54.6 us, 44.0 sy,  0.0 ni,  1.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu3  : 63.2 us, 33.4 sy,  0.0 ni,  3.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu4  : 48.3 us, 42.7 sy,  0.0 ni,  8.9 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu5  : 56.0 us, 40.1 sy,  0.0 ni,  4.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu6  : 49.3 us, 44.0 sy,  0.0 ni,  6.6 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu7  : 57.0 us, 42.1 sy,  0.0 ni,  1.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu8  : 58.8 us, 38.9 sy,  0.0 ni,  2.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu9  : 63.5 us, 32.2 sy,  0.0 ni,  4.0 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
%Cpu10 : 50.8 us, 47.2 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu11 : 58.9 us, 39.4 sy,  0.0 ni,  1.3 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
%Cpu12 : 55.3 us, 42.7 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu13 : 61.1 us, 37.5 sy,  0.0 ni,  1.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu14 : 67.7 us, 29.0 sy,  0.0 ni,  3.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu15 : 64.0 us, 31.7 sy,  0.0 ni,  4.0 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
KiB Mem : 32780340 total,  7585712 free,  1444024 used, 23750604 buff/cache
KiB Swap:        0 total,        0 free,        0 used. 30932240 avail Mem
</pre>

There are 5000+ DAG files, and with this configuration it takes almost 10 minutes for a newly added DAG file to be recognized. For example, the "Last Run" time of GAEA_test_001.py only updates every 7~10 minutes. Part of the log in `dag_processor_manager.log` looks as follows:

<pre>
File Path                                    PID    Runtime    # DAGs    # Errors    Last Runtime    Last Run
-------------------------------------------  -----  ---------  --------  ----------  --------------  -------------------
/.../airflow/dags/dtp-dags/GAEA_test_001.py                           1           0           0.95s  2021-08-05T16:31:53
</pre>

I then looked into the logic: there is a queue named `_file_path_queue` in `class DagFileProcessorManager`, and new entries are only added to it once it is empty. A setting like `file_parsing_sort_mode = modified_time` therefore only takes effect on the next refill of the queue, so it does not help a new or newly modified DAG file get recognized faster, because one full cycle over 5000+ DAG files takes almost 10 minutes. In addition, `_file_path_queue` is not shared between scheduler instances, so it is effectively local to a single scheduler even when there are multiple instances (of course, more instances take some of the pressure off, since each instance makes different progress through its own queue, but that is not a fundamental solution).

<pre>
# airflow/utils/dag_processing.py
if not self._file_path_queue:
    self.emit_metrics()
    # Only prepare_file_path_queue() adds new entries to _file_path_queue,
    # and it is only called once the queue is already empty.
    self.prepare_file_path_queue()
</pre>
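As a rough back-of-envelope model, using only the figures already shown above (so treat it as an estimate rather than a measurement), this refill-only-when-empty behaviour already explains most of the delay:

<pre>
# Back-of-envelope model of how long a new DAG file can wait before it is
# parsed, under the settings above. All inputs are assumptions taken from
# this comment, not measured constants.
num_dag_files = 5000          # 5000+ files in the DAGs folder
avg_parse_seconds = 0.95      # "Last Runtime" of GAEA_test_001.py above
parsing_processes = 15        # [scheduler] parsing_processes
dag_dir_list_interval = 60    # [scheduler] dag_dir_list_interval

# One full drain of _file_path_queue on a single scheduler, assuming the
# parsing processes stay perfectly busy.
one_queue_pass = num_dag_files * avg_parse_seconds / parsing_processes

# A new file is discovered at the next directory refresh (up to 60 s), but
# it only enters _file_path_queue at the next refill, i.e. after the current
# pass has drained; modified_time sorting then puts it near the front.
worst_case_wait = dag_dir_list_interval + one_queue_pass + avg_parse_seconds

print(f"one queue pass  ~ {one_queue_pass / 60:.1f} min")   # ~5.3 min
print(f"worst-case wait ~ {worst_case_wait / 60:.1f} min")  # ~6.3 min
</pre>

With the CPUs saturated (load average ~30 on 16 cores), real parses run slower than the idle-machine 0.95 s, which lines up with the 7~10 minutes we observe.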
So in my opinion, the options are either to scale up the machines or to add logic that lets `_file_path_queue` be re-prioritized while it is being drained, for example when new DAG files arrive. For now, I have written a tricky patch to work around this in our environment: the key point is that it moves (or adds) a file to the front of `_file_path_queue` during the `refresh_dag_dir` action, which is driven by the `dag_dir_list_interval` setting. It works well, but it is not polished enough to submit as a PR yet.

<pre>
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4b852342b..6e8398c77 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -554,6 +554,8 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
         self._last_zombie_query_time = None
         # Last time that the DAG dir was traversed to look for files
         self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0))
+        # Last dag files info with mtime
+        self.last_dag_files_with_mtime = {}
         # Last time stats were printed
         self.last_stat_print_time = 0
         # TODO: Remove magic number
@@ -956,6 +958,17 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
         """
         self._file_paths = new_file_paths
         self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
+        # Insert modified dags into top of the queue
+        files_with_mtime = {}
+        for file_path in new_file_paths:
+            files_with_mtime[file_path] = os.path.getmtime(file_path)
+        if len(self.last_dag_files_with_mtime) != 0:
+            for file in files_with_mtime.keys():
+                if file not in self.last_dag_files_with_mtime or files_with_mtime.get(file) > self.last_dag_files_with_mtime.get(file):
+                    if file in self._file_path_queue:
+                        self._file_path_queue.remove(file)
+                    self._file_path_queue.insert(0, file)
+        self.last_dag_files_with_mtime = files_with_mtime
         # Stop processors that are working on deleted files
         filtered_processors = {}
         for file_path, processor in self._processors.items():
</pre>
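To make the idea easier to follow outside the diff context, the reordering boils down to something like the standalone sketch below. The function name and signature are made up for illustration; only the promotion logic mirrors the patch above.

<pre>
import os
from typing import Dict, List


def promote_new_or_modified(
    file_path_queue: List[str],
    new_file_paths: List[str],
    last_mtimes: Dict[str, float],
) -> Dict[str, float]:
    """Move DAG files that are new, or whose mtime increased since the last
    directory refresh, to the front of the queue. Returns the mtime snapshot
    to remember for the next refresh."""
    current_mtimes = {path: os.path.getmtime(path) for path in new_file_paths}
    if last_mtimes:  # skip the very first refresh, as the patch does
        for path, mtime in current_mtimes.items():
            if path not in last_mtimes or mtime > last_mtimes[path]:
                if path in file_path_queue:
                    file_path_queue.remove(path)
                file_path_queue.insert(0, path)
    return current_mtimes
</pre>

As noted earlier, this state lives inside a single `DagFileProcessorManager`, so each scheduler still promotes files independently.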
