Thanks Devjyoti for your reply. To elaborate based on your inputs:
- *When to add one more shard*: We have designed some metrics, like "how long the scheduler instance takes to parse & schedule all DAGs (in the subdir it's taking care of)". When the metric stays above a given threshold for long enough, we may want to add one more shard (see the first sketch below).

- *Easy Solution to Balance Shard Load*: Exactly as you're pointing out, we create the initial set of shards by randomly distributing our DAGs into the subdirs. As when building a mathematical model, there are some assumptions we make for convenience, like "the complexity of all DAGs is roughly equal". As for new DAGs: we developed an application that creates DAGs based on metadata, and the application checks the # of files in each subdir and always puts the new DAG into the subdir with the fewest DAGs (see the second sketch below).
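To make the first point concrete, here is a minimal sketch (Python) of how such
a check could look. The 3-minute threshold matches the SLA I mention in my
earlier mail below; the breach count and the function itself are illustrative
assumptions, not our production code:

# Illustrative only: decide whether a scheduler shard is overloaded, based
# on the recent durations (in seconds) of its parse-&-schedule loops.
THRESHOLD_SECONDS = 180      # SLA: one scheduling loop should take < 3 min
CONSECUTIVE_BREACHES = 10    # require a sustained breach, not a one-off spike

def needs_new_shard(loop_durations):
    """loop_durations: the most recent per-loop timings of one scheduler."""
    recent = loop_durations[-CONSECUTIVE_BREACHES:]
    return (len(recent) == CONSECUTIVE_BREACHES
            and all(d > THRESHOLD_SECONDS for d in recent))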
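Similarly, a minimal sketch of the second point, i.e. the placement rule for
new DAG files (the directory layout is illustrative; our actual application
works off metadata and differs in the details):

import os

def pick_subdir(dag_folder):
    """Return the sub-folder of DAG_Folder holding the fewest .py files."""
    subdirs = [os.path.join(dag_folder, d)
               for d in os.listdir(dag_folder)
               if os.path.isdir(os.path.join(dag_folder, d))]
    return min(subdirs,
               key=lambda sd: sum(f.endswith(".py") for f in os.listdir(sd)))

# e.g. place a generated DAG file into the least-loaded shard:
# target = os.path.join(pick_subdir("/path/to/DAG_Folder"), "new_dag.py")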
XD

> On 2 Nov 2018, at 12:47 AM, Devjyoti Patra <[email protected]> wrote:
>
>>> 1. "Shard by # of files may not yield same load": fully agree with you.
>>> This concern was also raised by other co-workers in my team. But given
>>> this is a preliminary trial, we didn't consider this yet.
>
> One issue here is: when do you decide to add one more shard? I think if
> you monitor the time it takes to parse each source file and log it, you
> can use this to find the outliers when your scheduling SLA is breached,
> and move the outliers to a new shard. Creating the initial set of shards
> by randomly putting an equal number of files in each subdir seems like
> the easiest way to approach this problem.
>
> On Thu, Nov 1, 2018 at 7:11 PM Deng Xiaodong <[email protected]> wrote:
>
>> Thanks Kevin and Max for your inputs!
>>
>> To Kevin's questions:
>> 1. "Shard by # of files may not yield same load": fully agree with you.
>> This concern was also raised by other co-workers in my team. But given
>> this is a preliminary trial, we didn't consider this yet.
>> 2. We haven't started to look into how we can dynamically allocate
>> scheduler resources yet. But I think this preliminary trial would be a
>> good starting point.
>> 3. DB: looking forward to your PR on this!
>> 4. "Why do you need to shard the scheduler while the scheduler can scale
>> up pretty high"
>> There are a few reasons:
>> 4.1 We have a strict SLA on scheduling. We expect one scheduling loop to
>> take < 3 minutes no matter how many DAGs we have.
>> 4.2 We're containerising the deployment, and our infrastructure team
>> added the restriction that each pod can only use up to 2 cores (which
>> blocks us from scaling vertically).
>> 4.3 Even though this naive architecture doesn't provide HA, it partially
>> addresses the availability concern (if one scheduler out of 5 fails, at
>> least 80% of DAGs can still be scheduled properly).
>>
>> To Max's questions:
>> 1. I haven't tested the pools or queues features with this architecture,
>> so I can't give a very firm answer on this.
>> 2. In the load tests I have done, I haven't observed such "misfires" yet
>> (I'm running a customised version based on 1.10.0, BTW).
>> 3. This is a very valid point. I haven't checked the implementation of
>> DAG prioritisation in detail yet. For the scenario in our team, we don't
>> prioritise DAGs, so we didn't take this into consideration. On the other
>> hand, this naive architecture doesn't change anything in Airflow. It
>> simply makes use of the "--subdir" argument of the scheduler command.
>> If we want to have a more serious multi-scheduler setup natively
>> supported by Airflow, I believe we will need to make significant changes
>> to the code to ensure all features, like cross-DAG prioritisation, are
>> supported.
>>
>> Kindly let me know your thoughts. Thanks!
>>
>> XD
>>
>>> On 1 Nov 2018, at 4:25 AM, Maxime Beauchemin <[email protected]> wrote:
>>>
>>> A few related thoughts:
>>> * there may be hiccups around concurrency (pools, queues), though the
>>> worker should double-check that the constraints are still met when
>>> firing the task, so in theory this should be ok
>>> * there may be more "misfires", meaning the task gets sent to the
>>> worker, but by the time it starts the conditions aren't met anymore
>>> because of a race condition with one of the other schedulers. Here I'm
>>> assuming recent versions of Airflow will simply eventually re-fire the
>>> misfires and heal
>>> * cross-DAG prioritization can't really take place anymore as there's
>>> not a shared "ready-to-run" list of task instances that can be sorted
>>> by priority_weight. Whichever scheduler instance fires first is likely
>>> to get the open slots first.
>>>
>>> Max
>>>
>>> On Wed, Oct 31, 2018 at 1:00 PM Kevin Yang <[email protected]> wrote:
>>>
>>> Finally we start to talk about this seriously? Yeah! :D
>>>
>>> For your approach, a few thoughts:
>>>
>>> 1. Shard by # of files may not yield same load--it may even yield very
>>> different load, since we may have some framework DAG file producing 500
>>> DAGs and taking forever to parse.
>>> 2. I think Alex Guziel <https://github.com/saguziel> had previously
>>> talked about using Apache Helix to shard the scheduler. I haven't
>>> looked a lot into it, but it may be something you're interested in. I
>>> personally like that idea because we don't need to reinvent the wheel
>>> on a lot of stuff (less code to maintain also ;) ).
>>> 3. About the DB part, I should be contributing back some changes that
>>> can dramatically drop the DB CPU usage. Afterwards I think we should
>>> have plenty of headroom (assuming the traffic is ~4000 DAG files and
>>> ~40k concurrently running task instances), so we should probably be
>>> fine here.
>>>
>>> Also I'm kinda curious about your setup and want to understand why you
>>> need to shard the scheduler, since the scheduler can now scale up
>>> pretty high actually.
>>>
>>> Thank you for initiating the discussion. I think it can turn out to be
>>> a very valuable and critical discussion--many people have been
>>> thinking/discussing about this and I can't wait to hear the ideas :D
>>>
>>> Cheers,
>>> Kevin Y
>>>
>>> On Wed, Oct 31, 2018 at 7:38 AM Deng Xiaodong <[email protected]> wrote:
>>>
>>>> Hi Folks,
>>>>
>>>> Previously I initiated a discussion about best practices for setting
>>>> up Airflow, and a few folks agreed that the scheduler may become one
>>>> of the bottleneck components (we can only run one scheduler instance,
>>>> can only scale vertically rather than horizontally, etc.). Especially
>>>> when we have thousands of DAGs, the scheduling latency may be high.
>>>>
>>>> In our team, we have experimented with a naive multiple-scheduler
>>>> architecture. I would like to share it here, and also seek your
>>>> inputs.
>>>>
>>>> **1. Background**
>>>> - Inside DAG_Folder, we can have sub-folders.
>>>> - When we start a scheduler instance, we can specify "--subdir" for
>>>> it, which sets the specific directory that the scheduler is going to
>>>> "scan" (https://airflow.apache.org/cli.html#scheduler).
>>>>
>>>> **2. Our Naive Idea**
>>>> Say we have 2,000 DAGs. If we run one single scheduler instance, one
>>>> scheduling loop will traverse all 2K DAGs.
>>>>
>>>> Our idea is:
>>>> Step-1: Create multiple sub-directories, say five, under DAG_Folder
>>>> (subdir1, subdir2, …, subdir5)
>>>> Step-2: Distribute the DAGs evenly into these sub-directories (400
>>>> DAGs in each)
>>>> Step-3: Then we can start a scheduler instance on 5 different
>>>> machines, using the command `airflow scheduler --subdir subdir<i>` on
>>>> machine <i>.
>>>>
>>>> Hence eventually, each scheduler only needs to take care of 400 DAGs.
>>>>
>>>> **3. Test & Results**
>>>> - We have done a test using 2,000 DAGs (3 tasks in each DAG).
>>>> - DAGs are stored on network-attached storage (the same drive mounted
>>>> to all nodes), so we don't need to worry about DAG_Folder
>>>> synchronization.
>>>> - No conflict observed (each DAG file is only parsed & scheduled by
>>>> one scheduler instance).
>>>> - The scheduling speed improves almost linearly, demonstrating that we
>>>> can scale the scheduler horizontally.
>>>>
>>>> **4. Highlight**
>>>> - This naive idea doesn't address scheduler availability.
>>>> - As Kevin Yang shared earlier in another thread, the database may be
>>>> another bottleneck when the load is high. But this is not considered
>>>> here yet.
>>>>
>>>> Kindly share your thoughts on this naive idea. Thanks.
>>>>
>>>> Best regards,
>>>> XD
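P.S. For anyone who wants to try the steps quoted above on a single box
first, the whole setup conceptually boils down to the following sketch (the
shard count and relative paths are illustrative; in our real deployment each
scheduler instance runs on its own machine):

import subprocess

# Start one scheduler per shard; each instance only parses & schedules the
# DAG files under its own sub-folder of DAG_Folder.
procs = [
    subprocess.Popen(["airflow", "scheduler", "--subdir", f"subdir{i}"])
    for i in range(1, 6)
]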
