ashb commented on a change in pull request #21019:
URL: https://github.com/apache/airflow/pull/21019#discussion_r791130585
##########
File path: tests/models/test_dagrun.py
##########
@@ -874,3 +875,20 @@ def test_verify_integrity_task_start_date(Stats_incr, session, run_type, expecte
     assert len(tis) == expected_tis
     Stats_incr.assert_called_with('task_instance_created-DummyOperator', expected_tis)
+
+
[email protected](reason="TODO: Expand mapped literals at verify_integrity time!")
Review comment:
It won't _only_ be in the mini scheduler run; there will still be an
"expansion of last resort" in the scheduler. I guess the question is whether
we want to do the expansion eagerly at DagRun creation time, when it could
instead be done in another process (the LocalTaskJob).
In practice it's probably going to be quite rare for maps to be literals, so
I don't think it's even worth the cost of checking for this here, given that
it's so unlikely to do anything useful.
##########
File path: airflow/models/baseoperator.py
##########
@@ -1800,6 +1801,53 @@ def wait_for_downstream(self) -> bool:
     def depends_on_past(self) -> bool:
         return self.partial_kwargs.get("depends_on_past") or self.wait_for_downstream
+    def expand_mapped_task(self, upstream_ti: "TaskInstance", session: "Session" = NEW_SESSION) -> None:
+        """Create the mapped TaskInstances for mapped task."""
+        # TODO: support having multiple mapped upstreams?
+        from airflow.models.taskmap import TaskMap
+        from airflow.settings import task_instance_mutation_hook
+
+        task_map_info: TaskMap = (
+            session.query(TaskMap)
+            .filter_by(
+                dag_id=upstream_ti.dag_id,
+                task_id=upstream_ti.task_id,
+                run_id=upstream_ti.run_id,
+                map_index=upstream_ti.map_index,
+            )
+            .one()
+        )
+
+        unmapped_ti: Optional[TaskInstance] = upstream_ti.dag_run.get_task_instance(
+            self.task_id, map_index=-1, session=session
+        )
+
+        maps = range(task_map_info.length)
+
+        if unmapped_ti:
+            # The unmapped TaskInstance still exists -- this means we haven't
+            # tried to run it before.
+            unmapped_ti.map_index = 0
+            maps = range(1, task_map_info.length)
Review comment:
I think skipping makes sense, yeah.
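
For reference, the index bookkeeping in the hunk above can be sketched in
isolation. This is only an illustration under stated assumptions, not Airflow
code: `plan_map_indexes` and its parameters are hypothetical names, `length`
stands in for `task_map_info.length`, and `unmapped_exists` stands in for
whether the `map_index=-1` TaskInstance was found.

```python
from typing import List, Tuple


def plan_map_indexes(length: int, unmapped_exists: bool) -> Tuple[List[int], List[int]]:
    """Return (reused, created) map indexes for a mapped task.

    Hypothetical helper mirroring the quoted hunk: if the placeholder
    TaskInstance (map_index == -1) still exists, it is re-used as index 0
    and only indexes 1..length-1 need new rows. Otherwise every index in
    0..length-1 needs a new row.

    Like the quoted lines, this does not special-case length == 0.
    """
    if unmapped_exists:
        return [0], list(range(1, length))
    return [], list(range(length))


reused, created = plan_map_indexes(3, unmapped_exists=True)
print(reused, created)
```

With a length of 3 and the placeholder still present, the placeholder becomes
index 0 and indexes 1 and 2 are the ones left to create.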
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.