uranusjr commented on a change in pull request #21614:
URL: https://github.com/apache/airflow/pull/21614#discussion_r810682765
##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
def _get_ready_tis(
self,
- scheduleable_tasks: List[TI],
+ schedulable_tis: List[TI],
finished_tis: List[TI],
session: Session,
) -> Tuple[List[TI], bool]:
old_states = {}
ready_tis: List[TI] = []
changed_tis = False
- if not scheduleable_tasks:
+ if not schedulable_tis:
return ready_tis, changed_tis
+ # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+ # `schedulable_tis` in place and have the `for` loop pick them up
+ expanded_tis: List[TI] = []
+
# Check dependencies
- for st in scheduleable_tasks:
- old_state = st.state
- if st.are_dependencies_met(
+ for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+ # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+ # for any reason it wasn't, we need to expand it now
+ if schedulable.map_index < 0 and schedulable.task.is_mapped:
Review comment:
```suggestion
if schedulable.task.is_mapped and schedulable.map_index < 0:
```
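Perhaps a very minor optimisation? Most tasks should be non-mapped, and all of
their TIs have map index -1, so `map_index < 0` is almost always true and the
`is_mapped` check runs anyway. Checking `is_mapped` first saves some checks.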
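The suggestion relies on Python's `and` short-circuiting: the right operand is evaluated only when the left one is truthy. A minimal sketch of the effect, using a hypothetical `FakeTI` stand-in (not Airflow's `TaskInstance`) that records which checks run, and assuming, as the comment does, that nearly every TI is unmapped with `map_index == -1`:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeTI:
    """Hypothetical stand-in for a TaskInstance; records which checks ran."""

    is_mapped: bool
    map_index: int = -1
    checks: List[str] = field(default_factory=list)

    def mapped(self) -> bool:
        self.checks.append("is_mapped")
        return self.is_mapped

    def unexpanded(self) -> bool:
        self.checks.append("map_index")
        return self.map_index < 0


def needs_expansion_original(ti: FakeTI) -> bool:
    # Order in the PR: map_index first, then is_mapped.
    return ti.unexpanded() and ti.mapped()


def needs_expansion_suggested(ti: FakeTI) -> bool:
    # Order in the suggestion: is_mapped first, then map_index.
    return ti.mapped() and ti.unexpanded()


def count_checks(predicate, tis: List[FakeTI]) -> int:
    """Run the predicate over every TI and return the total number of checks evaluated."""
    for ti in tis:
        ti.checks.clear()
        predicate(ti)
    return sum(len(ti.checks) for ti in tis)


# 99 ordinary (unmapped, map_index -1) TIs plus one unexpanded mapped TI.
tis = [FakeTI(is_mapped=False) for _ in range(99)] + [FakeTI(is_mapped=True)]

print(count_checks(needs_expansion_original, tis))   # 200
print(count_checks(needs_expansion_suggested, tis))  # 101
```

Both orderings select the same TIs; only the number of evaluated checks differs, and with plain attribute reads the saving per TI is small, which matches "very minor".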
##########
File path: airflow/models/dagrun.py
##########
@@ -649,27 +651,46 @@ def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) ->
def _get_ready_tis(
self,
- scheduleable_tasks: List[TI],
+ schedulable_tis: List[TI],
finished_tis: List[TI],
session: Session,
) -> Tuple[List[TI], bool]:
old_states = {}
ready_tis: List[TI] = []
changed_tis = False
- if not scheduleable_tasks:
+ if not schedulable_tis:
return ready_tis, changed_tis
+ # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+ # `schedulable_tis` in place and have the `for` loop pick them up
+ expanded_tis: List[TI] = []
+
# Check dependencies
- for st in scheduleable_tasks:
- old_state = st.state
- if st.are_dependencies_met(
+ for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+
+ # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+ # for any reason it wasn't, we need to expand it now
+ if schedulable.map_index < 0 and schedulable.task.is_mapped:
+ # HACK. This needs a better way, one that copes with multiple upstreams!
+ for ti in finished_tis:
+ if schedulable.task_id in ti.task.downstream_task_ids:
+ upstream = ti
+
+ assert isinstance(schedulable.task, MappedOperator)
+ new_tis = schedulable.task.expand_mapped_task(upstream, session=session)
+ assert new_tis[0] is schedulable
+ expanded_tis.extend(new_tis[1:])
Review comment:
`expand_mapped_task` has a block for when the unmapped task does not
exist. Do we need to account for that case here?
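For context on the question: the quoted loop relies on `itertools.chain` yielding TIs that are appended to `expanded_tis` while the loop is still running, which works because a list iterator re-checks the list's length on every step. A minimal sketch with hypothetical `FakeTI` and `fake_expand` stand-ins (not Airflow's classes), assuming expansion reuses the unexpanded TI as index 0, which is what the diff's `assert new_tis[0] is schedulable` checks:

```python
import itertools
from dataclasses import dataclass
from typing import List


@dataclass
class FakeTI:
    """Hypothetical stand-in for a TaskInstance."""

    task_id: str
    map_index: int = -1
    mapped: bool = False


def fake_expand(ti: FakeTI, length: int) -> List[FakeTI]:
    """Hypothetical expansion: reuse the unexpanded TI as index 0, create the rest.

    This only mimics the return shape the diff asserts on; it is not
    MappedOperator.expand_mapped_task.
    """
    ti.map_index = 0
    return [ti] + [FakeTI(ti.task_id, i, mapped=True) for i in range(1, length)]


schedulable_tis = [FakeTI("a"), FakeTI("b", mapped=True), FakeTI("c")]
expanded_tis: List[FakeTI] = []
seen = []

for schedulable in itertools.chain(schedulable_tis, expanded_tis):
    if schedulable.mapped and schedulable.map_index < 0:
        new_tis = fake_expand(schedulable, length=3)
        assert new_tis[0] is schedulable  # same invariant as the diff
        # Queue only the extra TIs; index 0 is the one being processed now.
        expanded_tis.extend(new_tis[1:])
    seen.append((schedulable.task_id, schedulable.map_index))

print(seen)
# [('a', -1), ('b', 0), ('c', -1), ('b', 1), ('b', 2)]
```

The sketch keeps the diff's `assert`; that line is where the loop depends on the shape of `expand_mapped_task`'s return value.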
--
This is an automated message from the Apache Git Service.