Before writing an AIP in Confluence, I would encourage you to describe and
explain your idea in a shared Google Docs document. But before you do
that, I'd encourage you to take a close look at, and deep dive into, the
implementation of priorities. It might be different than you think: it
has priority weight algorithms that allow for inclusion of
downstream/upstream task priorities. Also, because of the way Airflow
serializes tasks, they are re-read and refreshed by Airflow every 30
seconds by default, so whatever priority_weights you set in DAGs will
override the priorities that you **might** want to set via an external API.

Note that even today, tasks do not have "priorities" per se. They have
"priority_weight" and "weight_rule", which are used to automatically
determine the actual priority of the task based on those rules. So there
is not a single "priority" you can override; there is a set of database
queries to calculate those when tasks are eligible for execution, and you
cannot simply "set" priority for the task this way.
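
To make this concrete, here is a minimal pure-Python sketch of how the
documented weight rules ("downstream", "upstream", "absolute") turn a
per-task priority_weight into an effective priority. It is a simplified
model, not Airflow's implementation: the task names, the DOWNSTREAM graph
and the helper functions are all hypothetical.

```python
from typing import Dict, List, Set

# Hypothetical three-task chain: extract -> transform -> load.
DOWNSTREAM: Dict[str, List[str]] = {
    "extract": ["transform"],
    "transform": ["load"],
    "load": [],
}
# Hypothetical per-task priority_weight values.
PRIORITY_WEIGHT: Dict[str, int] = {"extract": 1, "transform": 5, "load": 2}


def reachable(task_id: str, edges: Dict[str, List[str]]) -> Set[str]:
    """Return every task reachable from task_id by following edges."""
    seen: Set[str] = set()
    stack = list(edges[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(edges[current])
    return seen


def reverse(edges: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Flip a downstream graph into an upstream graph."""
    upstream: Dict[str, List[str]] = {task: [] for task in edges}
    for parent, children in edges.items():
        for child in children:
            upstream[child].append(parent)
    return upstream


def effective_weight(task_id: str, weight_rule: str) -> int:
    """Model of the effective priority under a given weight_rule."""
    own = PRIORITY_WEIGHT[task_id]
    if weight_rule == "absolute":
        # Only the task's own priority_weight counts.
        return own
    if weight_rule == "downstream":
        related = reachable(task_id, DOWNSTREAM)
    elif weight_rule == "upstream":
        related = reachable(task_id, reverse(DOWNSTREAM))
    else:
        raise ValueError(f"unknown weight_rule: {weight_rule}")
    # Own weight plus the weights of all related tasks.
    return own + sum(PRIORITY_WEIGHT[t] for t in related)
```

With the "downstream" rule, the first task in the chain ends up with the
highest effective priority even though its own priority_weight is the
lowest, which is why overriding a single number does not behave the way
one might expect.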

But there is a more fundamental problem with the proposal - it seems to
violate a basic principle that we have in Airflow: that tasks and their
behaviour are entirely defined by DAG authors, who have access to the DAG
folder and can change the task definition. See
https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html
- UI users (and therefore also API users), by definition, cannot CHANGE DAG
and task definitions. This is by design. They can run/rerun/clear tasks
defined by DAG authors - and it's the DAG authors that have ultimate
influence on the definition of tasks. If you look very closely at the API,
you will find that there is not a single API there that allows you to
modify existing task definitions. Not a single one.

The changes ALWAYS come from the DAG folder. No exceptions.

So what you are proposing here is way more than just changing "a priority"
of the task - you are proposing a change in a fundamental assumption that
Airflow makes: that authors of a DAG are the only ones who can change it.
Now someone else will be able to change the task definition. Someone who
is not a DAG author, and who can change it independently of the task
definition in the DAG folder.

And it has far-reaching consequences. For example, we are currently
discussing a whole series of changes around DAG versioning:

*
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-63%3A+DAG+Versioning
*
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-64%3A+Keep+TaskInstance+try+history
*
https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-66%3A+Execution+of+specific+DAG+versions

All of these changes build on the assumption that the task definition
comes via changes in the DAG folder. They will not handle the case where
the task definition can also be changed - independently - via other
mechanisms, such as an API call. Breaking the assumption will make the
whole versioning effort way harder, or maybe even impossible.

So I am not sure that the approach you described is the right one, and I
think the implementation should not do what you propose. Maybe you should
consider a different approach if you would like to change priorities of
tasks (note that priorities of tasks are used when the executor decides
which of the tasks eligible for execution should be moved from queued to
running and given an execution slot).

I think you have a few options, if you want to proceed with your idea:

1) Implement it by using the fact that Airflow DAGs are Python code. If you
**really** want to permanently change priorities of tasks, you could simply
write your DAGs so that they read priorities from some variables (for
example, from a local JSON file). Then, rather than making an API call to
the Airflow webserver, you could change the priorities directly by editing
those JSON files in the DAG folder. You could also modify the priorities
directly in the Python code - that's a bit more complex, but should also
be possible. This is simple: it does not require implementing new features
in Airflow, does not interfere with Airflow's security model and the basic
assumptions we have for DAG definition, and does not have a long-term
effect on things like DAG versioning.

2) Maybe what you are after is a completely different mechanism to decide
on priorities. Currently this mechanism uses priority_weights stored per
task and priority weight rules defined in the DAG/task definition, and
uses them to calculate the actual priority used when the executor decides
which tasks should be picked for execution. This would be a completely new
feature that would have to be carefully designed and implemented - also
taking into account that we are in the middle of implementing hybrid
executors
(https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-61+Hybrid+Execution)
and that we are also discussing a multi-tenancy proposal that builds on
the way hybrid executors will work
(https://cwiki.apache.org/confluence/display/AIRFLOW/%5BDRAFT%5D+AIP-67+Multi-tenant+deployment+of+Airflow+components),
especially in the context of tenants possibly being starved by other
tenants. But this is a far more complex task than "set priority for task".

3) What you are proposing might actually be better solved in a different
way. We have an open proposal from Hussein-Awala to make priority weight
calculation flexible - to the point that you could provide your own
priority weight calculation rules, configured via plugins in Airflow. That
seems like a much better way to think about customising priorities, because
the rule can be provided in the task definition, and the rule could also
add a way to provide an "override" for the priority. The big difference
here, compared to setting priority per task from outside via an API, is
that the decision can be made not when the API call is made but when
priorities are calculated in the Airflow scheduler. That seems to be a much
better place to decide about priorities, and it does not involve changing
the task definition.
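
Option 1 above could be sketched roughly as follows. This is only an
illustration under stated assumptions: the file name priorities.json, its
{"task_id": weight} layout, and the helpers load_priorities and
priority_for are hypothetical. The only Airflow-specific part is the
existing priority_weight operator argument, shown in a comment so the
snippet runs without Airflow installed.

```python
import json
from pathlib import Path
from typing import Dict

# Airflow's default priority_weight for a task is 1.
DEFAULT_PRIORITY_WEIGHT = 1


def load_priorities(path: Path) -> Dict[str, int]:
    """Read a {"task_id": weight} mapping; return {} if unusable.

    DAG files are re-parsed regularly, so a missing or broken file should
    fall back to defaults instead of breaking the DAG import.
    """
    try:
        raw = json.loads(path.read_text())
    except (OSError, ValueError):
        return {}
    if not isinstance(raw, dict):
        return {}
    # Keep only integer weights (bool is excluded on purpose).
    return {
        str(task_id): weight
        for task_id, weight in raw.items()
        if isinstance(weight, int) and not isinstance(weight, bool)
    }


def priority_for(task_id: str, priorities: Dict[str, int]) -> int:
    """Look up a task's weight, falling back to the default."""
    return priorities.get(task_id, DEFAULT_PRIORITY_WEIGHT)


# Hypothetical usage inside a DAG file located next to priorities.json:
#
#   PRIORITIES = load_priorities(Path(__file__).parent / "priorities.json")
#   load = SomeOperator(
#       task_id="load",
#       priority_weight=priority_for("load", PRIORITIES),
#   )
```

Changing a priority then means editing the JSON file in the DAG folder, so
the change still comes from someone with DAG-author access and is picked
up the next time the DAG file is parsed.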

I'd encourage you to take a close look at what I wrote and dive deeper into
the way priority calculation works.

Also, it might be very useful for you to take a look at other discussions
and PRs we are working on around priority weights at this moment:

* The Proposal from Hussein-Awala I mentioned, to make priority calculation
flexible - https://github.com/apache/airflow/pull/36029
* PR from Andrey about priority weight validation:
https://github.com/apache/airflow/pull/37990

I do not want to discourage you from your quest, but I think that while
the direction you want to go in is a noble one, doing it in the way you
proposed likely does not take into account some of the basic assumptions,
nor how it impacts ongoing work and AIPs in progress in Airflow.

J.



On Tue, Feb 27, 2024 at 4:51 PM Alvaro Serrano <alvaroserper2...@gmail.com>
wrote:

> Hi,
>
> My name is Alvaro and I am a junior developer who is writing his FMP (Final
> master project) about a project which uses Apache Airflow. I am interested
> in developing a new feature for Airflow which I think many people could
> use.
>
> MOTIVATION
> I really think this is a very useful feature to add. There are times when
> you want to make a dag execution and you expect that execution to overtake
> the others already running, because it has priority over them. Now, this is
> not possible, because the tasks priority are static and only added when
> creating a dag, not when doing a dag run.
>
> CONSIDERATIONS
> I propose to add a parameter to the dag run API and Client endpoint named
> as 'priority'. This parameter will overwrite the tasks priority of the new
> dag run. Then, the scheduler will do the work as it does now.
> This will remove the need of creating several dags which do the same but
> with different priority parameters.
> Users are not affected by this change, because it is a change that adds an
> optional parameter that can be used or not.
>
> I will extend the AIP proposal when I get access to the Confluence AIP
> Draft.
>
> Thanks in advance.
>
