Thanks Max. I have documented all the discussion around this topic and the useful inputs in AIP-15 (Support Multiple Schedulers for HA & Better Scheduling Performance): https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
More inputs from folks are welcome. Thanks.

XD

> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>
> Personally I'd vote against the idea of having certain schedulers handle a
> subset of the DAGs; that's just not HA.
>
> Also, if you are in an env where you have a small number of large DAGs, the
> odds of wasted work and double-firing get pretty high.
>
> With the lock in place, it's just a matter of the scheduler loop selecting
> (in a db transaction) the DAG that has not been processed for the longest
> time and is not locked. Flipping the lock flag to true should be part of
> the db transaction. We probably need a btree index on the lock and last
> processed time.
>
> This way adding scheduler processes increases the scheduling pace, and
> provides an HA solution. No leader / master / slave or election process,
> just equal workers that work together.
>
> Max
>
> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>
>> Got your point and agree. And your suggestion at the end to randomly
>> sort the DAGs is a great idea to address it. Thanks!
>>
>> XD
>>
>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:
>>>
>>> I think the probability calculation holds only if there is no
>>> correlation between different schedulers. I think, however, there might
>>> be an accidental correlation if you think about typical deployments.
>>>
>>> Some details on why I think accidental correlation is possible and even
>>> likely.
>>> Assume that:
>>>
>>> - we have similar and similarly busy machines running the schedulers
>>>   (likely)
>>> - time is synchronised between the machines (likely)
>>> - the machines have the same DAG folders mounted (or copied) and the
>>>   same filesystem is used (this is exactly what a multiple-schedulers
>>>   deployment is all about)
>>> - the schedulers start scanning at exactly the same time (crossing the
>>>   0:00 second every full five minutes, for example) - this I am not sure
>>>   about, but I imagine it might be "typical" behaviour
>>> - they process the list of DAGs in exactly the same sequence (it looks
>>>   like this is the case: see dag_processing
>>>   <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
>>>   and models/__init__
>>>   <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
>>>   we use os.walk, which uses os.listdir, for which the sequence of
>>>   processing depends on the filesystem implementation
>>>   <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>,
>>>   and then we append files to the list)
>>>
>>> Then it's rather likely that the schedulers will be competing for the
>>> very same DAGs at the very beginning. Locking will change how quickly
>>> they process each DAG of course, but if the DAGs are of similar sizes
>>> it's also likely that the scanning speed (DAGs/s) is similar for all
>>> schedulers. The schedulers will then catch up with each other and might
>>> pretty much continuously compete for the same DAGs almost all the time.
>>>
>>> It can be mitigated super-easily by randomly sorting the DAG folder
>>> list after it is prepared (it's filesystem-dependent now, so we do not
>>> rely on a particular order). Then the probability numbers will hold
>>> perfectly, I think :)
>>>
>>> J.
>>>
>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>
>>>> I'm thinking of which architecture would be ideal.
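Jarek's mitigation (shuffle the file list after it is built, so the scan order no longer follows the shared filesystem order) could be as small as the sketch below. The function name and simplified traversal are illustrative, not Airflow's actual dag_processing code, which also handles .airflowignore, zip files, etc.

```python
import os
import random

def list_dag_files(dag_folder: str) -> list:
    """Collect .py files under the DAG folder, then shuffle them so that
    multiple schedulers scanning the same folder are unlikely to process
    DAGs in the same (filesystem-dependent) order.

    Simplified sketch; not the real Airflow traversal.
    """
    file_paths = []
    for root, _, files in os.walk(dag_folder):
        file_paths.extend(
            os.path.join(root, f) for f in files if f.endswith(".py")
        )
    # The one-line fix: break the accidental cross-scheduler correlation.
    random.shuffle(file_paths)
    return file_paths
```

Each scheduler then walks its own random permutation, so the probability argument below (independent choices) holds.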
>>>> # Option-1:
>>>> The master-slave architecture would be one option. But leader
>>>> selection will be essential to consider, otherwise we have an HA
>>>> issue again.
>>>>
>>>> # Option-2:
>>>> Another option we may consider is to simply start multiple scheduler
>>>> instances (just using the current implementation, after modifying &
>>>> validating the scheduler_lock on DagModel).
>>>>
>>>> - In this case, given we handle everything properly using locking, we
>>>> don't need to worry too much about double-scheduling/triggering.
>>>>
>>>> - Another potential concern I had earlier is that different schedulers
>>>> may compete with each other and cause a "waste" of scheduler
>>>> resources. After further thinking, I realise this is a typical
>>>> Birthday Problem. Given we have m DAGs and n schedulers, at any moment
>>>> the probability that all schedulers are working on different DAGs is
>>>> m!/((m-n)! * m^n), and the probability that there are schedulers
>>>> competing on the same DAG is 1 - m!/((m-n)! * m^n).
>>>>
>>>> Let's say we have 200 DAGs and we start 2 schedulers. At any moment,
>>>> the probability that there are schedulers competing on the same DAG is
>>>> only 0.5%. If we run 2 schedulers against 300 DAGs, this probability
>>>> is only 0.33%. (This probability will be higher if m/n is low. But
>>>> users should not start too many schedulers if they don't have that
>>>> many DAGs.)
>>>>
>>>> Given the probability of schedulers competing is so low, my concern
>>>> about scheduler resource waste is not really valid.
>>>>
>>>> Based on these calculations/assessments, I think we can go for
>>>> Option-2, i.e. we don't make big changes to the current
>>>> implementation. Instead, we ensure the scheduler_lock works well and
>>>> test running multiple schedulers intensively. Then we should be good
>>>> to let users know that it's safe to run multiple schedulers.
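XD's figures above are the standard birthday-problem computation. A quick sketch reproducing them (the helper name is ours, for illustration only):

```python
from math import prod

def collision_probability(m: int, n: int) -> float:
    """Probability that at least two of n schedulers pick the same DAG,
    with each scheduler choosing independently and uniformly among m DAGs.

    P(all distinct) = m! / ((m-n)! * m^n), so P(collision) = 1 - that.
    """
    p_all_distinct = prod(m - i for i in range(n)) / m ** n
    return 1 - p_all_distinct

print(round(collision_probability(200, 2), 4))  # 0.005  -> the 0.5% in the thread
print(round(collision_probability(300, 2), 4))  # 0.0033 -> the 0.33% figure
```

Note this models a single instant; over a full scan the schedulers make many such "draws", which is why Jarek's correlation point (and the shuffle fix) matters.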
>>>>
>>>> Please share your thoughts on this and correct me if I'm wrong on any
>>>> point above. Thanks.
>>>>
>>>> XD
>>>>
>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>>>>
>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
>>>>>
>>>>> Does the proposal use a master-slave architecture (leader scheduler
>>>>> vs slave schedulers)?
>>>>>
>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:
>>>>>
>>>>>> Preventing double-triggering by separating the DAG files that
>>>>>> different schedulers parse sounds easier and more intuitive. I
>>>>>> actually removed one of the (expensive) double-triggering prevention
>>>>>> checks here
>>>>>> <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
>>>>>> and was relying on this lock
>>>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
>>>>>> to prevent double-firing and safeguard our non-idempotent tasks
>>>>>> (btw, the insert can be an insert overwrite to be idempotent).
>>>>>>
>>>>>> Also, though at Airbnb we requeue tasks a lot, we haven't seen
>>>>>> double-firing recently.
>>>>>>
>>>>>> Cheers,
>>>>>> Kevin Y
>>>>>>
>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin
>>>>>> <maximebeauche...@gmail.com> wrote:
>>>>>>
>>>>>>> Forgot to mention: the intention was to use the lock, but I never
>>>>>>> personally got to do the second phase, which would consist of
>>>>>>> skipping the DAG if the lock is on, and expiring the lock
>>>>>>> eventually based on a config setting.
>>>>>>>
>>>>>>> Max
>>>>>>>
>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin
>>>>>>> <maximebeauche...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My original intention with the lock was to prevent
>>>>>>>> "double-triggering" of tasks (triggering refers to the scheduler
>>>>>>>> putting the message in the queue). Airflow now has good
>>>>>>>> "double-firing" prevention for tasks (firing happens when the
>>>>>>>> worker receives the message and starts the task): even if the
>>>>>>>> scheduler were to go rogue or restart and send multiple triggers
>>>>>>>> for a task instance, the worker(s) should only start one task
>>>>>>>> instance. That's done by running the database assertions behind
>>>>>>>> the conditions being met as a read database transaction (no task
>>>>>>>> can alter the rows that validate the assertion while it's getting
>>>>>>>> asserted). In practice it's a little tricky, and we've seen rogue
>>>>>>>> double-firing in the past (I have no idea how often that happens).
>>>>>>>>
>>>>>>>> If we do want to prevent double-triggering, we should make sure
>>>>>>>> that 2 schedulers aren't processing the same DAG or DagRun at the
>>>>>>>> same time. That would mean the scheduler not starting to process
>>>>>>>> locked DAGs, and providing a mechanism to expire the locks after
>>>>>>>> some time.
>>>>>>>>
>>>>>>>> Has anyone experienced double-firing lately? If that exists we
>>>>>>>> should fix it, but also be careful around multiple-scheduler
>>>>>>>> double-triggering, as it would make that problem potentially much
>>>>>>>> worse.
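Max's selection loop from earlier in the thread (inside one transaction, pick the unlocked DAG that has gone longest without processing and flip the lock), combined with the lock expiry he describes above, might look roughly like this sketch. The table, column names, and timeout are illustrative, not Airflow's actual schema; SQLite stands in for the metadata DB.

```python
import sqlite3
import time

LOCK_TIMEOUT = 300  # seconds; stand-in for the config setting Max mentions

def create_schema(conn: sqlite3.Connection) -> None:
    """Illustrative dag table, plus the btree index Max suggests on the
    lock and last-processed-time columns so the claim query stays cheap."""
    conn.execute(
        """CREATE TABLE dag (
               dag_id TEXT PRIMARY KEY,
               last_processed REAL NOT NULL,
               locked_by TEXT,
               locked_at REAL)"""
    )
    conn.execute("CREATE INDEX idx_dag_lock ON dag (locked_at, last_processed)")

def claim_next_dag(conn: sqlite3.Connection, scheduler_id: str):
    """In a single transaction: select the DAG not processed for the
    longest time whose lock is absent or expired, and flip the lock.
    (On Postgres/MySQL you would add SELECT ... FOR UPDATE SKIP LOCKED
    so concurrent schedulers cannot read the same row.)"""
    now = time.time()
    with conn:  # SELECT + UPDATE commit or roll back together
        row = conn.execute(
            """SELECT dag_id FROM dag
               WHERE locked_by IS NULL OR locked_at < ?
               ORDER BY last_processed
               LIMIT 1""",
            (now - LOCK_TIMEOUT,),
        ).fetchone()
        if row is None:
            return None  # every DAG is currently claimed
        conn.execute(
            "UPDATE dag SET locked_by = ?, locked_at = ? WHERE dag_id = ?",
            (scheduler_id, now, row[0]),
        )
        return row[0]
```

Because every scheduler runs the same claim loop, the processes stay equal peers: no leader election, and a crashed scheduler's claims simply expire after LOCK_TIMEOUT.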
>>>>>>>>
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> It's exactly what my team is doing & what I shared here early
>>>>>>>>> last year
>>>>>>>>> (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E).
>>>>>>>>>
>>>>>>>>> It's a somewhat "hacky" solution (and HA is not addressed), and
>>>>>>>>> now I'm thinking about how we can make it more proper & robust.
>>>>>>>>>
>>>>>>>>> XD
>>>>>>>>>
>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urqu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> We have been running multiple schedulers for about 3 months. We
>>>>>>>>>> created multiple services to run Airflow schedulers. The only
>>>>>>>>>> difference is that we have each of the schedulers pointed to a
>>>>>>>>>> directory one level deeper than the DAG home directory that the
>>>>>>>>>> workers and webapp use. We have seen much better scheduling
>>>>>>>>>> performance, but this does not yet help with HA.
>>>>>>>>>>
>>>>>>>>>> DAGS_HOME:
>>>>>>>>>> {airflow_home}/dags (webapp & workers)
>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>>
>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>>
>>>>>>>>>> Thank you,
>>>>>>>>>> Mario
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I have done quite some work on making it possible to run
>>>>>>>>>>> multiple schedulers at the same time.
>>>>>>>>>>> At the moment I don't think there are real blockers to doing
>>>>>>>>>>> so, actually. We just don't actively test it.
>>>>>>>>>>>
>>>>>>>>>>> Database locking is mostly in place (DagRuns and
>>>>>>>>>>> TaskInstances). And I think the worst that can happen is that a
>>>>>>>>>>> task is scheduled twice. The task will detect this most of the
>>>>>>>>>>> time and kill one off if they run concurrently; if not
>>>>>>>>>>> concurrent but sequential, it will run again on some occasions.
>>>>>>>>>>> Everyone is having idempotent tasks, right, so no harm done? ;-)
>>>>>>>>>>>
>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Bolke.
>>>>>>>>>>>
>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>>
>>>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>>
>>>>>>>>>>>> Following
>>>>>>>>>>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E,
>>>>>>>>>>>> I'm trying to prepare an AIP for supporting multiple
>>>>>>>>>>>> schedulers in Airflow (mainly for HA and higher scheduling
>>>>>>>>>>>> performance).
>>>>>>>>>>>>
>>>>>>>>>>>> While checking the code, I found that there is one attribute
>>>>>>>>>>>> of DagModel, "scheduler_lock".
>>>>>>>>>>>> It's not used at all in the current implementation, but it
>>>>>>>>>>>> was introduced a long time back (2015) to allow multiple
>>>>>>>>>>>> schedulers to work together
>>>>>>>>>>>> (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
>>>>>>>>>>>>
>>>>>>>>>>>> Since you were the original author of it, it would be very
>>>>>>>>>>>> helpful if you could kindly share why the multiple-schedulers
>>>>>>>>>>>> implementation was eventually removed, and what
>>>>>>>>>>>> challenges/complexity there were. (You already shared a few
>>>>>>>>>>>> valuable inputs in the earlier discussion
>>>>>>>>>>>> <https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E>,
>>>>>>>>>>>> mainly relating to hiccups around concurrency, cross-DAG
>>>>>>>>>>>> prioritisation & load on the DB. Other than these, anything
>>>>>>>>>>>> else you would like to advise?)
>>>>>>>>>>>>
>>>>>>>>>>>> I will also dive into the git history further to understand
>>>>>>>>>>>> it better.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> XD
>>>
>>> --
>>> Jarek Potiuk
>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>
>>> M: +48 660 796 129
>>> E: jarek.pot...@polidea.com