Copilot commented on code in PR #60161:
URL: https://github.com/apache/airflow/pull/60161#discussion_r3066482337


##########
airflow-core/src/airflow/ui/src/queries/useBulkUpdateTaskInstancesDryRun.ts:
##########
@@ -0,0 +1,65 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQuery, type UseQueryOptions } from "@tanstack/react-query";
+
+import { OpenAPI } from "openapi/requests/core/OpenAPI";
+import { request as __request } from "openapi/requests/core/request";
+import type {
+  BulkBody_BulkTaskInstanceBody_,
+  TaskInstanceCollectionResponse,
+} from "openapi/requests/types.gen";
+
+type Props<TData, TError> = {
+  dagId: string;
+  dagRunId: string;
+  options?: Omit<UseQueryOptions<TData, TError>, "queryFn" | "queryKey">;
+  requestBody?: BulkBody_BulkTaskInstanceBody_;
+};
+
+export const useBulkUpdateTaskInstancesDryRunKey = 
"bulkUpdateTaskInstancesDryRun";
+
+export const useBulkUpdateTaskInstancesDryRun = <TData = 
TaskInstanceCollectionResponse, TError = unknown>({
+  dagId,
+  dagRunId,
+  options,
+  requestBody,
+}: Props<TData, TError>) =>
+  useQuery<TData, TError>({
+    ...options,
+    queryFn: () =>
+      __request(OpenAPI, {
+        body: requestBody ?? { actions: [] },
+        errors: {
+          401: "Unauthorized",
+          403: "Forbidden",
+          422: "Validation Error",
+        },
+        mediaType: "application/json",
+        method: "PATCH",
+        path: {
+          dag_id: dagId,
+          dag_run_id: dagRunId,
+        },
+        query: {
+          dry_run: true,
+        },
+        url: "/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances",

Review Comment:
   The backend route added in this PR is
   `PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/dry_run`
   (path suffix), but this hook calls `/taskInstances` with a `dry_run=true`
   query param. This mismatch will cause 404s and break the UI dry-run. Update
   the hook to call the new `/taskInstances/dry_run` endpoint (or switch to the
   generated `TaskInstanceService.bulkTaskInstancesDryRun` /
   `useTaskInstanceServiceBulkTaskInstancesDryRun`).
   ```suggestion
           url: "/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/dry_run",
   ```



##########
airflow-core/src/airflow/ui/src/queries/useBulkUpdateTaskInstancesDryRun.test.ts:
##########
@@ -0,0 +1,235 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { renderHook, waitFor } from "@testing-library/react";
+import { describe, it, expect, vi, beforeEach } from "vitest";
+
+import { BaseWrapper } from "src/utils/Wrapper";
+
+import {
+  useBulkUpdateTaskInstancesDryRun,
+  useBulkUpdateTaskInstancesDryRunKey,
+} from "./useBulkUpdateTaskInstancesDryRun";
+
+// Mock the openapi request
+vi.mock("openapi/requests/core/request", () => ({
+  request: vi.fn(),
+}));
+
+type RequestConfig = {
+  readonly body?: {
+    readonly actions?: Array<{
+      readonly entities?: Array<{ readonly task_id?: string }>;
+    }>;
+  };
+};
+
+describe("useBulkUpdateTaskInstancesDryRun", () => {
+  const mockRequest = vi.fn();
+
+  beforeEach(async () => {
+    vi.clearAllMocks();
+    const { request } = await import("openapi/requests/core/request");
+
+    vi.mocked(request).mockImplementation(mockRequest);
+  });
+
+  it("should have correct query key", () => {
+    
expect(useBulkUpdateTaskInstancesDryRunKey).toBe("bulkUpdateTaskInstancesDryRun");
+  });
+
+  it("should call request with dry_run=true query parameter", async () => {
+    const mockResponse = {
+      task_instances: [],
+      total_entries: 0,
+    };
+
+    mockRequest.mockResolvedValue(mockResponse);
+
+    const { result } = renderHook(
+      () =>
+        useBulkUpdateTaskInstancesDryRun({
+          dagId: "test-dag",
+          dagRunId: "test-run",
+          requestBody: {
+            actions: [
+              {
+                action: "update",
+                entities: [
+                  {
+                    map_index: -1,
+                    new_state: "success",
+                    task_id: "test-task",
+                  },
+                ],
+              },
+            ],
+          },
+        }),
+      {
+        wrapper: BaseWrapper,
+      },
+    );
+
+    await waitFor(() => {
+      expect(result.current.isSuccess).toBe(true);
+    });
+
+    expect(mockRequest).toHaveBeenCalledWith(
+      expect.anything(),
+      expect.objectContaining({
+        method: "PATCH",
+        path: {
+          dag_id: "test-dag",
+          dag_run_id: "test-run",
+        },
+        query: {
+          dry_run: true,
+        },
+      }),
+    );

Review Comment:
   This test locks in the (now incorrect) client contract of calling 
`/taskInstances` with `dry_run=true`. If the hook is updated to use the new 
`/taskInstances/dry_run` endpoint, update this assertion to validate the 
correct URL and remove the `dry_run` query expectation. This is required to 
prevent the test suite from passing while the UI remains incompatible with the 
API.



##########
airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml:
##########
@@ -7377,11 +7322,186 @@ paths:
         schema:
           type: string
           title: Dag Run Id
+      - name: identifier
+        in: path
+        required: true
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'

Review Comment:
   The generated OpenAPI marks the `identifier` *path* parameter as nullable 
(`string | null`). Path params cannot be null when the segment is required. 
This comes from the FastAPI signature using `identifier: str | None = None`. 
Update the route signatures so required path params are non-optional (`str` 
with no default), then regenerate the OpenAPI spec/client.
   ```suggestion
             type: string
   ```



##########
airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstance/MarkGroupTaskInstanceAsDialog.tsx:
##########
@@ -0,0 +1,168 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button, Flex, Heading, VStack } from "@chakra-ui/react";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+import { useParams } from "react-router-dom";
+
+import type { LightGridTaskInstanceSummary, TaskInstanceState } from 
"openapi/requests/types.gen";
+import { ActionAccordion } from "src/components/ActionAccordion";
+import { StateBadge } from "src/components/StateBadge";
+import Time from "src/components/Time";
+import { Dialog } from "src/components/ui";
+import SegmentedControl from "src/components/ui/SegmentedControl";
+import { usePatchTaskGroup } from "src/queries/usePatchTaskGroup";
+import { usePatchTaskGroupDryRun } from "src/queries/usePatchTaskGroupDryRun";
+
+type Props = {
+  readonly groupTaskInstance: LightGridTaskInstanceSummary;
+  readonly onClose: () => void;
+  readonly open: boolean;
+  readonly state: TaskInstanceState;
+};
+
+const MarkGroupTaskInstanceAsDialog = ({ groupTaskInstance, onClose, open, 
state }: Props) => {
+  const { t: translate } = useTranslation();
+  const { dagId = "", runId = "" } = useParams();
+  const groupId = groupTaskInstance.task_id;
+
+  const [selectedOptions, setSelectedOptions] = useState<Array<string>>([]);
+
+  const past = selectedOptions.includes("past");
+  const future = selectedOptions.includes("future");
+  const upstream = selectedOptions.includes("upstream");
+  const downstream = selectedOptions.includes("downstream");
+
+  // eslint-disable-next-line unicorn/no-null -- DAGRunResponse["note"] type 
requires null, not undefined
+  const [note, setNote] = useState<string | null>(null);
+
+  const { data, isPending: isPendingDryRun } = usePatchTaskGroupDryRun({
+    dagId,
+    dagRunId: runId,
+    options: {
+      enabled: open,
+      refetchOnMount: "always",
+    },
+    requestBody: {
+      include_downstream: downstream,
+      include_future: future,
+      include_past: past,
+      include_upstream: upstream,
+      new_state: state,
+      note,

Review Comment:
   The dry-run request includes `note`, but notes don’t affect which task 
instances are selected; including it increases request churn (and if the 
dry-run hook’s cache key doesn’t include `note`, it can lead to confusing cache 
behavior while typing). Prefer omitting `note` from dry-run requests, or (if 
the API intentionally echoes note-related data) include `note` in the dry-run 
queryKey so caching stays consistent.
   ```suggestion
   
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -949,41 +1028,92 @@ def bulk_task_instances(
 def patch_task_instance(
     dag_id: str,
     dag_run_id: str,
-    task_id: str,
     dag_bag: DagBagDep,
     body: PatchTaskInstanceBody,
     user: GetUserDep,
     session: SessionDep,
+    identifier: str | None = None,
+    task_id: str | None = None,
     map_index: int | None = None,
+    task_group_id: str | None = Query(None, description="Task group id to 
update task instances for"),
     update_mask: list[str] | None = Query(None),
 ) -> TaskInstanceCollectionResponse:

Review Comment:
   This endpoint’s path is `.../taskInstances/{identifier}`, but `identifier` 
is declared optional and there is also a `task_id` query parameter that can 
override which task is actually patched. That allows requests like 
`/taskInstances/foo?task_id=bar`, where the modified resource doesn’t match the 
URL path, and it also generates an OpenAPI schema with nullable path params 
(invalid for required path segments). Make `identifier: str` (no default) and 
remove the `task_id` query parameter from this route; keep `task_group_id` as 
the selector for group updates.



##########
airflow-core/src/airflow/ui/src/queries/useBulkUpdateTaskInstances.ts:
##########
@@ -0,0 +1,78 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useTranslation } from "react-i18next";
+
+import {
+  useTaskInstanceServiceBulkTaskInstances,
+  useTaskInstanceServiceGetTaskInstancesKey,
+  UseGridServiceGetGridRunsKeyFn,
+  UseGridServiceGetGridTiSummariesKeyFn,
+  useGridServiceGetGridTiSummariesKey,
+} from "openapi/queries";
+import { toaster } from "src/components/ui";
+
+type Props = {
+  readonly affectsMultipleRuns?: boolean;
+  readonly dagId: string;
+  readonly dagRunId: string;
+  readonly onSuccess?: () => void;
+};
+
+export const useBulkUpdateTaskInstances = ({
+  affectsMultipleRuns = false,
+  dagId,
+  dagRunId,
+  onSuccess,
+}: Props) => {
+  const queryClient = useQueryClient();
+  const { t: translate } = useTranslation();
+
+  const onError = (error: Error) => {
+    toaster.create({
+      description: error.message,
+      title: translate("toaster.update.error", {
+        resourceName: translate("taskGroup"),

Review Comment:
   This hook is for bulk updating task instances, but the error toast uses
   `resourceName: translate("taskGroup")`, which will produce a misleading error
   title in non-task-group contexts. Use the task-instance translation key
   (consistent with other TI toasts, e.g. `taskInstance_one`) or make the
   resource name configurable via props.
   ```suggestion
           resourceName: translate("taskInstance_one"),
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py:
##########
@@ -491,3 +596,139 @@ def handle_bulk_delete(
 
         except HTTPException as e:
             results.errors.append({"error": f"{e.detail}", "status_code": 
e.status_code})
+
+    def handle_request_dry_run(self) -> TaskInstanceCollectionResponse:
+        """Handle bulk request in dry_run mode, returning affected task 
instances without making changes."""
+        from airflow.api_fastapi.common.dagbag import get_latest_version_of_dag
+
+        all_affected_tis: list[TI] = []
+        seen_ti_keys: set[tuple[str, str, str, int]] = set()
+
+        for action in self.request.actions:
+            if action.action.value != "update":
+                # Only update actions are supported for dry_run
+                continue
+
+            update_action = cast("BulkUpdateAction[BulkTaskInstanceBody]", 
action)
+            # Validate and categorize entities
+            update_specific_map_index_task_keys, 
update_all_map_index_task_keys = self._categorize_entities(
+                update_action.entities, BulkActionResponse()
+            )
+
+            specific_entity_map = {
+                (entity.dag_id, entity.dag_run_id, entity.task_id, 
entity.map_index): entity
+                for entity in update_action.entities
+                if entity.map_index is not None
+            }
+            all_map_entity_map = {
+                (entity.dag_id, entity.dag_run_id, entity.task_id): entity
+                for entity in update_action.entities
+                if entity.map_index is None
+            }
+
+            # Handle updates for specific map_index task instances
+            if update_specific_map_index_task_keys:
+                _, matched_task_keys, _ = 
self._categorize_task_instances(update_specific_map_index_task_keys)
+
+                for dag_id, dag_run_id, task_id, map_index in 
matched_task_keys:
+                    entity = specific_entity_map.get((dag_id, dag_run_id, 
task_id, map_index))
+                    if entity is None or entity.new_state is None:
+                        continue
+
+                    dag = get_latest_version_of_dag(self.dag_bag, dag_id, 
self.session)
+                    if not dag:
+                        continue
+
+                    # Simulate state change without committing
+                    affected_tis = (
+                        dag.set_task_instance_state(
+                            task_id=task_id,
+                            run_id=dag_run_id,
+                            map_indexes=[map_index],
+                            state=entity.new_state,
+                            upstream=entity.include_upstream or False,
+                            downstream=entity.include_downstream or False,
+                            future=entity.include_future or False,
+                            past=entity.include_past or False,
+                            commit=False,
+                            session=self.session,
+                        )
+                        or []
+                    )
+
+                    # Add unique task instances
+                    for ti in affected_tis:
+                        ti_key = (
+                            ti.dag_id,
+                            ti.run_id,
+                            ti.task_id,
+                            ti.map_index if ti.map_index is not None else -1,
+                        )
+                        if ti_key not in seen_ti_keys:
+                            seen_ti_keys.add(ti_key)
+                            all_affected_tis.append(ti)

Review Comment:
   This manually re-implements unique task instance collection logic. Since 
`_collect_unique_tis(...)` is introduced in the same module and already 
encapsulates this behavior, reuse it here (and in the other loop below) to 
avoid duplication and ensure keying stays consistent across endpoints.
   ```suggestion
                       _collect_unique_tis(affected_tis, seen_ti_keys, all_affected_tis)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to