stevenschaerer commented on issue #37091:
URL: https://github.com/apache/airflow/issues/37091#issuecomment-1942735796

What do you think about adding a new task instance dependency that sets the state to `SKIPPED` if any of the mapped dependencies are `SKIPPED` but none are `FAILED`?
   
```python
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.state import State, TaskInstanceState


class MappedTaskUpstreamDep(BaseTIDep):
    """
    Determines if a mapped task's upstream tasks that provide XComs used by this
    task for task mapping are in a state that allows a given task instance to run.
    """

    NAME = "Mapped dependencies have succeeded"
    IGNORABLE = True
    IS_TASK_DEP = True

    def _get_dep_statuses(self, ti, session, dep_context):
        # Imported locally to avoid a circular import.
        from airflow.models.mappedoperator import MappedOperator

        # Collect the upstream operators whose XComs drive this task's mapping.
        if isinstance(ti.task, MappedOperator):
            mapped_dependencies = ti.task.iter_mapped_dependencies()
        elif (task_group := ti.task.get_closest_mapped_task_group()) is not None:
            mapped_dependencies = task_group.iter_mapped_dependencies()
        else:
            return

        dag_run = ti.get_dagrun(session)
        mapped_dependency_tis = [
            dag_run.get_task_instance(operator.task_id, session=session)
            for operator in mapped_dependencies
        ]
        if not mapped_dependency_tis:
            yield self._passing_status(reason="There are no mapped dependencies!")
            return

        # Yield nothing until every mapped dependency has reached a terminal state.
        finished_tis = [
            dep_ti
            for dep_ti in mapped_dependency_tis
            if dep_ti is not None and dep_ti.state in State.finished
        ]
        if len(mapped_dependency_tis) != len(finished_tis):
            return

        finished_states = {finished_ti.state for finished_ti in finished_tis}
        if finished_states == {TaskInstanceState.SUCCESS}:
            yield self._passing_status(reason="The task's mapped dependencies have all succeeded!")
            return

        # Only SKIPPED/SUCCESS upstreams: skip this task. Anything else
        # (FAILED, UPSTREAM_FAILED, REMOVED, ...): mark it UPSTREAM_FAILED.
        if finished_states.issubset({TaskInstanceState.SKIPPED, TaskInstanceState.SUCCESS}):
            new_state = TaskInstanceState.SKIPPED
        else:
            new_state = TaskInstanceState.UPSTREAM_FAILED
        if ti.set_state(new_state, session):
            dep_context.have_changed_ti_states = True
        yield self._failing_status(reason="At least one of the task's mapped dependencies has not succeeded!")
```
No need for a detailed review; I'd just like to know if this is a valid approach before writing more tests and creating a PR.
   
Alternatively, something like this could be added to `expand_mapped_task` in `AbstractOperator`, which is where the reported `UPSTREAM_FAILED` currently comes from.
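To make the proposed rule easier to check in isolation, here is a minimal, Airflow-independent sketch of the decision the dep above makes once it has collected its mapped dependencies' states. It assumes a hypothetical `TIState` enum standing in for `TaskInstanceState`, models a missing task instance as `None`, and uses the hypothetical labels `"wait"` (the dep yields no status) and `"run"` (the dep passes); none of these names are Airflow APIs.

```python
from enum import Enum
from typing import Iterable, Optional


class TIState(str, Enum):
    """Hypothetical stand-in for airflow.utils.state.TaskInstanceState."""

    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    REMOVED = "removed"
    RUNNING = "running"


# Terminal states, playing the role of State.finished in the dep above.
FINISHED = frozenset(
    {TIState.SUCCESS, TIState.SKIPPED, TIState.FAILED, TIState.UPSTREAM_FAILED, TIState.REMOVED}
)


def resolve_mapped_state(upstream_states: Iterable[Optional[TIState]]) -> str:
    """Decide what happens to a mapped task given its mapped upstreams' states.

    Returns "run" when there are no mapped upstreams or all of them succeeded,
    "wait" while any upstream is missing (None) or not yet finished, and
    otherwise the state value the mapped task would be set to.
    """
    states = list(upstream_states)
    if not states:
        return "run"  # no mapped dependencies, nothing to block on
    if any(state not in FINISHED for state in states):
        return "wait"  # None and non-terminal states both land here
    finished = set(states)
    if finished == {TIState.SUCCESS}:
        return "run"
    # Skipped (but not failed) upstreams propagate a skip.
    if finished <= {TIState.SKIPPED, TIState.SUCCESS}:
        return TIState.SKIPPED.value
    # Any other terminal state (FAILED, UPSTREAM_FAILED, REMOVED) propagates a failure.
    return TIState.UPSTREAM_FAILED.value


if __name__ == "__main__":
    print(resolve_mapped_state([TIState.SUCCESS, TIState.SKIPPED]))
    print(resolve_mapped_state([TIState.SKIPPED, TIState.FAILED]))
```

Keeping the rule as a pure function like this would, under these assumptions, let the `SKIPPED` vs. `UPSTREAM_FAILED` decision be unit-tested without a database session, whichever of the two places (a new dep or `expand_mapped_task`) ends up calling it.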

