I mean at that point it's just as easy (or easier) to do things properly: get each scheduler subprocess to take a lock on the DAG it's about to process, and release it when it's done. Add a lock timestamp and a bit of logic to expire locks (to self-heal if the process ever crashes and fails to release the lock). Of course, make sure that the confirm-it's-not-locked-and-take-a-lock step is insulated in a database transaction, and you're mostly good. That sounds like a very easy thing to do.
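For concreteness, here's a minimal sketch of that check-and-take-a-lock step (not Airflow's actual code; it assumes a hypothetical dag_lock table with dag_id / locked_by / locked_at columns, pre-populated with one row per DAG):

```python
from datetime import datetime, timedelta
from sqlalchemy import text

LOCK_TIMEOUT = timedelta(minutes=5)  # stale locks from crashed processes self-heal after this

def try_lock_dag(session, dag_id, scheduler_id):
    """Return True if this scheduler process now holds the lock on dag_id."""
    now = datetime.utcnow()
    result = session.execute(
        text(
            "UPDATE dag_lock "
            "SET locked_by = :me, locked_at = :now "
            "WHERE dag_id = :dag_id "
            "AND (locked_by IS NULL OR locked_by = :me OR locked_at < :cutoff)"
        ),
        {"me": scheduler_id, "now": now, "dag_id": dag_id,
         "cutoff": now - LOCK_TIMEOUT},
    )
    session.commit()
    # Exactly one row updated means the check-and-take happened atomically in
    # this transaction and we own the lock; zero means someone else holds it.
    return result.rowcount == 1

def release_dag_lock(session, dag_id, scheduler_id):
    """Release the lock, but only if we are the ones holding it."""
    session.execute(
        text("UPDATE dag_lock SET locked_by = NULL "
             "WHERE dag_id = :dag_id AND locked_by = :me"),
        {"dag_id": dag_id, "me": scheduler_id},
    )
    session.commit()
```

Because the check and the take happen in a single conditional UPDATE inside one transaction, only one scheduler process can win, and the timestamp cutoff is what lets a lock left behind by a crashed process expire.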
The only thing that's missing at that point to fully support multi-schedulers is to centralize the logic that does the prioritization and pushing to workers. That's a bit more complicated: it assumes a leader (and leader election), and changing the logic of how the individual "DAG-evaluator processes" communicate which task instances are runnable to that leader (over a message queue? over the database?).

Max

On Thu, Nov 8, 2018 at 10:02 AM Daniel (Daniel Lamblin) [BDP - Seoul] <lamb...@coupang.com> wrote:

> Since you're discussing multi-scheduler trials:
> Based on v1.8 we also tried something, based on passing a regex to each
> scheduler; DAG file paths which match it are ignored. This required turning
> off some logic that deletes DAG data for DAGs that are missing from the
> DagBag.
> It is pretty manual and not evenly distributed, but it allows some 5,000+
> DAGs with 6 scheduler instances. That said, there's some pain around
> maintaining such a setup, so we didn't opt for it (yet) in our v1.10 setup.
> The lack of cleanup of an old DAG name is also not great (it can be done
> semi-manually). Then there's the work of redefining patterns for better
> mixes, and testing that the patterns don't all ignore the same file and
> that no more than one scheduler includes the same file. I generally
> wouldn't suggest this approach.
>
> In considering setting up a similar modification to v1.10, we thought it
> would make sense to instead tell each scheduler which scheduler number it
> is, and how many total schedulers there are. Then each scheduler can apply
> some hash (CityHash?) to the whole .py file path, mod it by the scheduler
> count, and only parse the file if the result matches its scheduler number.
>
> This seemed like a good way to keep a fixed number of schedulers balancing
> new DAG files, but we didn't do it (yet) because we started to think about
> getting fancier: what if a scheduler needs to be added? Can it be done
> without stopping the others to update the total count, and vice versa for
> removing a scheduler? If one scheduler drops out, can the others renumber
> themselves? If that could be solved, then the schedulers could be made into
> an autoscaling group… For this we thought about wrapping the whole
> scheduler instance's process in a watchdog that might coordinate through
> something like ZooKeeper (or the existing Airflow DB), but it got to be
> full of potential loopholes for the schedulers, like needing to stay in
> sync about refilling the DagBag in concert with each other when there's a
> change in the total count, and problems when one drops off but isn't
> actually down for the count and pops back in, having missed that the
> others changed their numbering, etc.
>
> I bring this up because the basic form of the idea doesn't hinge on which
> folder a DAG is in, which seems more likely to work nicely with team-based
> hierarchies that also import reusable modules across DAG files.
> -Daniel
>
> P.S. Yeah, we did find there were times when schedulers exited because
> there was a DB lock on task instances they were trying to update. So the
> DB needs to be managed by someone who knows how to scale it for that… or
> possibly the model needs to be made more conducive to minimally locking
> updates.
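For what it's worth, the hash-and-mod partitioning Daniel describes is only a few lines. A rough sketch (md5 standing in for CityHash, and the scheduler number / total count read from hypothetical environment variables):

```python
import hashlib
import os

# Hypothetical configuration: which scheduler this is and how many there are.
SCHEDULER_INDEX = int(os.environ["SCHEDULER_INDEX"])  # 0-based: 0, 1, ..., N-1
SCHEDULER_COUNT = int(os.environ["SCHEDULER_COUNT"])  # total number of schedulers

def is_mine(dag_file_path: str) -> bool:
    """Parse this DAG file only if its hash lands in this scheduler's bucket."""
    digest = hashlib.md5(dag_file_path.encode("utf-8")).hexdigest()
    return int(digest, 16) % SCHEDULER_COUNT == SCHEDULER_INDEX

# e.g. files_for_this_scheduler = [p for p in all_dag_files if is_mine(p)]
```

Each scheduler skips any discovered .py file for which is_mine() returns False, so every file is parsed by exactly one scheduler as long as all schedulers agree on the count and the numbering, which is exactly the coordination problem Daniel raises.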
>
> On 10/31/18, 11:38 PM, "Deng Xiaodong" <xd.den...@gmail.com> wrote:
>
> Hi Folks,
>
> Previously I initiated a discussion about best practices for setting up
> Airflow, and a few folks agreed that the scheduler may become one of the
> bottleneck components (we can only run one scheduler instance, it can only
> scale vertically rather than horizontally, etc.). Especially when we have
> thousands of DAGs, the scheduling latency may be high.
>
> In our team, we have experimented with a naive multiple-scheduler
> architecture. I would like to share it here, and also seek input from you.
>
> *1. Background*
> - Inside DAG_Folder, we can have sub-folders.
> - When we start a scheduler instance, we can specify "--subdir" for it,
> which sets the specific directory that the scheduler is going to "scan"
> (https://airflow.apache.org/cli.html#scheduler).
>
> *2. Our Naive Idea*
> Say we have 2,000 DAGs. If we run one single scheduler instance, one
> scheduling loop will traverse all 2K DAGs.
>
> Our idea is:
> Step-1: Create multiple sub-directories, say five, under DAG_Folder
> (subdir1, subdir2, …, subdir5).
> Step-2: Distribute the DAGs evenly into these sub-directories (400 DAGs
> in each).
> Step-3: Start a scheduler instance on 5 different machines, using the
> command `airflow scheduler --subdir subdir<i>` on machine <i>.
>
> Hence eventually, each scheduler only needs to take care of 400 DAGs.
>
> *3. Test & Results*
> - We have done a test using 2,000 DAGs (3 tasks in each DAG).
> - DAGs are stored on network-attached storage (the same drive mounted to
> all nodes), so we don't need to worry about DAG_Folder synchronization.
> - No conflicts were observed (each DAG file is only parsed & scheduled by
> one scheduler instance).
> - The scheduling speed improves almost linearly, demonstrating that we
> can scale the scheduler horizontally.
>
> *4. Highlight*
> - This naive idea doesn't address scheduler availability.
> - As Kelvin Yang shared earlier in another thread, the database may be
> another bottleneck when the load is high. But this is not considered here
> yet.
>
> Kindly share your thoughts on this naive idea. Thanks.
>
> Best regards,
> XD
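A rough sketch of Step-1 and Step-2 of XD's idea, i.e. dealing the existing DAG files out round-robin into a fixed number of sub-directories (the path and sub-directory count here are illustrative assumptions, not taken from the original message):

```python
import os
import shutil

DAG_FOLDER = "/path/to/dag_folder"  # illustrative; use your configured DAG_Folder
NUM_SUBDIRS = 5

subdirs = [os.path.join(DAG_FOLDER, "subdir%d" % (i + 1)) for i in range(NUM_SUBDIRS)]
for d in subdirs:
    os.makedirs(d, exist_ok=True)

# Collect the .py files sitting directly in DAG_Folder and distribute them
# round-robin: ~400 per sub-directory for 2,000 DAGs and 5 sub-directories.
dag_files = sorted(
    f for f in os.listdir(DAG_FOLDER)
    if f.endswith(".py") and os.path.isfile(os.path.join(DAG_FOLDER, f))
)
for i, filename in enumerate(dag_files):
    shutil.move(os.path.join(DAG_FOLDER, filename), subdirs[i % NUM_SUBDIRS])
```

Each machine <i> would then run `airflow scheduler --subdir <DAG_Folder>/subdir<i>` as described in Step-3.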