stevenschaerer commented on issue #37091:
URL: https://github.com/apache/airflow/issues/37091#issuecomment-1942735796
What do you think about adding a new task instance dependency that sets a
mapped task's state to `SKIPPED` when at least one of its mapped dependencies
is `SKIPPED` and none is `FAILED`?
```python
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.state import State, TaskInstanceState


class MappedTaskUpstreamDep(BaseTIDep):
    """
    Determines if a mapped task's upstream tasks that provide XComs used by
    this task for task mapping are in a state that allows a given task
    instance to run.
    """

    NAME = "Mapped dependencies have succeeded"
    IGNORABLE = True
    IS_TASK_DEP = True

    def _get_dep_statuses(self, ti, session, dep_context):
        from airflow.models.mappedoperator import MappedOperator

        # Upstream operators whose XComs feed this task's expansion, either
        # directly (mapped operator) or via the closest enclosing mapped task group.
        if isinstance(ti.task, MappedOperator):
            mapped_dependencies = ti.task.iter_mapped_dependencies()
        elif (task_group := ti.task.get_closest_mapped_task_group()) is not None:
            mapped_dependencies = task_group.iter_mapped_dependencies()
        else:
            return

        dag_run = ti.get_dagrun(session)
        mapped_dependency_tis = [
            dag_run.get_task_instance(operator.task_id, session=session)
            for operator in mapped_dependencies
        ]
        if not mapped_dependency_tis:
            yield self._passing_status(reason="There are no mapped dependencies!")
            return

        finished_tis = [
            dep_ti
            for dep_ti in mapped_dependency_tis
            if dep_ti is not None and dep_ti.state in State.finished
        ]
        # Not every mapped dependency has finished yet: yield nothing and leave
        # the decision to a later scheduling pass.
        if len(mapped_dependency_tis) != len(finished_tis):
            return

        finished_states = {finished_ti.state for finished_ti in finished_tis}
        if finished_states == {TaskInstanceState.SUCCESS}:
            yield self._passing_status(reason="The task's mapped dependencies have all succeeded!")
            return

        # Only skips (mixed with successes) propagate as SKIPPED; any other
        # finished state, e.g. FAILED, still propagates as UPSTREAM_FAILED.
        if finished_states.issubset({TaskInstanceState.SKIPPED, TaskInstanceState.SUCCESS}):
            new_state = TaskInstanceState.SKIPPED
        else:
            new_state = TaskInstanceState.UPSTREAM_FAILED
        if ti.set_state(new_state, session):
            dep_context.have_changed_ti_states = True
        yield self._failing_status(reason="At least one of task's mapped dependencies has not succeeded!")
```
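To sanity-check the proposed rule without a scheduler or a DAG run, the state decision in `_get_dep_statuses` can be pulled out into a pure function. This is a minimal sketch under stated assumptions: `TIState` and `resolve_mapped_dependency_state` are hypothetical names, and the enum is a stand-in that models only a few of Airflow's `TaskInstanceState` members instead of importing Airflow. `None` in the input stands for a dependency task instance that was not found.

```python
from enum import Enum
from typing import Iterable, Optional


class TIState(str, Enum):
    # Hypothetical stand-in for airflow.utils.state.TaskInstanceState;
    # only the members needed for this sketch are modelled.
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    RUNNING = "running"


# Finished states modelled here (assumption: a subset of Airflow's State.finished).
FINISHED = {TIState.SUCCESS, TIState.SKIPPED, TIState.FAILED, TIState.UPSTREAM_FAILED}


def resolve_mapped_dependency_state(
    states: Iterable[Optional[TIState]],
) -> Optional[TIState]:
    """Return the state to set on the mapped task, or None to leave it unchanged.

    None is returned when there are no mapped dependencies, when any of them
    is missing or unfinished, or when all of them succeeded.
    """
    state_list = list(states)
    if not state_list or any(s not in FINISHED for s in state_list):
        return None
    finished = set(state_list)
    if finished == {TIState.SUCCESS}:
        return None
    # Skips mixed only with successes propagate as SKIPPED ...
    if finished <= {TIState.SKIPPED, TIState.SUCCESS}:
        return TIState.SKIPPED
    # ... anything else (e.g. FAILED) propagates as UPSTREAM_FAILED.
    return TIState.UPSTREAM_FAILED
```

Keeping the mapping pure lets each combination (all succeeded, skipped plus succeeded, any failure, still running) be covered by a small parametrized test, while the dep itself only gathers the states and calls `set_state`.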
No need for a detailed review; I'd just like to know whether this is a valid
approach before I write more tests and open a PR.
Alternatively, similar logic could be added to `expand_mapped_task` in
`AbstractOperator`, which is where the reported `UPSTREAM_FAILED` currently
comes from.