No problem! I very much appreciate your questions and critical thought process as well. It's been pretty difficult for me to fully understand how the SLA feature works, given how overloaded and complicated the logic is in its current state. So it really helps to have another invested party thinking through the solution.
Yeah, I think that's a great point. If we are changing the definition of 'SLA' or 'soft_alert_timeout' to mean checking whether a running DAG, or its running tasks, have just passed their expected completion time, maybe we could put it into the scheduler. I think the performance concerns only arise when we try to ensure that SLA misses have been evaluated for all tasks, instead of just the running ones. Here are some running thoughts that I think are worth working through before we come to that conclusion:

1. Evaluating the SLA of tasks that have yet to be scheduled is not straightforward. Yes, we've just drawn a distinction between missing an SLA because of DAG scheduling delays and missing one on a promptly running DAG, with the reasoning that if there are scheduling delays in the cluster, we should be paying attention to a different, higher-level alarm. I think the problem of evaluating SLAs for tasks that are blocked by an upstream task is a different one: there, we don't have any issue with the cluster's capacity itself. Instead, there is a specific task we want to measure against an 'expected time of completion', and it has not been launched yet only because its upstream task has not completed. My impression is that once a DAG run has been launched, we should evaluate the 'expected time of completion' of all the tasks within it. If you disagree, I'm happy to continue this discussion to hash out the scope of this feature's responsibility. For now, assuming we agree these blocked tasks are worth tracking an 'expected time of completion' for, there are interesting ways to address this, and here are two examples.

- As it is done now, in the DagFileProcessorProcess: https://github.com/apache/airflow/blob/main/airflow/dag_processing/processor.py#L439 - the existing implementation does a GROUP BY to get the maximum execution date at which each task instance in a given DAG ID has ever succeeded or been skipped, and 'extends' those onto the next run of the DAG. This way we cannot evaluate the SLA for the first run of a given task, but we avoid being unable to evaluate the SLA for tasks that are blocked by an upstream task. In my opinion, even though this query has been tweaked multiple times and behaves reasonably well, it slows down DAG parsing once the number of DAGs and tasks scales out to the hundreds or thousands. Another big downside of this logic, in my humble opinion, is that it is so difficult to understand and maintain. I had to read this code several times to understand what exactly we were doing with this series of SQL operations.

- If task SLAs were evaluated in the scheduler at update_state, similar to how it is done in this proposal: https://github.com/apache/airflow/pull/8545/files#diff-62c8e300ee91e0d59f81e0ea5d30834f04db71ae74f2e155a10b51056b00b59bR1772, we would need to infer the 'unscheduled_tis' that are downstream of the TIs that are currently "RUNNING". Maybe this isn't so bad? I've put a rough sketch of what this could look like below, after point 2.

2. The callback should not be called within the scheduler: I believe there is a reason why DagCallbackRequests are sent to the DagFileProcessorProcess to be executed in a separate process. My general understanding is that the DAG data model the scheduler has access to only has a subset of the DAG attributes available, and the same goes for the task instance. If we want to execute the callback, we need to issue a callback request to an external process for it to run there.
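To make that second bullet a bit more concrete, here is a very rough sketch of the kind of check I have in mind around update_state. To be clear, this is not working code and none of the names are final: dag.dag_sla is a made-up attribute standing in for a DAG-level 'expected completion time', the helper name is arbitrary, and I'm reusing the existing per-task sla timedelta purely for illustration.

    # Rough sketch only. `dag.dag_sla` is a hypothetical DAG-level attribute;
    # `task.sla` is the existing per-task BaseOperator attribute, reused here
    # just to illustrate the shape of the check.
    from airflow.utils import timezone
    from airflow.utils.state import TaskInstanceState


    def check_soft_alert_timeouts(dag, dag_run, session):
        """Would be called from the scheduler around DagRun.update_state()."""
        elapsed = timezone.utcnow() - dag_run.start_date
        callback_requests = []

        # DAG-level check: has this running dag run exceeded its expected duration?
        if dag.dag_sla is not None and elapsed > dag.dag_sla:
            callback_requests.append(("dag", dag.dag_id, dag_run.run_id))

        # Task-level check: every TI in this run that has not finished yet,
        # which covers RUNNING tasks as well as the 'unscheduled' TIs that are
        # still blocked behind an upstream task.
        for ti in dag_run.get_task_instances(session=session):
            task = dag.get_task(ti.task_id)
            if task.sla is None:
                continue
            still_pending = ti.state is None or ti.state in (
                TaskInstanceState.SCHEDULED,
                TaskInstanceState.QUEUED,
                TaskInstanceState.RUNNING,
            )
            if still_pending and elapsed > task.sla:
                callback_requests.append(("task", dag.dag_id, ti.task_id))

        # Per point 2, the user-supplied callbacks would not run here; these
        # would be handed off to the DagFileProcessorProcess instead.
        return callback_requests

The check itself only touches the task instances of runs that are already active, so it should stay cheap, and the actual user callbacks would still be handed off to the DagFileProcessorProcess rather than executed in the scheduler loop, which is what point 2 and the next paragraph are about.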
Let's say we want to send a callback request to the DagFileProcessorProcess once an SLA miss is detected, and have it execute the task-level SLA callback. The issue is that the task_instance context the DAG-level callback has access to is not the context of the missed task itself, but just the first task instance that is arbitrarily selected to represent the DAG: https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L1408. Hence, a task-level SLA callback cannot accurately be called in a DagFileProcessorProcess. I believe this is why folks over the years have fitted so many additional arguments into sla_miss_callback, pulling it further away from a function signature consistent with the other DAG- and task-level callbacks.

On Wed, Sep 13, 2023 at 1:51 PM Daniel Standish <daniel.stand...@astronomer.io.invalid> wrote:

> OK so one difference here is, you're adding a new DAG SLA concept. Which is useful. One subtle difference from what I think is the existing "concept" of SLA is that you are evaluating it against when it started, as opposed to when it should have started, and evaluating it only in the course of running.
>
> Let's suppose for a moment that everyone is on board with this and thinks it's a necessary tradeoff.
>
> Well now let's look at individual task instance SLAs. With that change in the concept of what an SLA is, do we still *need* to move to "soft timeout" for individual tasks? I think maybe no. Because, why could we not, at the same time as we evaluate the dag run SLA, also evaluate each task's SLA, and evaluate it against the same "start time" that the overall DAG SLA is evaluating against? This would seem to be more *like* the existing SLA concept for individual tasks, the difference being it requires the dag to be running (which is already a requirement of your new task SLA concept). The other difference, again, is the start time vs should-have-started time distinction. But this would also seem to remove the "doesn't work for deferrables" problem.

--
Sung Yun
Cornell Tech '20
Master of Engineering in Computer Science