syun64 opened a new pull request, #33532:
URL: https://github.com/apache/airflow/pull/33532
closes #32816
### Summary
This PR introduces Dag SLAs as part of **AIP-57 Refactor SLA Feature**.
After many discussions, the community voted to introduce new SLA features
to replace the existing SLA feature, which has caused a lot of confusion and
dissatisfaction among users over the years. The proposal introduces SLAs that
are measured within the lifetime of a Dag Run and within the lifetime of a
Task Instance. These are significantly cheaper for the Airflow infrastructure
to compute than the existing SLA concept.
The proposed SLA callback functions also accept the TI context as their
argument, keeping the interface consistent with the other callback types
(on_success_callback, on_failure_callback, on_execute_callback).
### Dag Updates
#### Added
- `sla: timedelta | None = None`
- `on_sla_miss_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None`
#### Marked for Deprecation
- `sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None`
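To illustrate the shape of the new arguments, here is a minimal stand-in (not Airflow's `DAG` class) that accepts `sla` and `on_sla_miss_callback` with the types listed above and normalizes `None`, a single callable, or a list into a list of callbacks. `DagSlaSettings`, `callbacks()`, and `notify_sla_miss` are hypothetical names, and a plain `dict` stands in for the context the callback would receive.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Callable

# Hypothetical alias: a callback that receives a context mapping.
DagStateChangeCallback = Callable[[dict[str, Any]], None]


@dataclass
class DagSlaSettings:
    """Hypothetical stand-in for the new Dag arguments, not Airflow's DAG."""

    sla: timedelta | None = None
    on_sla_miss_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None

    def callbacks(self) -> list[DagStateChangeCallback]:
        # Accept None, a single callable, or a list of callables.
        if self.on_sla_miss_callback is None:
            return []
        if callable(self.on_sla_miss_callback):
            return [self.on_sla_miss_callback]
        return list(self.on_sla_miss_callback)


seen: list[str] = []


def notify_sla_miss(context: dict[str, Any]) -> None:
    # Example user callback; a plain dict stands in for the context.
    seen.append(context["run_id"])


settings = DagSlaSettings(sla=timedelta(hours=1), on_sla_miss_callback=notify_sla_miss)
for cb in settings.callbacks():
    cb({"run_id": "manual__2023-08-19T00:00:00+00:00"})
```

Normalizing to a list up front lets the caller loop over callbacks without branching on the union type each time.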
### Dag Run DB Updates
#### Added
- `sla_missed = Column(Boolean, default=False)`
### DagCallbackRequest Updates (callback_data json payload)
#### Added
- `sla_miss: bool | None = False`
- `dagrun_state: DagRunState`
#### Removed
- `is_failure_callback: bool | None = True`
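For orientation, here is a sketch of the `callback_data` JSON payload after this change, modelled with a plain dataclass rather than Airflow's `DagCallbackRequest`. Only `sla_miss` and `dagrun_state` come from this PR; `DagCallbackPayload`, `to_json`, and the other fields (`full_filepath`, `dag_id`, `run_id`) are assumptions for illustration. The local `DagRunState` enum is assumed to use the same string values as Airflow's.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from enum import Enum


class DagRunState(str, Enum):
    # Local stand-in; values assumed to mirror Airflow's DagRunState.
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class DagCallbackPayload:
    """Hypothetical model of the callback_data payload after this change."""

    full_filepath: str
    dag_id: str
    run_id: str
    dagrun_state: DagRunState
    sla_miss: bool | None = False

    def to_json(self) -> str:
        data = asdict(self)
        # Store the enum as its plain string value.
        data["dagrun_state"] = self.dagrun_state.value
        return json.dumps(data, sort_keys=True)


# An SLA-miss request for a Dag Run that is still running.
payload = DagCallbackPayload(
    full_filepath="/dags/example.py",
    dag_id="example",
    run_id="manual__2023-08-19T00:00:00+00:00",
    dagrun_state=DagRunState.RUNNING,
    sla_miss=True,
)
print(payload.to_json())
```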
### Dag SLA Measurement Semantics
SLA tracking logic is evaluated when the active Dag Run is checked for
updates in DagRun.update_state, which runs in the main scheduling loop. When
the **sla** parameter is provided in the Dag definition, the Dag SLA is tracked
for all run types except DagRunType.BACKFILL_JOB.
If the **sla_missed** attribute of the Dag Run is already True, the scheduler
loop skips the SLA check for that Dag Run. This ensures that the SLA miss
callback is generated only once: the first scheduler loop that detects the
miss also records it on the corresponding Dag Run record in the DB, and later
loops skip the check.
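The eligibility rules above can be expressed as a single predicate. This is a sketch, not the PR's code: `should_check_sla` is a hypothetical helper, and `DagRunType` is a local enum whose values are assumed to mirror Airflow's run type names.

```python
from __future__ import annotations

from datetime import timedelta
from enum import Enum


class DagRunType(str, Enum):
    # Local stand-in; values assumed to mirror Airflow's DagRunType.
    BACKFILL_JOB = "backfill"
    SCHEDULED = "scheduled"
    MANUAL = "manual"


def should_check_sla(run_type: DagRunType, sla: timedelta | None, sla_missed: bool) -> bool:
    """Return True if the scheduler loop should evaluate the SLA for this run."""
    if sla is None:
        # No sla on the Dag: nothing to track.
        return False
    if run_type == DagRunType.BACKFILL_JOB:
        # Backfill runs are excluded from Dag SLA tracking.
        return False
    # Once sla_missed is persisted, skip the check so the callback fires only once.
    return not sla_missed
```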
SLA is measured against the time elapsed between the Dag Run start_date and
the current time. If this elapsed time exceeds the SLA, the active Dag Run has
missed its SLA. Using start_date (instead of the scheduled start time according
to the timetable) keeps the measurement consistent across MANUAL and SCHEDULED
run types and keeps the SLA evaluation logic simple for the initial version of
this feature.
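The comparison can be sketched as follows. `is_sla_missed` is a hypothetical helper, and both timestamps are assumed to be timezone-aware UTC datetimes. For example, with `sla=timedelta(hours=1)`, a run that started at 00:00 has missed its SLA once the current time passes 01:00.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def is_sla_missed(start_date: datetime, now: datetime, sla: timedelta) -> bool:
    """An SLA is missed when the time elapsed since start_date exceeds sla."""
    elapsed = now - start_date
    return sla < elapsed


start = datetime(2023, 8, 19, 0, 0, tzinfo=timezone.utc)
sla = timedelta(hours=1)
print(is_sla_missed(start, start + timedelta(minutes=59), sla))  # False
print(is_sla_missed(start, start + timedelta(minutes=61), sla))  # True
```

The comparison is strict, so a run that is exactly at the SLA boundary has not yet missed it.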
An SLA callback is indicated by the sla_miss boolean flag on
DagCallbackRequest. The state of the DagRun corresponding to the
DagCallbackRequest is propagated through the dagrun_state parameter, which is
also used to determine whether an on_success_callback or an on_failure_callback
must be executed for the request. A DagCallbackRequest can now be issued for a
DagRun that has not reached a finished state (SUCCESS or FAILED); it is up to
the dagrun.handle_callback function to parse the parameters and execute the
callback appropriate to the given DagRun state or sla_miss flag.
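A minimal sketch of this dispatch, assuming the handler only needs the `sla_miss` flag and `dagrun_state` from the request. `select_callback_names` is a hypothetical helper that returns which Dag callback attribute(s) to run; it is not Airflow's `handle_callback`. Running both the SLA callback and a success/failure callback when an SLA miss arrives together with a finished state is an assumption of this sketch.

```python
from __future__ import annotations

from enum import Enum


class DagRunState(str, Enum):
    # Local stand-in; values assumed to mirror Airflow's DagRunState.
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def select_callback_names(dagrun_state: DagRunState, sla_miss: bool | None) -> list[str]:
    """Map a callback request's fields to the Dag callback attribute(s) to run."""
    names: list[str] = []
    if sla_miss:
        names.append("on_sla_miss_callback")
    if dagrun_state == DagRunState.SUCCESS:
        names.append("on_success_callback")
    elif dagrun_state == DagRunState.FAILED:
        names.append("on_failure_callback")
    # QUEUED/RUNNING without sla_miss: nothing to run.
    return names
```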
### Question:
Do we need to consider the impact of replacing is_failure_callback with
dagrun_state in DagCallbackRequest? If DbCallbackRequest is enabled and
unexecuted DagCallbackRequests are stored in the DB at the time of the DB
upgrade, could some callbacks fail to execute?
EDIT: I've added an alembic migration step that unpacks the JSON
callback_data of DbCallbackRequest records whose callback_type is
DagCallbackRequest and rewrites the payload using the logical mapping
is_failure_callback <-> dagrun_state. I'd like to get others' thoughts on
whether this improves the reliability of the migration or makes it riskier.
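The mapping described in the EDIT might look like the following pure-Python transform over a single `callback_data` payload. This is a sketch under assumptions, not the PR's alembic migration: `upgrade_callback_data` and `downgrade_callback_data` are hypothetical names, and the sketch assumes `is_failure_callback=True` maps to `"failed"` and a falsy value maps to `"success"`, with a missing key treated as `True` (the old default). A real migration would apply it only to rows whose callback_type is DagCallbackRequest.

```python
from __future__ import annotations

import json


def upgrade_callback_data(raw: str) -> str:
    """Replace is_failure_callback with dagrun_state in a DagCallbackRequest payload."""
    data = json.loads(raw)
    # A missing key falls back to the old default (True).
    is_failure = data.pop("is_failure_callback", True)
    data["dagrun_state"] = "failed" if is_failure else "success"
    data.setdefault("sla_miss", False)
    return json.dumps(data)


def downgrade_callback_data(raw: str) -> str:
    """Reverse mapping; SLA-miss-only requests cannot be represented and are lossy."""
    data = json.loads(raw)
    state = data.pop("dagrun_state", "failed")
    data.pop("sla_miss", None)
    data["is_failure_callback"] = state == "failed"
    return json.dumps(data)
```

The downgrade direction is where the risk concentrates in this sketch: a pending SLA-only request (`dagrun_state` "running", `sla_miss` true) becomes `is_failure_callback=False`, which the old code would likely treat as a success callback, so dropping such rows on downgrade may be safer.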