potiuk commented on code in PR #66469:
URL: https://github.com/apache/airflow/pull/66469#discussion_r3215359801
##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -920,3 +928,51 @@ def list_import_errors(self) ->
PluginImportErrorCollectionResponse | ServerResp
return
PluginImportErrorCollectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
+
+
+class TaskInstanceOperations(BaseOperations):
+ """Task instance operations."""
+
+ def _parse_task_instance_response(
+ self, data: dict | _list
+ ) -> TaskInstanceResponse | _list[TaskInstanceResponse] |
TaskInstanceCollectionResponse:
+ """Parse task instance response data into appropriate models."""
+ if isinstance(data, list):
+ return [TaskInstanceResponse.model_validate(item) for item in data]
+ if "task_instances" in data:
+ return TaskInstanceCollectionResponse.model_validate(data)
+ return TaskInstanceResponse.model_validate(data)
+
+ def get(
+ self, dag_id: str, dag_run_id: str, task_id: str
+ ) -> TaskInstanceResponse | _list[TaskInstanceResponse] |
TaskInstanceCollectionResponse:
+ """Get a task instance."""
+ self.response =
self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}")
+ return self._parse_task_instance_response(self.response.json())
+
+ def list(self, dag_id: str, dag_run_id: str) ->
TaskInstanceCollectionResponse | ServerResponseError:
+ """List task instances."""
+ return super().execute_list(
+ path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",
+ data_model=TaskInstanceCollectionResponse,
+ )
+
+ def clear(
+ self, dag_id: str, body: ClearTaskInstancesBody
+ ) -> TaskInstanceCollectionResponse | ServerResponseError:
+ """Clear task instances."""
+ self.response = self.client.post(
+ f"dags/{dag_id}/clearTaskInstances",
+ json=body.model_dump(mode="json", exclude_unset=True),
+ )
+ return
TaskInstanceCollectionResponse.model_validate_json(self.response.content)
+
+ def update(
+ self, dag_id: str, dag_run_id: str, task_id: str, body:
PatchTaskInstanceBody
+ ) -> TaskInstanceResponse | _list[TaskInstanceResponse] |
TaskInstanceCollectionResponse:
+ """Update a task instance."""
+ self.response = self.client.patch(
+ f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}",
+ json=body.model_dump(mode="json", exclude_unset=True),
Review Comment:
*Flagged by the adversarial reviewer (Codex). Cross-checked — the
unindexed-PATCH path is documented to update all map indexes.*
**[high] `update` has no `map_index` path and can mutate all mapped indexes**
> `TaskInstanceOperations.update()` only calls `/taskInstances/{task_id}`
and exposes no `map_index` argument, while the server has a distinct
`/{task_id}/{map_index}` route for indexed updates. On the unindexed route, the
server passes `map_index=None`; `SerializedDAG.set_task_instance_state()`
documents that `map_indexes=None` sets all mapped task instances. So the CLI
cannot intentionally update one mapped task instance, and a state update for a
mapped task id goes through the all-index path. That can turn a targeted manual
state correction into a bulk state mutation.
>
> **Recommendation:** Add a `map_index: int | None` parameter and route to
`/taskInstances/{task_id}/{map_index}` when provided; add CLI/API tests proving
mapped updates affect only the selected map index and that unindexed mapped
updates are either explicit bulk behavior or rejected.
Concretely:
```python
def update(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
body: PatchTaskInstanceBody,
map_index: int | None = None,
) -> TaskInstanceCollectionResponse:
if map_index is not None:
path =
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}"
else:
path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
self.response = self.client.patch(
path, json=body.model_dump(mode="json", exclude_unset=True)
)
return
TaskInstanceCollectionResponse.model_validate_json(self.response.content)
```
The CLI side then needs `--map-index` exposed as an argparse argument on the
`taskinstance update` command, and the integration test in
`test_airflowctl_commands.py` should add a parametrised case showing a
mapped update scoped to a single index.
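This pairs with my [primary-review finding
#1](https://github.com/apache/airflow/pull/66469#discussion_r4259776302)
(return-type narrowing): once `update` handles both the indexed and unindexed
routes, the return-type narrowing decision can be made per branch (for
example, indexed → single TI and unindexed → collection, if the server's
response models actually differ; the snippet above annotates both branches as
`TaskInstanceCollectionResponse`).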
##########
airflow-ctl/src/airflowctl/ctl/cli_config.py:
##########
@@ -659,10 +696,19 @@ def _get_func(args: Namespace, api_operation: dict,
api_client: Client = NEW_API
datamodel_param_name = parameter_key
if expanded_parameter in self.excluded_parameters:
continue
- if expanded_parameter in args_dict.keys():
+ if (
+ expanded_parameter in args_dict.keys()
+ and args_dict[expanded_parameter] is not None
+ ):
+ val = args_dict[expanded_parameter]
+ if isinstance(val, str) and expanded_parameter
in datamodel.model_fields:
+ if _is_list_annotation(
+
datamodel.model_fields[expanded_parameter].annotation
+ ):
+ val = [v.strip() for v in
val.split(",") if v.strip()]
Review Comment:
*Flagged by the adversarial reviewer (Codex). Cross-checked — the
mapped-clear path is genuinely destructive.*
**[high] CLI cannot express mapped task identifiers for `clear`**
> `ClearTaskInstancesBody.task_ids` supports entries shaped as either
`task_id` or `[task_id, map_index]`, and the API tests cover mapped clears with
nested lists. The CLI conversion added here treats any list-typed field as a
comma-separated flat string list, so an input such as `--task-ids times_2,0`,
intended to target `['times_2', 0]`, becomes `['times_2', '0']`. That is a
different request: it targets every map index of `times_2` plus a separate
task id `0`, or fails, depending on the Dag contents. Because clearing is a
destructive operation, an operator trying to clear one mapped task instance can
clear more task instances than intended.
>
> **Recommendation:** Add explicit parsing for
`ClearTaskInstancesBody.task_ids` that preserves nested `[task_id, map_index]`
pairs, with CLI tests covering single mapped index, multiple mapped indexes,
and plain task ids before wiring the clear command as supported.
For reference, the `task_ids` field in `ClearTaskInstancesBody` is typed
roughly as `list[str | tuple[str, int]] | None`. The `_is_list_annotation`
helper added in this PR does not model nested structure at all; it only answers
"is this annotation list-shaped?". A targeted fix likely needs a per-field
parsing hook (or a per-field Pydantic validator) rather than a generic
comma-splitter.
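If supporting mapped-index `task_ids` is out of scope for this PR, an
acceptable alternative is to **reject** any `--task-ids` input that looks like
it might want a map index (e.g. a dotted form ending in a number such as
`times_2.0`, a purely numeric element such as `0`, or any other form that is
not a plain task identifier) and document the limitation, then add nested-pair
support in a follow-up. Note that a dot on its own is not a safe signal: task
ids inside task groups are dotted by default (`group_id.task_id`), so rejecting
all dotted input would block legitimate clears. Either way, today's behaviour —
silently flattening — is not safe for a destructive command.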
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]