I mean at that point it's just as easy (or easier) to do things properly:
get each scheduler subprocess to take a lock on the DAG it's about to
process, and release it when it's done. Add a lock timestamp and a bit of
logic to expire locks (to self-heal if the process ever crashes and fails
to release the lock). Of course, make sure that the
confirm-it's-not-locked-and-take-a-lock step is wrapped in a database
transaction, and you're mostly good. That sounds like a very easy thing to
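
A minimal sketch of that locking scheme, to make the idea concrete. The `dag_lock` table, the function names and the 300-second expiry are all assumptions for illustration, and SQLite is used only to keep the example self-contained; none of this is existing Airflow code.

```python
import sqlite3
import time

LOCK_TTL_SECONDS = 300  # assumed expiry; should exceed the longest DAG pass


def init_schema(conn):
    # Hypothetical table: one row per currently locked DAG.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS dag_lock ("
        " dag_id TEXT PRIMARY KEY,"
        " owner TEXT NOT NULL,"
        " locked_at REAL NOT NULL)"
    )
    conn.commit()


def try_lock(conn, dag_id, owner, now=None, ttl=LOCK_TTL_SECONDS):
    """Return True if `owner` now holds the lock on `dag_id`."""
    now = time.time() if now is None else now
    with conn:  # one transaction: commit on success, roll back on error
        # Self-heal: drop a stale lock left behind by a crashed process.
        conn.execute(
            "DELETE FROM dag_lock WHERE dag_id = ? AND locked_at < ?",
            (dag_id, now - ttl),
        )
        # The primary key makes this a no-op if a live lock still exists.
        cur = conn.execute(
            "INSERT OR IGNORE INTO dag_lock (dag_id, owner, locked_at)"
            " VALUES (?, ?, ?)",
            (dag_id, owner, now),
        )
        return cur.rowcount == 1


def release(conn, dag_id, owner):
    """Release the lock, but only if `owner` is the one holding it."""
    with conn:
        conn.execute(
            "DELETE FROM dag_lock WHERE dag_id = ? AND owner = ?",
            (dag_id, owner),
        )
```

A subprocess would call `try_lock` before processing a DAG, skip the DAG if it returns False, and call `release` when done. On a real multi-writer database the isolation level would need checking too.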

The only thing that's missing at that point to fully support
multi-schedulers is to centralize the logic that does the prioritization
and pushing to workers. That's a bit more complicated: it assumes a leader
(and leader election), and it means changing how individual
"DAG-evaluator processes" communicate which task instances are runnable to
that leader (over a message queue? over the database?).


On Thu, Nov 8, 2018 at 10:02 AM Daniel (Daniel Lamblin) [BDP - Seoul] <
lamb...@coupang.com> wrote:

> Since you're discussing multi-scheduler trials: based on v1.8 we have also
> tried something, passing in a regex to each scheduler; DAG file paths that
> match it are ignored. This required turning off some logic that deletes DAG
> data for DAGs that are missing from the dagbag.
> It is pretty manual and not evenly distributed, but it allows some 5000+
> DAGs or so with 6 scheduler instances. That said there's some pain around
> maintaining such a setup, so we didn't opt for it (yet) in our v1.10 setup.
> The lack of cleaning up an old dag name is also not great (it can be done
> semi-manually). Then there's the work of redefining patterns for better
> mixes, and of testing that the patterns don't all ignore the same file and
> that no more than one scheduler includes the same file. I generally wouldn't
> suggest this approach.
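
The pattern testing described above could be sketched roughly as follows. This is an illustration only: `check_ignore_patterns` is a hypothetical helper, and it assumes each scheduler skips any path where its regex matches anywhere in the path (`re.search`), which may not be how the v1.8 patch applied it.

```python
import re


def check_ignore_patterns(dag_paths, ignore_patterns):
    """Check one ignore-regex per scheduler against a list of DAG paths.

    Returns (orphans, duplicates): files that every scheduler ignores,
    and files that more than one scheduler would parse.
    """
    compiled = [re.compile(p) for p in ignore_patterns]
    orphans, duplicates = [], []
    for path in dag_paths:
        # A scheduler "owns" a file if its ignore pattern does not match.
        owners = [i for i, rx in enumerate(compiled) if not rx.search(path)]
        if not owners:
            orphans.append(path)
        elif len(owners) > 1:
            duplicates.append(path)
    return orphans, duplicates
```

Both returned lists being empty means every file is parsed by exactly one scheduler.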
> In considering a similar modification for v1.10, we thought it
> would make sense to instead tell each scheduler which scheduler number it
> is, and how many total schedulers there are. Then each scheduler can use
> some hash (cityhash?) on the whole py file path, mod it by the scheduler
> count, and only parse it if it matches its scheduler number.
> This seemed like a good way to keep a fixed number of schedulers balancing
> new dag files, but we didn't do it (yet) because we started to think about
> getting fancier: what if a scheduler needs to be added? Can it be done
> without stopping the others to update the total count? And vice versa for
> removing a scheduler: if one scheduler drops out, can the others renumber
> themselves? If that could be solved, then the schedulers could be made into
> an autoscaling group… For this we thought about wrapping the whole
> scheduler instance's process up in some watchdog that might coordinate with
> something like zookeeper (or by using the existing airflow DB) but it got
> to be full of potential loopholes for the schedulers, like needing to be in
> sync about refilling the dagbag in concert with each other when there's a
> change in the total count, and problems when one drops off but is not
> actually down for the count and pops back in, having missed that the
> others decided to change their numbering, etc.
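
A rough sketch of the hash-and-mod assignment described above. The function names are made up for illustration, and SHA-256 from the standard library stands in for cityhash; the point is only that the hash must be stable across processes, which Python's built-in `hash()` on strings is not.

```python
import hashlib


def owner_of(dag_file_path, scheduler_count):
    """Map a DAG file path to a scheduler number in [0, scheduler_count)."""
    digest = hashlib.sha256(dag_file_path.encode("utf-8")).digest()
    # Use the first 8 bytes as an unsigned integer, then mod by the count.
    return int.from_bytes(digest[:8], "big") % scheduler_count


def should_parse(dag_file_path, scheduler_number, scheduler_count):
    """True if this scheduler is the one responsible for the file."""
    return owner_of(dag_file_path, scheduler_count) == scheduler_number
```

Each scheduler would call `should_parse(path, my_number, total)` while filling its dagbag. Note that changing `scheduler_count` generally moves many files to a different owner, which is the renumbering and resync problem raised above.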
> I bring this up because the basic form of the ideas doesn't hinge on which
> folder a dag is in, which seems more likely to work nicely with team based
> hierarchies which also import reusable modules across DAG files.
> -Daniel
> P.S. yeah we did find there were times when schedulers exited because
> there was a db lock on task instances they were trying to update. So the DB
> needs to be managed by someone who knows how to scale it for that… or
> possibly the model needs to be made more conducive to minimally locking
> updates.
> On 10/31/18, 11:38 PM, "Deng Xiaodong" <xd.den...@gmail.com> wrote:
>     Hi Folks,
>     Previously I initiated a discussion about best practices for setting up
> Airflow, and a few folks agreed that the scheduler may become one of the
> bottleneck components (we can only run one scheduler instance, so it can
> only scale vertically rather than horizontally, etc.). Especially when we
> have thousands of DAGs, the scheduling latency may be high.
>     In our team, we have experimented with a naive multiple-scheduler
> architecture. We would like to share it here and also seek your input.
>     *1. Background*
>     - Inside DAG_Folder, we can have sub-folders.
>     - When we start a scheduler instance, we can pass “--subdir” to it,
> which specifies the directory that the scheduler is going to “scan”
> (https://airflow.apache.org/cli.html#scheduler).
>     *2. Our Naive Idea*
>     Say we have 2,000 DAGs. If we run one single scheduler instance, one
> scheduling loop will traverse all 2K DAGs.
>     Our idea is:
>     Step-1: Create multiple sub-directories, say five, under DAG_Folder
> (subdir1, subdir2, …, subdir5)
>     Step-2: Distribute the DAGs evenly into these sub-directories (400
> DAGs in each)
>     Step-3: Start a scheduler instance on each of 5 different machines,
> using the command `airflow scheduler --subdir subdir<i>` on machine <i>.
>     As a result, each scheduler only needs to take care of 400 DAGs.
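
Steps 1 to 3 above could be planned with something like the sketch below. `plan_subdirs` and `scheduler_commands` are hypothetical helpers, and round-robin over sorted file names is just one assumed way to get an even split. It only computes the assignment and the per-machine command from the quoted message; it does not move any files.

```python
def plan_subdirs(dag_files, n_subdirs):
    """Assign DAG files round-robin to subdir1..subdirN (no files are moved)."""
    plan = {"subdir%d" % i: [] for i in range(1, n_subdirs + 1)}
    for index, name in enumerate(sorted(dag_files)):
        plan["subdir%d" % (index % n_subdirs + 1)].append(name)
    return plan


def scheduler_commands(plan):
    """One scheduler command per sub-directory, i.e. one per machine."""
    return ["airflow scheduler --subdir %s" % subdir for subdir in plan]
```

With 2,000 files and 5 sub-directories this yields 400 files per sub-directory, matching the numbers in the example.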
>     *3. Test & Results*
>     - We ran a test using 2,000 DAGs (3 tasks in each DAG).
>     - DAGs are stored on network-attached storage (the same drive
> mounted on all nodes), so we are not concerned about DAG_Folder
> synchronization.
>     - No conflicts were observed (each DAG file is parsed & scheduled
> by only one scheduler instance).
>     - The scheduling speed improved almost linearly, which demonstrates that
> we can scale the scheduler horizontally.
>     *4. Highlight*
>     - This naive idea doesn’t address scheduler availability.
>     - As Kelvin Yang shared earlier in another thread, the database may be
> another bottleneck when the load is high. But this is not considered here
> yet.
>     Kindly share your thoughts on this naive idea. Thanks.
>     Best regards,
>     XD
