This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a6ea6f9a082 Validate map_index range in CLI commands (#62626)
a6ea6f9a082 is described below
commit a6ea6f9a0827126ef8968985f56c221385491fad
Author: leon.jeon <[email protected]>
AuthorDate: Wed Mar 11 12:41:10 2026 +0000
Validate map_index range in CLI commands (#62626)
* Validate map_index range in CLI commands
When a user passes a map_index to CLI commands like `tasks render` or
`tasks test`, validate that it falls within the parse-time mapped count.
This gives a clear error message instead of silently proceeding with an
invalid index. For dynamically mapped tasks (XCom-based), the validation
is skipped since the count is not available at parse time.
closes: #60463
Co-Authored-By: claude-flow <[email protected]>
* Raise clear error for map_index on non-mapped tasks
Split the except block so NotMapped raises a ValueError instead of
being silently ignored alongside NotFullyPopulated. Also switch the
dynamic-mapping test to the standard pytest.raises pattern.
Co-Authored-By: claude-flow <[email protected]>
* Fix ruff format: collapse single-line raise ValueError
Co-Authored-By: claude-flow <[email protected]>
* Use AirflowConsole for non-JSON pool export output
Replace `rich.print(pools_list)` with `AirflowConsole().print_as()` so
non-JSON export formats (table, yaml, plain) are rendered consistently
with other airflow-ctl commands. Add test verifying the delegation.
Co-Authored-By: claude-flow <[email protected]>
* Revert "Use AirflowConsole for non-JSON pool export output"
This reverts commit 5dc87ee8a66edaa09b23faf59ee7c42510e483c1.
---------
Co-authored-by: claude-flow <[email protected]>
---
.../src/airflow/cli/commands/task_command.py | 16 +++++-
.../tests/unit/cli/commands/test_task_command.py | 62 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/task_command.py b/airflow-core/src/airflow/cli/commands/task_command.py
index 3cdd5c9f619..e615d63ccf4 100644
--- a/airflow-core/src/airflow/cli/commands/task_command.py
+++ b/airflow-core/src/airflow/cli/commands/task_command.py
@@ -31,10 +31,11 @@ from airflow import settings
from airflow._shared.timezones import timezone
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
-from airflow.exceptions import AirflowConfigException, DagRunNotFound, TaskInstanceNotFound
+from airflow.exceptions import AirflowConfigException, DagRunNotFound, NotMapped, TaskInstanceNotFound
from airflow.models import TaskInstance
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun, get_or_create_dagrun
+from airflow.models.expandinput import NotFullyPopulated
from airflow.models.serialized_dag import SerializedDagModel
from airflow.sdk.definitions.dag import DAG, _run_task
from airflow.sdk.definitions.param import ParamsDict
@@ -203,7 +204,18 @@ def _get_ti(
f"TaskInstance for {dag.dag_id}, {task.task_id}, map={map_index} with "
f"run_id or logical_date of {logical_date_or_run_id!r} not found"
)
- # TODO: Validate map_index is in range?
+ if map_index >= 0:
+ try:
+ total = task.get_parse_time_mapped_ti_count()
+ if map_index >= total:
+ raise ValueError(
+ f"map_index {map_index} is out of range. "
+ f"Task '{task.task_id}' has {total} mapped instance(s) [0..{total - 1}]."
+ )
+ except NotFullyPopulated:
+ pass # Dynamic mapping — cannot validate at parse time
+ except NotMapped:
+ raise ValueError(f"Task '{task.task_id}' is not mapped; map_index must be -1.")
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
if not dag_version:
# TODO: Remove this once DagVersion.get_latest_version is guaranteed to return a DagVersion/raise
diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py
index 7ba2f3cc4bf..8d96d5579ec 100644
--- a/airflow-core/tests/unit/cli/commands/test_task_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -310,6 +310,68 @@ class TestCliTasks:
assert "[3]" not in output
assert "property: op_args" in output
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_mapped_task_render_out_of_range_map_index(self):
+ """Raise ValueError when map_index exceeds the parse-time mapped count."""
+ with pytest.raises(ValueError, match=r"map_index 5 is out of range.*3 mapped instance"):
+ task_command.task_render(
+ self.parser.parse_args(
+ [
+ "tasks",
+ "render",
+ "test_mapped_classic",
+ "consumer_literal",
+ "2022-01-01",
+ "--map-index",
+ "5",
+ ]
+ )
+ )
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_mapped_task_render_boundary_map_index(self):
+ """Render should succeed for the last valid map_index (count - 1)."""
+ with redirect_stdout(io.StringIO()) as stdout:
+ task_command.task_render(
+ self.parser.parse_args(
+ [
+ "tasks",
+ "render",
+ "test_mapped_classic",
+ "consumer_literal",
+ "2022-01-01",
+ "--map-index",
+ "2",
+ ]
+ )
+ )
+ output = stdout.getvalue()
+ assert "[3]" in output
+ assert "property: op_args" in output
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_mapped_task_render_dynamic_skips_validation(self):
+ """Dynamic (XCom-based) mapping should skip map_index validation."""
+ # consumer depends on XCom from make_arg_lists, so parse-time count
+ # is not available. Validation should be skipped (NotFullyPopulated).
+ # The render may fail for other reasons, but not with our
+ # "out of range" ValueError.
+ with pytest.raises(Exception) as exc_info: # noqa: PT011
+ task_command.task_render(
+ self.parser.parse_args(
+ [
+ "tasks",
+ "render",
+ "test_mapped_classic",
+ "consumer",
+ "2022-01-01",
+ "--map-index",
+ "999",
+ ]
+ )
+ )
+ assert "out of range" not in str(exc_info.value)
+
def test_mapped_task_render_with_template(self, dag_maker):
"""
tasks render should render and displays templated fields for a given
mapping task