uranusjr commented on a change in pull request #21210:
URL: https://github.com/apache/airflow/pull/21210#discussion_r795859822
##########
File path: airflow/executors/debug_executor.py
##########
@@ -76,6 +79,8 @@ def _run_task(self, ti: TaskInstance) -> bool:
key = ti.key
try:
params = self.tasks_params.pop(ti.key, {})
+ if ti.task.is_mapped:
+ ti.task = cast("MappedOperator", ti.task).unmap()
Review comment:
I wonder if it’d be nicer to also implement `unmap()` on `BaseOperator`
and have it simply `return self`.
##########
File path: airflow/jobs/backfill_job.py
##########
@@ -623,15 +650,25 @@ def tabulate_ti_keys_set(set_ti_keys:
Set[TaskInstanceKey]) -> str:
# Sorting by execution date first
sorted_ti_keys = sorted(
set_ti_keys,
- key=lambda ti_key: (ti_key.run_id, ti_key.dag_id,
ti_key.task_id, ti_key.try_number),
+ key=lambda ti_key: (
+ ti_key.run_id,
+ ti_key.dag_id,
+ ti_key.task_id,
+ ti_key.map_index,
+ ti_key.try_number,
+ ),
+ )
+ return tabulate(
+ sorted_ti_keys, headers=["DAG ID", "Task ID", "Run ID", "Map
Index", "Try number"]
)
- return tabulate(sorted_ti_keys, headers=["DAG ID", "Task ID", "Run
ID", "Try number"])
def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
# Sorting by execution date first
sorted_tis = sorted(set_tis, key=lambda ti: (ti.run_id, ti.dag_id,
ti.task_id, ti.try_number))
- tis_values = ((ti.dag_id, ti.task_id, ti.run_id, ti.try_number)
for ti in sorted_tis)
- return tabulate(tis_values, headers=["DAG ID", "Task ID", "Run
ID", "Try number"])
+ tis_values = (
+ (ti.dag_id, ti.task_id, ti.run_id, ti.map_index,
ti.try_number) for ti in sorted_tis
+ )
+ return tabulate(tis_values, headers=["DAG ID", "Task ID", "Run
ID", "Map Index", "Try number"])
Review comment:
This should probably be discussed in more detail in the UX phase, but I wonder
if we should hide the "Map Index" column entirely if no task instances are mapped,
and display an empty cell (instead of -1) for non-mapped task instances.
##########
File path: airflow/models/baseoperator.py
##########
@@ -1649,6 +1616,7 @@ def _trigger_rule_from_kwargs(self) -> int:
@classmethod
def from_operator(cls, operator: BaseOperator, mapped_kwargs: Dict[str,
Any]) -> "MappedOperator":
dag: Optional["DAG"] = getattr(operator, '_dag', None)
+ tg: Optional["TaskGroup"] = getattr(operator, 'task_group', None)
Review comment:
When are those attributes not available? We can probably make both of
them always present on the class (defaulting to `None`) instead of looking
them up with `getattr`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]