huozhanfeng commented on issue #17437:
URL: https://github.com/apache/airflow/issues/17437#issuecomment-893444227


   > I think, before you start requesting new features, it is great to check whether the existing features work well for you:
   > 
   > 1. Did you try to configure multiple schedulers? Airflow 2 has been specifically designed to be able to scale its operations with multiple schedulers. Please try to increase the number of schedulers you have and see if that can improve your experience.
   > 2. There are a number of settings that you can configure to prioritize the scheduler and improve its speed. Did you try to fine-tune them? For example, "file-parsing-sort-mode" should be able to control the sequence of parsing the files: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#file-parsing-sort-mode
   > 3. There are also other parameters that can control the behaviour of the parsers (see the "scheduler" section in the config).
   > 
   > There are also plenty of materials that you can learn from and try to fine-tune the behaviour of the scheduler:
   > 
   > * Official documentation: https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html
   > * Astronomer's blog detailing the new features, tunables and scalability of the scheduler: https://www.astronomer.io/blog/airflow-2-scheduler#:~:text=As%20part%20of%20Apache%20Airflow,once%20their%20dependencies%20are%20met.
   > * This fantastic talk from @ashb about the scheduler in Airflow 2 from this year's Airflow Summit, about how it works and how it can be tuned: https://airflowsummit.org/sessions/2021/deep-dive-in-to-the-airflow-scheduler/
   >   Please take a look at those resources and try to fine-tune your scheduler accordingly. Please come back with your findings and some more data detailing what you have done and how you tried to fine-tune your configuration.
   > 
   > Ideally, it would be great if you could report both outcomes: if you manage to improve your configuration, let us know what worked and why; if you try all of that and it does not work, please also report back all the observations you had during your trials - CPU, memory used, I/O usage, what kind of storage you have for DAGs, whether you tried to fine-tune the storage options (for example, we know that you need to buy extra I/O when you use EFS as DAG storage, otherwise you are limited by the efficiency of the storage). You need to tell us where you saw the bottlenecks and how you tried to overcome them.
   > 
   > That will help us to see if there are still some bottlenecks and fine-tuning parameters that we were not able to foresee when we designed the fine-tuning possibilities for the scheduler (but we need more data from you).
   > 
   > I am closing it for now, until you can provide this data for us to 
investigate.
   
   Thanks for your suggestions - I learned a lot from them. In our environment, each scheduler machine has 16 CPUs, 32 GB of memory, and an SSD disk. There are two schedulers in production, both on the same machine type. Here are the key configurations of our production environment.
   <pre>
   parsing_processes = 15
   min_file_process_interval = 60
   dag_dir_list_interval = 60
   num_runs = -1
   processor_poll_interval = 1
   schedule_after_task_execution = True
   file_parsing_sort_mode = modified_time
   </pre>
   
   Here is the machine state of one of the scheduler machines.
   <pre>
   top - 20:10:52 up 198 days,  2:28,  2 users,  load average: 30.46, 30.87, 31.93
   Tasks: 205 total,  17 running, 188 sleeping,   0 stopped,   0 zombie
   %Cpu0  : 51.0 us, 47.0 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu1  : 59.2 us, 39.8 sy,  0.0 ni,  1.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu2  : 54.6 us, 44.0 sy,  0.0 ni,  1.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu3  : 63.2 us, 33.4 sy,  0.0 ni,  3.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu4  : 48.3 us, 42.7 sy,  0.0 ni,  8.9 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu5  : 56.0 us, 40.1 sy,  0.0 ni,  4.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu6  : 49.3 us, 44.0 sy,  0.0 ni,  6.6 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu7  : 57.0 us, 42.1 sy,  0.0 ni,  1.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu8  : 58.8 us, 38.9 sy,  0.0 ni,  2.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu9  : 63.5 us, 32.2 sy,  0.0 ni,  4.0 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
   %Cpu10 : 50.8 us, 47.2 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu11 : 58.9 us, 39.4 sy,  0.0 ni,  1.3 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
   %Cpu12 : 55.3 us, 42.7 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu13 : 61.1 us, 37.5 sy,  0.0 ni,  1.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu14 : 67.7 us, 29.0 sy,  0.0 ni,  3.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
   %Cpu15 : 64.0 us, 31.7 sy,  0.0 ni,  4.0 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
   KiB Mem : 32780340 total,  7585712 free,  1444024 used, 23750604 buff/cache
   KiB Swap:        0 total,        0 free,        0 used. 30932240 avail Mem
   </pre>
   
   There are 5000+ DAG files, and with this configuration it takes almost 10 minutes for a newly added DAG file to be recognized. For example, the last run time of GAEA_test_001.py is updated only every 7~10 minutes. Part of the log in `dag_processor_manager.log` is as follows:
   <pre>
   File Path                                      PID  Runtime      # DAGs    # Errors  Last Runtime    Last Run
   -------------------------------------------  -----  ---------  --------  ----------  --------------  -------------------
   /.../airflow/dags/dtp-dags/GAEA_test_001.py                            1           0  0.95s           2021-08-05T16:31:53
   </pre>
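   
   As a rough cross-check (my own back-of-envelope estimate, not a measurement, assuming each file takes about the ~1s suggested by the 0.95s last runtime above and that the 15 parsing processes stay fully busy), one full pass over the DAG directory already takes several minutes:
   <pre>
   # Back-of-envelope estimate only: ~1s per file (from the 0.95s last runtime
   # above) and 15 fully-busy parsing processes are assumed, not measured.
   dag_files = 5000
   seconds_per_file = 1.0
   parsing_processes = 15

   full_pass_seconds = dag_files * seconds_per_file / parsing_processes
   print(f"one full pass ~ {full_pass_seconds / 60:.1f} minutes")  # ~5.6 minutes
   # Together with min_file_process_interval = 60 and scheduling overhead,
   # this is consistent with the observed 7~10 minute delay.
   </pre>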
   
   Then I looked into the logic: there is a queue named `_file_path_queue` in `class DagFileProcessorManager`, and new entries are only added to the queue once it is empty. A configuration like `file_parsing_sort_mode = modified_time` therefore only takes effect on the next refill of the queue, so it does not help a new or newly modified DAG file get recognized faster, because one full pass over 5000+ DAG files takes almost 10 minutes. In addition, `_file_path_queue` is not shared between scheduler instances, so it is effectively tied to a single scheduler even when there are more instances (of course, more instances take some of the pressure off because each instance's progress is different, but that is not a fundamental solution). The relevant snippet, and a simplified model of the behaviour, are shown below.
   
   <pre>
   # airflow/utils/dag_processing.py

   if not self._file_path_queue:
       self.emit_metrics()
       # prepare_file_path_queue() is the only place that adds new entries to
       # `_file_path_queue`, and it is only reached once the queue is empty
       self.prepare_file_path_queue()
   </pre>
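   
   To make the behaviour concrete, here is a minimal, simplified model of that refill logic (my own sketch, not the real `DagFileProcessorManager` code): a file added or modified mid-pass cannot enter the queue until the current pass has drained.
   <pre>
   from collections import deque

   # Simplified model of the "refill only when empty" behaviour (a sketch,
   # not the real DagFileProcessorManager implementation).
   all_files = [f"dag_{i}.py" for i in range(10)]
   queue = deque(sorted(all_files))        # one full pass, e.g. sorted by mtime

   processed = 0
   while queue:
       queue.popleft()                     # "parse" one file
       processed += 1
       if processed == 3:
           all_files.append("new_dag.py")  # a new DAG appears mid-pass...
           # ...but nothing adds it to `queue` here; only the next refill sees it

   # Only now, with the queue empty, is it refilled and new_dag.py included.
   queue = deque(sorted(all_files))
   assert "new_dag.py" in queue
   </pre>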
   
   So in my opinion, the options are either to scale up the machine, or to add logic that lets `_file_path_queue` be modified when priorities change, for example when new DAG files arrive.
   
   Currently, I have written a tricky patch to work around it in our environment temporarily. The key point is that it moves (or adds) new or modified files to the front of `_file_path_queue` during the `refresh_dag_dir` action, which is driven by the `dag_dir_list_interval` configuration. It works well, but it is not polished enough to submit as a PR yet.
   <pre>
   diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
   index 4b852342b..6e8398c77 100644
   --- a/airflow/utils/dag_processing.py
   +++ b/airflow/utils/dag_processing.py
   @@ -554,6 +554,8 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
            self._last_zombie_query_time = None
            # Last time that the DAG dir was traversed to look for files
            self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0))
   +        # Last dag files info with mtime
   +        self.last_dag_files_with_mtime = {}
            # Last time stats were printed
            self.last_stat_print_time = 0
            # TODO: Remove magic number
   @@ -956,6 +958,17 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
            """
            self._file_paths = new_file_paths
            self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
   +        # Insert modified dags into top of the queue
   +        files_with_mtime = {}
   +        for file_path in new_file_paths:
   +            files_with_mtime[file_path] = os.path.getmtime(file_path)
   +        if len(self.last_dag_files_with_mtime) != 0:
   +            for file in files_with_mtime.keys():
   +                if file not in self.last_dag_files_with_mtime or files_with_mtime.get(file) > self.last_dag_files_with_mtime.get(file):
   +                    if file in self._file_path_queue:
   +                        self._file_path_queue.remove(file)
   +                    self._file_path_queue.insert(0, file)
   +        self.last_dag_files_with_mtime = files_with_mtime
            # Stop processors that are working on deleted files
            filtered_processors = {}
            for file_path, processor in self._processors.items():
   </pre>
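   
   For readability, here is the same idea restated outside the diff as a small standalone helper (an illustrative sketch of my patch, not the actual `DagFileProcessorManager` API):
   <pre>
   import os
   from typing import Dict, List


   def bump_changed_files(file_path_queue: List[str],
                          new_file_paths: List[str],
                          last_mtimes: Dict[str, float]) -> Dict[str, float]:
       """Move new or modified DAG files to the front of the parsing queue.

       Illustrative sketch of the patch above; returns the refreshed
       {file_path: mtime} map to keep for the next refresh_dag_dir pass.
       """
       current_mtimes = {path: os.path.getmtime(path) for path in new_file_paths}
       if last_mtimes:
           for path, mtime in current_mtimes.items():
               if path not in last_mtimes or mtime > last_mtimes[path]:
                   if path in file_path_queue:
                       file_path_queue.remove(path)
                   file_path_queue.insert(0, path)
       return current_mtimes
   </pre>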
   

