dstandish commented on a change in pull request #22184:
URL: https://github.com/apache/airflow/pull/22184#discussion_r825333926
##########
File path: airflow/dag_processing/processor.py
##########
@@ -412,20 +411,19 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             else:
                 while next_info.logical_date < ts:
                     next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)
-
                     if next_info is None:
                         break
-                    if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query:
+                    if (ti.dag_id, ti.task_id, ti.run_id, ti.map_index) in recorded_sla_misses:
Review comment:
Hmm... but this also presents a problem in the context of mapping.
Consider this logic:
```python
if (ti.dag_id, ti.task_id, next_run_id, ti.map_index) in recorded_sla_misses:
    break
if next_info.logical_date + task.sla < ts:
    sla_misses.append(
        SlaMiss(
            task_id=ti.task_id,
            dag_id=ti.dag_id,
            run_id=next_run_id,
            map_index=ti.map_index,
            timestamp=ts,
        )
    )
```
In the context of mapping, it's not really reasonable to expect that a TI
with this particular map index will exist in the next run. That's kind of
the point of mapping: you don't know which tasks will need to run from one run
to the next.
It feels like we need to keep SlaMiss at the grain of dag / run / task, and
that mapped tasks should really be thought of as more like subtasks in this
context. Indeed, maybe we should consider making that more formal (i.e. mapped
tasks as subtasks). WDYT?
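To make the dag / run / task grain concrete, here is a rough, standalone sketch. It is not Airflow code: `TI`, `SlaKey`, `find_sla_misses`, and the simplified deadline rule (logical date plus SLA) are all hypothetical names and assumptions made up for illustration. The idea is that the SLA key omits `map_index`, so a miss is recorded once per task per run if any of its mapped instances is late.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Iterable, Optional, Set, Tuple


# Hypothetical stand-in for a task instance row; not Airflow's TaskInstance.
@dataclass(frozen=True)
class TI:
    dag_id: str
    task_id: str
    run_id: str
    map_index: int  # assumption: -1 means the task is not mapped
    end_date: Optional[datetime]  # None while the instance has not finished


# SLA misses are keyed by (dag_id, task_id, run_id); map_index is left out on purpose.
SlaKey = Tuple[str, str, str]


def find_sla_misses(
    tis: Iterable[TI],
    run_logical_dates: Dict[str, datetime],
    sla: timedelta,
    now: datetime,
    recorded: Set[SlaKey],
) -> Set[SlaKey]:
    """Return new SLA misses at dag / run / task grain.

    A task misses its SLA for a run if the deadline has passed and at least
    one of its (possibly mapped) instances did not finish by the deadline.
    """
    misses: Set[SlaKey] = set()
    for ti in tis:
        key = (ti.dag_id, ti.task_id, ti.run_id)
        if key in recorded or key in misses:
            continue  # already recorded once for this task in this run
        # Simplified assumption: deadline is the run's logical date plus the SLA.
        deadline = run_logical_dates[ti.run_id] + sla
        finished_on_time = ti.end_date is not None and ti.end_date <= deadline
        if deadline < now and not finished_on_time:
            misses.add(key)
    return misses
```

With this shape, a run that expands a task into three map indexes and a later run that expands it into one are handled the same way, because nothing looks up a specific map index in another run.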