Also - I am not sure if you are subscribed to the devlist - so I will add
your direct address here so that you are sure to see the answer (and if you
are not subscribed, then by all means - do subscribe).

On Sun, Mar 10, 2024 at 12:01 PM Jarek Potiuk <ja...@potiuk.com> wrote:

> I think before writing an AIP in Confluence, I would encourage you to
> describe your idea in a shared Google Docs document and explain it. But
> before you do that - I'd encourage you to take a close look and deep dive
> into the implementation of priorities. It might be different than you think:
> it has priority weight algorithms that allow for inclusion of
> downstream/upstream task priorities. Also, because of the way Airflow
> serializes the tasks, they are re-read and refreshed by Airflow every 30
> seconds by default, so whatever priority_weights you set in DAGs will
> override the priorities that you **might** want to set via an external API.
>
> Note that even today - tasks do not have "priorities" per se. They have
> "priority_weight" and "weight_rule" - which are used to automatically
> determine the actual priority of the task based on those rules. So
> there is not a single "priority" you can override; there is a set of
> database queries that calculate those when tasks are eligible for execution,
> and you cannot simply "set" the priority for a task this way.
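
To make the point about weight rules concrete, here is a minimal sketch in plain Python (no Airflow imports) of how an effective priority can be derived from "priority_weight" plus "weight_rule". The task graph, the weights and the helper names are all made up for illustration, and the real calculation inside Airflow is more involved than this.

```python
from typing import Dict, List, Set

# Hypothetical toy DAG: task_id -> directly downstream task_ids.
DOWNSTREAM: Dict[str, List[str]] = {
    "extract": ["transform"],
    "transform": ["load", "report"],
    "load": [],
    "report": [],
}
# priority_weight values as a DAG author might set them (illustrative only).
WEIGHTS: Dict[str, int] = {"extract": 1, "transform": 2, "load": 5, "report": 1}


def reachable(task_id: str, edges: Dict[str, List[str]]) -> Set[str]:
    """All tasks reachable from task_id by following edges (excluding itself)."""
    seen: Set[str] = set()
    stack = list(edges[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(edges[current])
    return seen


def invert(edges: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Turn a downstream edge map into an upstream edge map."""
    upstream: Dict[str, List[str]] = {task: [] for task in edges}
    for parent, children in edges.items():
        for child in children:
            upstream[child].append(parent)
    return upstream


def effective_weight(task_id: str, weight_rule: str) -> int:
    """Own weight, optionally plus the weights of all downstream/upstream tasks."""
    own = WEIGHTS[task_id]
    if weight_rule == "absolute":
        return own
    if weight_rule == "downstream":
        related = reachable(task_id, DOWNSTREAM)
    elif weight_rule == "upstream":
        related = reachable(task_id, invert(DOWNSTREAM))
    else:
        raise ValueError(f"unknown weight_rule: {weight_rule}")
    return own + sum(WEIGHTS[t] for t in related)
```

With the "downstream" rule the first task in a chain ends up with the largest effective value, which is why there is no single stored number that an API call could simply overwrite.
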
>
> But there is a more fundamental problem with the proposal - this proposal
> seems to violate a basic principle that we have in Airflow - that tasks
> and their behaviour are entirely defined by DAG authors who have access to
> the DAG folder and can change the task definition. See
> https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html
> - UI users (so also API users) - by definition cannot CHANGE DAG and task
> definitions. This is by design. They can run/rerun/clear tasks defined by
> DAG Authors - and it's the DAG authors that have ultimate influence on the
> definition of tasks. If you look very closely at the API, you will find
> that there is not a single API there that allows you to modify existing
> task definitions. Not a single one.
>
> The changes ALWAYS come from the DAG folder. No exceptions.
>
> So what you are proposing here is way more than just changing "a priority"
> of the task - you are proposing a change to a fundamental assumption that
> Airflow makes - that authors of a DAG are the only ones who can change it.
> Now - someone else will be able to change the task definition. Someone who
> is not a DAG author, and who can change it independently of the DAG code.
>
> And it has far-reaching consequences. For example we are just discussing a
> whole series of changes about DAG versioning:
>
> * https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-63%3A+DAG+Versioning
> * https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-64%3A+Keep+TaskInstance+try+history
> * https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-66%3A+Execution+of+specific+DAG+versions
>
> That whole series of changes builds on the assumption that the task
> definition comes via changes in the DAG folder. They will not handle the
> case where the task definition can also be changed - independently - via
> other mechanisms, such as an API call. Breaking the assumption will make the
> whole versioning way harder, or maybe even impossible.
>
> So I do not think the approach you described is the right one, and an
> implementation should not do what you propose. Maybe you should
> consider a different approach if you would like to change priorities of
> tasks (note that priorities of tasks are used when the executor decides which
> of the tasks eligible for execution should be moved from queued to
> running and given an execution slot).
>
> I think you have a few ways, if you want to proceed with your idea:
>
> 1) Implement it by using the fact that Airflow DAGs are Python code. If
> you **really** want to permanently change priorities of tasks, you could
> simply write your DAGs in a specific way to use some variables (for example
> coming from local json file) as priorities and read it from there - and
> then, rather than making an API call to airflow webserver, you could change
> the priorities directly by changing priorities stored in those JSON files
> in the DAG folders. You could also modify priorities directly in the Python
> code - that's a bit more complex, but should also be possible. This
> is simple: it does not require implementing new features in Airflow, does not
> interfere with Airflow's security model and the basic assumptions we have for
> DAG definition, and does not have a long-term effect on things like DAG
> versioning.
>
> 2) Maybe what you are after is to add a completely different mechanism to
> decide on priorities - currently this mechanism uses priority_weights
> stored by task and priority weight rules defined in DAG/task definition and
> uses it to calculate the actual priority used when the executor decides
> which tasks should be picked for execution.  This would be a completely new
> feature that would have to be carefully designed and implemented - also
> including the fact that we are just in the middle of implementing hybrid
> executors
> (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-61+Hybrid+Execution)
> and we are also discussing a multi-tenancy proposal that builds on the way
> hybrid executors will work
> (https://cwiki.apache.org/confluence/display/AIRFLOW/%5BDRAFT%5D+AIP-67+Multi-tenant+deployment+of+Airflow+components),
> especially in the context of tenants possibly being starved by other tenants.
> But this is a far more complex task than "set priority for task".
>
> 3) Probably what you are proposing might actually be solved in a different
> way. We have an open proposal from Hussein-Awala to make priority weight
> calculation flexible - to the point that you could provide your own
> priority weight calculation rules configured via plugins in Airflow. That
> seems like a much better way to think about customising priorities, because
> the rule can be provided at the task definition and the rule could also add
> a way to provide an "override" for the priority. The big difference here vs
> an API that sets priority per task from outside is that the decision can be
> made not when the API call is made but when priorities are calculated in
> the Airflow scheduler. That seems to be a much better place to decide about
> priorities and does not involve changing the task definition.
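
Option 1 above can be sketched roughly like this. It is only an illustration under assumptions: the file name "priorities.json", its layout (task_id mapped to an integer) and both helper names are made up here, not anything Airflow provides.

```python
import json
from pathlib import Path
from typing import Dict

DEFAULT_PRIORITY = 1  # fallback when a task has no entry in the file


def load_priorities(path: Path) -> Dict[str, int]:
    """Read task_id -> priority_weight from a JSON file.

    Returns an empty dict if the file is missing or malformed, so that a
    broken file does not break DAG parsing. Non-integer values are skipped.
    """
    try:
        raw = json.loads(path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    if not isinstance(raw, dict):
        return {}
    return {
        str(task_id): value
        for task_id, value in raw.items()
        if isinstance(value, int) and not isinstance(value, bool)
    }


def priority_for(task_id: str, priorities: Dict[str, int],
                 default: int = DEFAULT_PRIORITY) -> int:
    """Priority for one task, falling back to the default."""
    return priorities.get(task_id, default)


# In a DAG file this could be used along these lines (kept as a comment so
# the sketch runs without Airflow installed):
#
#   PRIORITIES = load_priorities(Path(__file__).parent / "priorities.json")
#   BashOperator(
#       task_id="load",
#       bash_command="echo load",
#       priority_weight=priority_for("load", PRIORITIES),
#   )
```

Changing a priority then means editing the JSON file in the DAG folder, so the change still comes from the DAG folder and is picked up the next time the DAG file is parsed.
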
>
> I'd encourage you to take a close look at what I wrote and dive deeper
> into the way priority calculation works.
>
> Also what might be very useful for you is to take a look at other
> discussions and PRs we are doing around priority weights at this moment:
>
> * The Proposal from Hussein-Awala I mentioned, to make priority
> calculation flexible - https://github.com/apache/airflow/pull/36029
> * PR from Andrey about priority weight validation:
> https://github.com/apache/airflow/pull/37990
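
The idea of a pluggable rule with an "override" might look something like the sketch below. To be clear, this is not the interface from the PR above: TaskInfo, PriorityRule, OverridablePriorityRule and pick_order are hypothetical names in plain Python, only meant to show the decision being made when priorities are calculated rather than by rewriting the task definition.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Protocol


@dataclass(frozen=True)
class TaskInfo:
    """Hypothetical stand-in for what a rule would see at scheduling time."""
    task_id: str
    run_id: str
    priority_weight: int  # value from the DAG author's task definition


class PriorityRule(Protocol):
    """Hypothetical interface: compute the weight used for ordering."""

    def get_weight(self, task: TaskInfo) -> int: ...


@dataclass
class OverridablePriorityRule:
    """Use the author's priority_weight unless the run has an override."""
    run_overrides: Dict[str, int] = field(default_factory=dict)

    def get_weight(self, task: TaskInfo) -> int:
        return self.run_overrides.get(task.run_id, task.priority_weight)


def pick_order(tasks: List[TaskInfo], rule: PriorityRule) -> List[str]:
    """Order eligible tasks by descending weight (ties keep input order)."""
    ranked = sorted(tasks, key=lambda t: -rule.get_weight(t))
    return [f"{t.run_id}/{t.task_id}" for t in ranked]


eligible = [
    TaskInfo(task_id="load", run_id="scheduled", priority_weight=5),
    TaskInfo(task_id="load", run_id="urgent", priority_weight=5),
]
rule = OverridablePriorityRule(run_overrides={"urgent": 100})
order = pick_order(eligible, rule)  # "urgent/load" comes first
```

The task definitions are never modified here: the rule is attached by the DAG author, and only the calculation consults the override.
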
>
> I do not want to discourage you from your quest, but I think that while
> the direction you want to go is a noble one, doing it in the way you proposed
> likely does not take into account some of the basic assumptions and how it
> impacts ongoing work and AIPs being worked on in Airflow.
>
> J.
>
>
>
> On Tue, Feb 27, 2024 at 4:51 PM Alvaro Serrano <alvaroserper2...@gmail.com>
> wrote:
>
>> Hi,
>>
>> My name is Alvaro and I am a junior developer who is writing his FMP
>> (Final Master Project) about a project which uses Apache Airflow. I am
>> interested in developing a new feature for Airflow which I think many
>> people could use.
>>
>> MOTIVATION
>> I really think this is a very useful feature to add. There are times when
>> you want to trigger a DAG run and you expect that run to overtake
>> the others already running, because it has priority over them. Currently
>> this is not possible, because task priorities are static and are only set
>> when creating a DAG, not when creating a DAG run.
>>
>> CONSIDERATIONS
>> I propose to add a parameter named 'priority' to the DAG run API and
>> Client endpoint. This parameter will overwrite the task priorities of the
>> new DAG run. Then, the scheduler will do the work as it does now.
>> This will remove the need to create several DAGs which do the same thing
>> but with different priority parameters.
>> Users are not affected by this change, because it adds an
>> optional parameter that can be used or not.
>>
>> I will extend the AIP proposal when I get access to the Confluence AIP
>> Draft.
>>
>> Thanks in advance.
>>
>
