argibbs opened a new pull request, #25489:
URL: https://github.com/apache/airflow/pull/25489

   # The "I'm determined to fix SLAs real good, fix 'em real good" MR
   
   OK, so #25147 made a start in this direction. Summing up the, er, summary 
from _that_ MR: the problem was that SLA callbacks could keep occurring, and 
prevent the dag processor manager from ever processing more than 2 or 3 dags in 
the queue before the SLA callbacks re-upped and went to the front of the queue.
   
   Under the new behaviour, the SLA callbacks went to the _back_ of the queue. 
This guaranteed that the queue would be processed at least once. However, it 
turns out that dags on disk would only be re-added to the queue _once the queue 
was empty_. But with SLA callbacks arriving all. the. time. the queue would 
never drain, and we'd never re-read dags from disk. So if you updated the dag 
file, you'd have to bounce the scheduler to pick up the change, and then it 
would process all non-SLA-generating DAGs exactly once. And then you'd need to 
bounce again.
   
   # I've almost certainly missed some steps.
   
   Before I go into a bit more detail about the change, I'd like to acknowledge 
that as a (very) small-time contributor to the project, I'm not familiar with 
all the done things when making more radical changes. In particular, I assume 
there are more doc changes needed than just a newsfragment. I've added a config 
flag, some metrics, _and_ the behaviour of the queue processing has subtly 
changed (for the better, I hope!).
   
   I'd very much appreciate someone(s) leaving a comment :point_down: telling 
me what else I need to do.
   
   # TL;DR
   
   I mean, I see your point. Skip to the bottom for the summary.
   
   # Pay attention, here comes the science bit
   
   Ok, so to briefly recap the queue behaviour prior to this change:
   1. The scheduler spins up.
   2. The worker loop starts.
   3. It scans the disk to create a list of all known files. _It does not add 
these files to the queue_.
   4. If the queue is empty (on start up it will be) then all known files are 
added to the queue.
   5. It then spins up dag processors (via multiprocessing or in-line, 
depending on set up) to consume the first N entries in the queue (where N is a 
function of config settings, and includes a count of any files currently being 
processed from a previous pass of the loop)
   6. It then goes back to step 2.
   
   - Note that Step 3 is re-run periodically based on a time interval set in 
config (`dag_dir_list_interval`). It does not require that the queue be empty. 
Any changes to the set of files on disk (e.g. dags being deleted) will cause 
the manager to remove dags from the queue.
   - After a few iterations of the loop, the contents of the queue are 
processed, and step 4 kicks in, and the dag files present on disk are re-added 
to the queue again.
       - The default config is to do this no more than once every 30 seconds 
(`min_file_process_interval`), so if your dag files only take 10 seconds to 
process, there will be 20 seconds of idle time, but if your dag files take a 
minute to process, then the manager will be permanently busy, because as soon 
as the queue drains, it'll be well past time to reload the queue from disk.
       - This only works if the queue _actually empties_ - as foreshadowed 
above, if the queue never empties (because SLAs) then step 4 will never kick 
in, and we'll never reload the queue. Step 3 will ensure that within a short 
interval any deleted dags are removed from the queue (should they happen to be 
one of the ones generating SLAs) but updated and newly added dag files will not 
be picked up.
       - To be clear, this is the problem I alluded to in my comments at the 
end of the previous MR after it got merged.
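   The loop described above can be sketched roughly like so. This is a minimal illustration of the pre-change behaviour, not the actual `DagFileProcessorManager` code; all names and defaults here are hypothetical stand-ins.

```python
from collections import deque

# Hypothetical sketch of the pre-change loop (steps 3-5 above); names are
# illustrative, not the real DagFileProcessorManager internals.
class OldLoopSketch:
    def __init__(self, scan_disk, dag_dir_list_interval=300,
                 min_file_process_interval=30):
        self.scan_disk = scan_disk                    # callable: files on disk
        self.dag_dir_list_interval = dag_dir_list_interval
        self.min_file_process_interval = min_file_process_interval
        self.file_paths = []                          # known files (step 3)
        self.queue = deque()
        self._last_dir_scan = float("-inf")
        self._last_refresh = float("-inf")

    def run_once(self, now):
        # Step 3: periodic disk scan; refreshes the known-file list but
        # does NOT enqueue anything. Deleted files leave the queue here.
        if now - self._last_dir_scan >= self.dag_dir_list_interval:
            self.file_paths = self.scan_disk()
            self._last_dir_scan = now
            self.queue = deque(p for p in self.queue if p in self.file_paths)
        # Step 4: only an EMPTY queue is refilled -- so a steady stream of
        # SLA callbacks (queue never empty) starves the refill forever.
        if not self.queue and now - self._last_refresh >= self.min_file_process_interval:
            self.queue.extend(self.file_paths)
            self._last_refresh = now
        # Step 5: hand the first N queue entries to dag processors (elided).
```

   The key failure mode lives in the step-4 guard: one SLA-generating dag re-queued on every pass keeps `self.queue` non-empty, so the refill never fires.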
   
   Locally, I tested a hacky fix whereby on receipt of an SLA callback I still 
add the callback, but I simply _didn't add the dag to the queue_. This works, 
but means that SLA callbacks are only processed when the queue drains and is 
refilled (because then all dags are added to the queue, guaranteeing that we 
will process any outstanding SLA callbacks). However, if someone has specified 
a large wait time between loading dags from disk, this will affect how timely 
the SLA alerts are. That's fine for me, because I don't do that, but I wasn't 
getting a "this will be fine for everyone" vibe from the change (I did say it 
was hacky!).
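   For the record, the hacky fix amounts to something like this (a sketch with made-up names, and explicitly _not_ what this MR ships):

```python
from collections import defaultdict, deque

# Sketch of the hacky fix: record the SLA callback, but skip re-queueing
# the dag file. All names here are hypothetical.
def on_sla_callback_hacky(callbacks, queue, dag_file, callback):
    callbacks[dag_file].append(callback)
    # Deliberately NOT: queue.append(dag_file).
    # The callback only runs when the periodic disk refresh re-adds the
    # file, so alert latency is bounded by the refresh interval.
```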
   
   Also, there's another catch. While the problem is much more prevalent with 
SLAs, these are not the only callbacks. I could envisage a situation where 
someone configures a dag with a very small interval (e.g. a dag run every 10 
seconds). While I think this is a much more theoretical problem that might not 
ever exist in the wild - this isn't really the use case Airflow is intended for 
- the upshot would be that a dag generating lots of dag callbacks would be 
spamming the queue. And those callbacks still go to the _front_ of the queue, 
i.e. you're back to the situation I tried to solve in the previous MR!
   
   # I don't think that word means what you think it means
   
   I decided that, fundamentally, part of the problem was that the queue 
should be FIFO. And it wasn't. But if I made it FIFO, then the higher-priority 
DAG callbacks would have to wait their turn behind the dag files loaded from 
disk, and I'm pretty sure stopping that would eliminate some of the speed-ups 
Airflow 2 was trumpeted as delivering. Airflow 1 used to have on average a 15 
second gap (= 30/2) between one task completing and the downstream tasks being 
scheduled, because once the task completed, you had to wait for the manager to 
drain the queue, add the files to the queue from disk, and then process the 
dag. (And that's assuming you could even process all your dags in <30 
seconds...)
   
   I didn't want to be the guy who accidentally breaks _that_ particular speed 
up. :scream: 
   
   So I did two things:
   
   ## Thing 1: Tackling the FIFO issue aka gazumping callbacks.
   1. I split the existing queue into two: a priority queue and a standard 
queue. The priority queue is drained before the standard queue.
   2. You can probably guess what goes where, but: the dag callbacks go into 
the priority queue, while the SLA callbacks and dags found on disk are added 
to the standard queue.
   3. **Both queues are FIFO.** This means that rapidly updating dags can't 
gazump other slightly-less-rapidly updating dags, because they go to the back. 
Of course, most of the time, there's only one dag with callbacks in the 
priority queue, and there's no material change in behaviour. 
   4. Hypothetically, the priority queue could be permanently busy. If this 
happens, it'll be like it was with the SLAs; the standard queue would never get 
a look-in, and we'd stop processing new/updated dags.
   5. So **I also added a new config param**, called 
`max_file_process_interval`; it's the dual of the existing 
`min_file_process_interval`. It guarantees that if you do happen to have a 
permanently busy priority queue, eventually we'll take a breather and process 
the files on disk anyway.
   6. By default, this flag is set to 120 seconds. Setting it to zero disables 
it, i.e. you can have a permanently busy priority queue.
   7. I am acutely aware / wary of the fact that this 
separation-of-the-queues-plus-config-param is purely solving a problem that is 
(for me at least) purely a theoretically-possible issue. I've never seen it, 
but I don't think it's premature optimisation to tackle it in this change.
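   The drain order plus the `max_file_process_interval` escape hatch can be sketched like this. Again, this is a hypothetical illustration of the behaviour described above, not the PR's actual code; names and the exact policy are stand-ins.

```python
from collections import deque

# Minimal sketch of the two-queue drain order from Thing 1. The
# max_file_process_interval guard lets the standard queue win when the
# priority queue has hogged the manager for too long (0 disables it).
class TwoQueueSketch:
    def __init__(self, max_file_process_interval=120):
        self.priority_queue = deque()   # dag callbacks (FIFO)
        self.std_queue = deque()        # SLA callbacks + files from disk (FIFO)
        self.max_file_process_interval = max_file_process_interval
        self._last_std_pick = 0.0

    def next_file(self, now):
        overdue = (
            self.max_file_process_interval > 0
            and now - self._last_std_pick >= self.max_file_process_interval
        )
        # Priority queue first, unless the standard queue is overdue a turn.
        if self.priority_queue and not (overdue and self.std_queue):
            return self.priority_queue.popleft()
        if self.std_queue:
            self._last_std_pick = now
            return self.std_queue.popleft()
        return None
```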
   
   ## Thing 2: Handling the fact that SLAs stop the standard queue from ever 
being empty.
   1. As noted ... somewhere ... above, a hacky fix was to simply stop adding 
dags to the queue on SLA callback. But this wasn't perfect for people who don't 
happen to use airflow the way I do.
   2. So, the first half of Thing 2 was to make the standard queue FIFO. This 
is effectively upholding the change from the previous MR; SLA callbacks simply 
get treated the same as dags loaded from disk, and have to wait their turn.
   3. The next bit was to then add a `set` which tracked which dag files in the 
queue were still outstanding from the last refresh from disk. Once the queue 
was refreshed from disk, we'd work through every file eventually (because 
FIFO), and at that point the set would be empty, even if the queue wasn't 
(because SLAs).
   4. Once the set was empty, we'd refresh the queue with dags found on disk.
   5. Some of those dags from disk _might already be in the queue due to SLAs_ 
but we don't care; this is a good thing as it means loading dags from disk 
doesn't push existing SLAs to the back of the line. 
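   The bookkeeping in steps 3-5 can be sketched as follows: the refresh trigger is the _set_ draining, not the queue, so a queue kept non-empty by SLAs no longer blocks reloads. Names here are illustrative, not the MR's actual identifiers.

```python
from collections import deque

# Sketch of the Thing 2 bookkeeping: a set tracks which files from the
# last disk refresh are still unprocessed; we refill when the SET (not
# the queue) is empty. Hypothetical names throughout.
class RefreshSketch:
    def __init__(self, files_on_disk):
        self.files_on_disk = files_on_disk
        self.std_queue = deque()
        self.outstanding = set()

    def add_sla_callback(self, dag_file):
        # FIFO: SLA work waits its turn behind whatever is already queued.
        self.std_queue.append(dag_file)

    def maybe_refresh(self):
        if not self.outstanding:          # the queue itself may be non-empty!
            self.std_queue.extend(self.files_on_disk)
            self.outstanding = set(self.files_on_disk)

    def pop(self):
        f = self.std_queue.popleft()
        self.outstanding.discard(f)
        return f
```

   Note that refreshing appends behind any queued SLA entries, so a reload never pushes outstanding SLA work to the back of the line.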
   
   # Notes:
   
   This doesn't materially change _how_ SLAs work; they are generated and 
consumed the same as before. It just means that we reliably consume the alerts 
once generated without breaking the rest of the system. In my experience at 
least, adding SLAs would simply cause the system to stop processing dag updates.
   
   In particular, I don't address issues with SLA timestamps (as raised by 
#22532), nor do I deal with other problems (e.g. now that SLAs fire reliably, 
I have noticed that they fire during catch-up, and that the same alert can 
fire multiple times).
   
   This is not because I think the current approach is correct (I Have 
Opinions) but rather it is a sad-but-true fact that I don't have the time to 
take on a big project (hence I will keep My Opinions to myself), so if they're 
minor enough to live with, I'm just going to live with them.
   
   # Summary: I'm from the UK; politely waiting in line is what we do best.
   
   - Separation of the manager's `_file_path_queue` into `_std_file_path_queue` 
(for SLAs and dags loaded from disk) and `priority_file_path_queue` (for DAG 
callbacks)
   - New `max_file_process_interval` config to ensure files are read from disk 
every so often, even if the priority queue is always busy
   - Dags from disk will be added to std queue even if the std queue is still 
busy with SLAs.
   - Some extra stats/metrics to track queue depths / progress
   - Tests updated to reflect changes

