Yeah, essentially what `airflow scheduler` does is:
```
from airflow.jobs.scheduler_job import SchedulerJob

job = SchedulerJob()
job.run()
```
You could even put `if __name__ == "__main__": ...` at the end of your
custom scheduler and then run `python -m your.custom_scheduler`.
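For instance, a minimal sketch (the module path and class name are
illustrative, and it assumes your custom scheduler subclasses
SchedulerJob):
```
# your/custom_scheduler.py - hypothetical module path
from airflow.jobs.scheduler_job import SchedulerJob

class CustomSchedulerJob(SchedulerJob):
    # override pieces of the scheduling loop here
    pass

if __name__ == "__main__":
    job = CustomSchedulerJob()
    job.run()
```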
On Tue, Sep 6 2022 at 12:15:52 +02:00:00, Jarek Potiuk
<[email protected]> wrote:
Why do you need to make it pluggable?
One could say that a better approach would be to build your own
scheduler and run it instead of the Airflow Scheduler?
Basically we are talking about replacing the whole scheduling loop.
Splitting it into three separate steps actually makes it less
flexible than replacing the whole loop, with - I think - questionable
benefits. But maybe I am mistaken :) ?
I am not saying it is better, or that making it pluggable this way is
a bad idea; I just want to know what you want to achieve by making it
pluggable this way, compared to just writing your own scheduler from
scratch using existing Airflow classes. In the model you proposed
with the "pluggable scheduler framework", writing your own scheduler
is basically what would happen anyway, but IMHO you would be bound
and limited to implementing those three methods in a specific way,
while when writing your own scheduler you would be free to do
anything in any way you want.
Re: Kubernetes - I think we should not only look at what others did
but also at why they did it. There is a difference in that Kubernetes
provides much more than just scheduling - it provides resource
management, networking, cluster management, and scheduling is just
part of it. Airflow is quite a different beast altogether. It
provides some tools on the "task level" - dependency management, APIs
that the tasks can use, the way individual tasks are executed, etc.
But on the "whole Airflow" level it does not provide any abstractions
(resource management etc.) that would remain if you removed the
scheduler. If you remove the scheduler implementation, there is not
much left - there are executors, but (provided that we clean up their
API a bit and make them truly standalone) they could be used totally
independently of the "scheduling loop" implementation. There is no
need to have the same kind of loop with the three steps that the
current scheduler has.
I think we have another layer of abstraction here: "jobs", which
seems more appropriate for what you propose:
* backfill_job.py
* triggerer_job.py
* scheduler_job.py
* local_task_job.py
* base_job.py
What I think you want to do is to replace the whole "SchedulerJob".
So why not implement your own "non_locking_scheduler_job.py"
(yourself, or anyone else who needs it) and plug it into the CLI?
I am really curious to see the reasoning why the "pluggable"
architecture would be better than yourself (or anyone else) writing a
new "scheduler" job and an accompanying scheduler CLI from scratch?
J.
On Tue, Sep 6, 2022 at 5:53 AM Ping Zhang <[email protected]> wrote:
Hi Jarek, Tomasz and Vikram,
Thanks for your comments.
To be clear, we are not removing the scheduler but making it
pluggable, just like the different executors in the core and the
BackfillJob.
A pluggable scheduler gives users the flexibility to choose or write
a scheduler that meets their infra requirements while still
leveraging the rest of the Airflow protocol, e.g. DAG authoring,
without forking Airflow. It does not make assumptions about how you
should use/run Airflow clusters. It also avoids fitting all use cases
into one scheduler, which leads to convoluted logic that is hard to
test.
The pluggable scheduler framework
<https://gist.github.com/pingzh/6717ff99b4ca31d5b02161f7999a9dd8#file-airflowschedulerframework-py-L1-L41>
is a very lightweight class (less than 40 lines of code), which only
defines high-level scheduler contracts. It does not change any
features in the current scheduler. It uses the dependency injection
pattern, injecting the scheduler_cls into the framework. It does not
assume any implementation. It only abstracts common interfaces
for the scheduler (similar to the BaseExecutor). At a very high
level, it breaks down `_do_scheduling()
<https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L918-L977>`
into three interfaces (a condensed sketch follows the list below):
* make_scheduling_decisions, which corresponds to this code block
<https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L918-L943>.
* send_queuable_tasks_to_executor, which corresponds to this code
block
<https://github.com/apache/airflow/blob/a2db8fcb7df1a266e82e17b937c9c1cf01a16a42/airflow/jobs/scheduler_job.py#L945-L973>.
* process_system_events, which corresponds to executor events and
zombies.
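To make the shape of that contract concrete, here is a condensed
sketch reconstructed from the description above (the class names are
mine, not copied from the gist):
```
from abc import ABC, abstractmethod

class BaseSchedulerStrategy(ABC):
    """Sketch of the high-level scheduler contract."""

    @abstractmethod
    def make_scheduling_decisions(self, session):
        """Examine dag runs and decide which task instances can run."""

    @abstractmethod
    def send_queuable_tasks_to_executor(self, session):
        """Hand queuable task instances over to the executor."""

    @abstractmethod
    def process_system_events(self, session):
        """Handle executor events and zombie task instances."""

class SchedulerFramework:
    def __init__(self, scheduler_cls):
        # dependency injection: the concrete scheduler class is
        # supplied by configuration, not hard-coded in the framework
        self.scheduler = scheduler_cls()

    def run_single_loop(self, session):
        self.scheduler.make_scheduling_decisions(session)
        self.scheduler.send_queuable_tasks_to_executor(session)
        self.scheduler.process_system_events(session)
```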
Take Kubernetes as an analogy: it was originally designed for web
applications and microservices, but it has been extended to the
offline data world. The Scheduling Framework
<https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/>
was proposed to make its scheduler more pluggable. Also, new
Kubernetes schedulers, like YuniKorn
<https://yunikorn.apache.org/docs/next/design/architecture/> and
Volcano <https://volcano.sh/en/>, were created for different use
cases and plugged into the Kubernetes ecosystem.
Please let me know your thoughts.
Ping
On Mon, Aug 29, 2022 at 3:44 PM Vikram Koka
<[email protected]> wrote:
Hi Ping,
Conceptually, I have a similar reaction to Jarek and Tomek above,
but I really want to understand the problem you have described with
(2) before I comment further.
Can you please elaborate on the problems:
Airflow 2.0 treats all DagRuns with the same scheduling priority
(see code
<https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L923>).
This means DAGs with more DagRuns could be scheduled more often
than others, and large DAGs might slow down the scheduling of small
DAGs. This may not be desired in some cases.
Can you please share a use case for the above issue?
Thanks,
Vikram
On Sun, Aug 28, 2022 at 1:45 AM Tomasz Urbaszek <[email protected]> wrote:
Hey Ping,
I somehow agree with Jarek that there's not much detail on what
exactly would be the "pluggable" part.
Is "new" scheduler the only solution to those problems you have
mentioned? As said, maybe we should consider DagRuns
prioritisation? If growing tables is a perf problem then maybe you
should consider introducing some retention policy (it's done by
the others as far as I know)?
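For instance, a retention policy could be as simple as a periodic job
that trims old rows from the metadata database (a rough sketch
against the Airflow ORM; the cut-off and the choice of table are
illustrative):
```
from datetime import datetime, timedelta

from airflow.models import TaskInstance
from airflow.utils.session import create_session

RETENTION = timedelta(days=180)  # illustrative cut-off

with create_session() as session:
    # delete task instances that started before the retention window
    # (a real policy would also need to handle rows referencing
    # these task instances, e.g. xcom or task_fail)
    session.query(TaskInstance).filter(
        TaskInstance.start_date < datetime.utcnow() - RETENTION
    ).delete(synchronize_session=False)
```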
Personally I think we should focus on how to improve "out of the
box" Airflow instead of adding more plug-something components.
Cheers,
Tomek
On Sat, 27 Aug 2022 at 02:05, Jarek Potiuk <[email protected]> wrote:
Hey Ping,
I don't think there is nearly enough information in what you
described about what a "pluggable" scheduler means. What I see in
the doc and your description is a problem statement, but not a
solution.
But please don't jump on trying to describe it just yet.
I am very skeptical about the idea in general. If you remove the
scheduler from Airflow Core, there is not much left. Airflow
core is, practically speaking, only a scheduler when it comes to
the internals. I almost think that if someone wants to make the
"scheduler" pluggable, that calls for forking Airflow - and
forking Airflow (if someone wants to do it) and developing it for
themselves will be far more effective than trying to get a
"pluggable" architecture. Also because this is at most a tactical
solution IMHO.
I believe a lot of this problem statement of yours is "past
looking" rather than "forward looking", and it does not account for
the fact that not only does Airflow change, but the environment
around it changes too. By the time the design and development of
any such pluggable solution is even close to completion, the
environment will already have changed and we will be in a different
place - both in terms of what Airflow will be capable of and what
the external environment will be.
MySQL 5.7 EOL is in a year - October 2023. And we will surely
drop it then. And anyone using it should. Then the
single-scheduler-only case will be gone (we will not support 5.7
anyway). I seriously doubt we can develop "another" scheduler
within a year, and even if we do, if the only reason for it is not
supporting multiple schedulers, that would be the first thing to
drop in October 2023. And if we have a pluggable scheduler with 2
implementations by October 2023, I will be the first one to raise
"let's drop this non-locking scheduler as we don't need it any
more". If you look back and imagine we were back in January 2019
and had kept compatibility with Python 2.7 - we would not be
where we are now. We need to look into the modern, new
Airflow future rather than looking at some bad and discouraged
ways of using Airflow. Even more, we should encourage and help
the users that are using Airflow in those "non-future-proof" ways
to switch to the new ways, and add features that make the current
scheduler more appealing for the cases you mentioned.
Also, Airflow 2.4 brings Datasets and data-driven scheduling.
And as surprising as it might look, it will generally solve the
"big DAG/small DAG" problem. Simply speaking, Airflow DAGs will
suddenly start becoming more modular, and you will be able to do
the same thing you did with huge 1000-task DAGs with 50 20-task
DAGs connected via datasets. This will be a far better, more
modular solution. And rather than complicating Airflow by
designing and implementing multiple schedulers, I would rather
focus on developing tooling that will make distributed DAG
development far more appealing for all users. And those users
(like Airbnb - with huge DAGs) should follow suit in changing
their approach - this will give them far more capabilities, will
enable them to distribute DAG development and manage it far
better than having a huge, simple DAG.
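For example, in 2.4 a big pipeline can be split into small DAGs
chained by datasets (a minimal sketch; the dataset URI and DAG names
are made up):
```
import pendulum

from airflow.datasets import Dataset
from airflow.decorators import dag, task

orders = Dataset("s3://example-bucket/orders.parquet")  # illustrative URI

@dag(start_date=pendulum.datetime(2022, 9, 1), schedule="@daily")
def produce_orders():
    @task(outlets=[orders])
    def publish():
        ...  # writes the dataset

    publish()

@dag(start_date=pendulum.datetime(2022, 9, 1), schedule=[orders])
def consume_orders():  # runs whenever the dataset is updated
    @task
    def consume():
        ...  # reads the dataset

    consume()

produce_orders()
consume_orders()
```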
Maybe instead of adding pluggable schedulers, we should rather
(after 2.4) work on tooling that will help users with huge DAGs
to split them. Maybe we should add a way to prioritise DagRuns?
Both of those are much more forward-looking than trying to
"cement" existing (bad) usage patterns IMHO, by "blessing" them
with a 2nd type of scheduler supporting cases that should be
solved differently.
That's how I see it.
J.
On Tue, Aug 23, 2022 at 7:46 AM Ping Zhang <[email protected]> wrote:
Hi Airflow community,
We are proposing to have the Airflow Scheduler adopt a pluggable
pattern, similar to the executor.
Background:
Airflow 2.0 introduced a new scheduler in AIP-15 (Scheduler
HA + performance improvement)
<https://airflow.apache.org/blog/airflow-two-point-oh-is-here/#massive-scheduler-performance-improvements>.
The new scheduler leverages the skip-locked feature of the
database to scale horizontally
<https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html#overview>.
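Simplified, each scheduler claims rows with SELECT ... FOR UPDATE
SKIP LOCKED, so concurrent schedulers never pick the same dag runs
(a rough sketch of the idea, not the actual scheduler code):
```
from airflow.models import DagRun
from airflow.utils.session import create_session
from airflow.utils.state import State

with create_session() as session:
    # rows already locked by another scheduler are silently skipped,
    # so each scheduler works on a disjoint set of dag runs
    dag_runs = (
        session.query(DagRun)
        .filter(DagRun.state == State.RUNNING)
        .with_for_update(skip_locked=True)
        .limit(20)
        .all()
    )
```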
It works well for relatively small clusters (a small number of
tasks per DAG and a small number of DAG files), as shown in these
benchmark results from the community:
| Scenario (1000 tasks in total)                   | DAG shape   | 1.10.10 Total Task Lag | 2.0 beta Total Task Lag | Speedup  |
|--------------------------------------------------|-------------|------------------------|-------------------------|----------|
| 100 DAG files, 1 DAG per file, 10 Tasks per DAG  | Linear      | 200 seconds            | 11.6 seconds            | 17 times |
| 10 DAG files, 1 DAG per file, 100 Tasks per DAG  | Linear      | 144 seconds            | 14.3 seconds            | 10 times |
| 10 DAG files, 10 DAGs per file, 10 Tasks per DAG | Binary Tree | 200 seconds            | 12 seconds              | 16 times |
From: <https://www.astronomer.io/blog/airflow-2-scheduler>
From the most recent 2022 Airflow survey
<https://docs.google.com/document/d/18E3gBbrPI6cHAKRkRIPfju9pOk4EJNd2M-1fRJO4glA/edit#heading=h.yhlzd4j2mpzz>,
81% of Airflow users have between 1 and 250 DAGs in their
largest Airflow instance (4.8% of users have more than 1000
DAGs). 75% of the surveyed Airflow users have between 1 and 100
tasks per DAG. The Airflow 2.0 scheduler can satisfy these needs.
However, there are cases where the Airflow 2.0 scheduler cannot
be deployed:
* The team cannot use more than one scheduler because the
company's database team does not support MySQL 8+ or PostgreSQL
10+. (Arguably they should be supported, but in reality it can
take quite a while for large companies to upgrade to newer DB
versions.)
* Airflow 2.0 treats all DagRuns with the same scheduling priority
(see code
<https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L923>).
This means DAGs with more DagRuns could be scheduled more often
than others, and large DAGs might slow down the scheduling of
small DAGs. This may not be desired in some cases.
* For very large scale clusters (with more than 10 million rows in
the task instance table), the database tends to be the unstable
component. The infra team does not want to add extra load to the
database with more than one scheduler. However, a single Airflow
2.0 scheduler cannot support large scale clusters, as it has
removed the multi-processing of dag runs and only uses one core
to schedule all dag runs
<https://github.com/apache/airflow/blob/6b7a343b25b06ab592f19b7e70843dda2d7e0fdb/airflow/jobs/scheduler_job.py#L886-L976>.
The above limitations hinder evolving Airflow into a general
purpose scheduling platform.
To address them, and to avoid making the scheduler core code
larger and its logic more complex, we propose a pluggable
scheduler pattern. With that, Airflow infra teams/users can choose
the scheduler that best satisfies their needs, and even swap out
the parts that need customization.
Please let me know your thoughts; I look forward to your feedback.
(Here is the google doc link
<https://docs.google.com/document/d/1njmX3D_9a4TjjG9CYPWJqdkb9EyXkeQPnycYaMTUQ_s/edit?usp=sharing>;
feel free to comment in the doc.)
Thanks,
Ping