This is an automated email from the ASF dual-hosted git repository.

choo121600 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41ec7a09398 Improve query validation, including for streaming (#67212)
41ec7a09398 is described below

commit 41ec7a09398337b433745e5b3f9af9fe1371d1b8
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed May 20 10:22:26 2026 -0400

    Improve query validation, including for streaming (#67212)
---
 .../airflow/ui/src/queries/gridViewQueryKeys.ts    | 10 ++++++
 .../ui/src/queries/useBulkClearTaskInstances.ts    | 17 +++++++--
 .../airflow/ui/src/queries/useBulkTaskInstances.ts |  3 ++
 .../src/airflow/ui/src/queries/useClearRun.ts      |  3 +-
 .../ui/src/queries/useClearTaskInstances.ts        |  3 +-
 .../src/airflow/ui/src/queries/useDeleteDagRun.ts  |  9 +++--
 .../ui/src/queries/useDeleteTaskInstance.ts        | 12 ++++++-
 .../airflow/ui/src/queries/useGridTISummaries.ts   | 42 ++++++++++++++++++++++
 .../src/airflow/ui/src/queries/usePatchDagRun.ts   |  9 ++++-
 .../airflow/ui/src/queries/usePatchTaskGroup.ts    |  3 +-
 .../airflow/ui/src/queries/usePatchTaskInstance.ts |  3 +-
 .../airflow/ui/src/queries/useUpdateHITLDetail.ts  | 10 +++++-
 12 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts b/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
index 42060cac610..2c43ff47eae 100644
--- a/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
+++ b/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
@@ -21,7 +21,10 @@ import {
   UseDagServiceGetDagDetailsKeyFn,
   UseDagServiceGetLatestRunInfoKeyFn,
   UseGridServiceGetGridRunsKeyFn,
+  useTaskInstanceServiceGetExtraLinksKey,
+  useTaskInstanceServiceGetLogKey,
   UseTaskInstanceServiceGetTaskInstancesKeyFn,
+  useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
 } from "openapi/queries";
 
 export const gridQueryKeys = (dagId: string) =>
@@ -32,3 +35,10 @@ export const gridQueryKeys = (dagId: string) =>
     UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
     UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]),
   ] as const;
+
+/** Prefix keys for per-attempt TI caches that become stale after any mutation. */
+export const tiPerAttemptQueryKeys = [
+  [useTaskInstanceServiceGetLogKey],
+  [useTaskInstanceServiceGetExtraLinksKey],
+  [useTaskInstanceServiceGetTaskInstanceTryDetailsKey],
+] as const;
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
index 063b1c6d77d..8c14e36664c 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
@@ -20,11 +20,17 @@ import { useQueryClient } from "@tanstack/react-query";
 import { useState } from "react";
 import { useTranslation } from "react-i18next";
 
-import { useDagRunServiceGetDagRunsKey, useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries";
+import {
+  useDagRunServiceGetDagRunsKey,
+  useTaskInstanceServiceGetMappedTaskInstanceKey,
+  useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
 import { TaskInstanceService } from "openapi/requests/services.gen";
 import type { TaskInstanceResponse } from "openapi/requests/types.gen";
 import { toaster } from "src/components/ui";
 
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
 type Props = {
   readonly clearSelections: VoidFunction;
   readonly onSuccessConfirm: VoidFunction;
@@ -46,10 +52,15 @@ export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }:
   const [isPending, setIsPending] = useState(false);
   const { t: translate } = useTranslation(["common", "dags"]);
 
-  const invalidateQueries = async () => {
+  const invalidateQueries = async (dagIds: ReadonlySet<string>) => {
     await Promise.all([
       queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }),
       queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }),
+      queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetMappedTaskInstanceKey] }),
+      ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
+      ...[...dagIds].flatMap((dagId) =>
+        gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })),
+      ),
     ]);
   };
 
@@ -92,7 +103,7 @@ export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }:
         ),
       );
 
-      await invalidateQueries();
+      await invalidateQueries(new Set([...byDagRun.values()].map(({ dagId }) => dagId)));
 
       toaster.create({
         description: translate("toaster.bulkClear.success.description", {
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
index 8ad4d616197..ac47c550b37 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
@@ -32,6 +32,8 @@ import type {
 } from "openapi/requests/types.gen";
 import { toaster } from "src/components/ui";
 
+import { tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
 type Props = {
   readonly clearSelections: VoidFunction;
   readonly onSuccessConfirm: VoidFunction;
@@ -62,6 +64,7 @@ export const useBulkTaskInstances = ({ clearSelections, onSuccessConfirm }: Prop
     await Promise.all([
       queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }),
       queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }),
+      ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
     ]);
 
     const isDelete = Boolean(responseData.delete);
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
index f4b23c90bd4..18629915a75 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
@@ -28,7 +28,7 @@ import {
 } from "openapi/queries";
 import { createErrorToaster } from "src/utils";
 
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
 import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";
 
 export const useClearDagRun = ({
@@ -61,6 +61,7 @@ export const useClearDagRun = ({
       [useDagRunServiceGetDagRunsKey],
       [useClearDagRunDryRunKey, dagId],
       UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+      ...tiPerAttemptQueryKeys,
     ];
 
     await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index 2d70c49bc5b..33103445472 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -31,7 +31,7 @@ import type { ApiError } from "openapi/requests";
 import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
 import { toaster } from "src/components/ui";
 
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
 import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
 import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";
 
@@ -119,6 +119,7 @@ export const useClearTaskInstances = ({
       [useClearTaskInstancesDryRunKey, dagId],
       [usePatchTaskInstanceDryRunKey, dagId, dagRunId],
       UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+      ...tiPerAttemptQueryKeys,
     ];
 
     await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
index fc45986ac26..c92dcdd3a4e 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
@@ -23,11 +23,13 @@ import {
   useDagRunServiceDeleteDagRun,
   useDagRunServiceGetDagRunsKey,
   UseDagRunServiceGetDagRunKeyFn,
-  useTaskInstanceServiceGetTaskInstancesKey,
+  UseGanttServiceGetGanttDataKeyFn,
   useTaskInstanceServiceGetHitlDetailsKey,
+  useTaskInstanceServiceGetMappedTaskInstanceKey,
+  useTaskInstanceServiceGetTaskInstancesKey,
 } from "openapi/queries";
 import { toaster } from "src/components/ui";
-import { gridQueryKeys } from "src/queries/gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "src/queries/gridViewQueryKeys";
 import { createErrorToaster } from "src/utils";
 
 type DeleteDagRunParams = {
@@ -57,6 +59,9 @@ export const useDeleteDagRun = ({ dagId, dagRunId, onSuccessConfirm }: DeleteDag
       [useDagRunServiceGetDagRunsKey],
       [useTaskInstanceServiceGetTaskInstancesKey],
       [useTaskInstanceServiceGetHitlDetailsKey],
+      UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+      [useTaskInstanceServiceGetMappedTaskInstanceKey],
+      ...tiPerAttemptQueryKeys,
     ];
 
     await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
index cdf07e49056..4a387d86cd7 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
@@ -25,11 +25,15 @@ import {
   useTaskInstanceServiceGetTaskInstancesKey,
   useDagRunServiceGetDagRunsKey,
   UseDagRunServiceGetDagRunKeyFn,
+  UseGanttServiceGetGanttDataKeyFn,
   useTaskInstanceServiceGetHitlDetailsKey,
+  useTaskInstanceServiceGetMappedTaskInstanceKey,
 } from "openapi/queries";
 import { toaster } from "src/components/ui";
 import { createErrorToaster } from "src/utils";
 
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
 type DeleteTaskInstanceParams = {
   dagId: string;
   dagRunId: string;
@@ -66,9 +70,15 @@ export const useDeleteTaskInstance = ({
       [useTaskInstanceServiceGetTaskInstancesKey],
       [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, taskId }],
       [useTaskInstanceServiceGetHitlDetailsKey],
+      UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+      [useTaskInstanceServiceGetMappedTaskInstanceKey],
+      ...tiPerAttemptQueryKeys,
     ];
 
-    await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
+    await Promise.all([
+      ...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
+      ...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })),
+    ]);
 
     toaster.create({
       description: translate("dags:runAndTaskActions.delete.success.description", {
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
index 35ff56988fc..953f1d14933 100644
--- a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
@@ -16,12 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+import { useQueryClient } from "@tanstack/react-query";
 import { useEffect, useState } from "react";
 
+import {
+  useDagRunServiceGetDagRunsKey,
+  useGridServiceGetGridRunsKey,
+  useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
 import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
 import { OpenAPI } from "openapi/requests/core/OpenAPI";
 import { isStatePending, useAutoRefresh } from "src/utils";
 
+const GRID_MUTATION_WATCHED_KEYS = new Set([
+  useTaskInstanceServiceGetTaskInstancesKey,
+  useGridServiceGetGridRunsKey,
+  useDagRunServiceGetDagRunsKey,
+]);
+
 /**
  * Streams TI summaries for all grid runs over a single HTTP connection (NDJSON).
  *
@@ -41,6 +53,7 @@ export const useGridTiSummariesStream = ({
   runIds: Array<string>;
   states?: Array<TaskInstanceState | null | undefined>;
 }) => {
+  const queryClient = useQueryClient();
   const [summariesByRunId, setSummariesByRunId] = useState<Map<string, GridTISummaries>>(new Map());
   const [refreshTick, setRefreshTick] = useState(0);
 
@@ -124,5 +137,34 @@ export const useGridTiSummariesStream = ({
     return () => clearInterval(timer);
   }, [hasActiveRuns, baseRefetchInterval]);
 
+  // Re-stream whenever a mutation invalidates a grid-related query (TI states,
+  // run states, or grid structure).  Invalidation events only fire from explicit
+  // invalidateQueries() calls — never from polling intervals — so this never
+  // double-fires with the interval-based refresh above.
+  useEffect(() => {
+    let timeoutId: ReturnType<typeof setTimeout> | undefined;
+
+    const unsubscribe = queryClient.getQueryCache().subscribe((event) => {
+      const [firstKey] = event.query.queryKey as Array<unknown>;
+
+      if (
+        event.type === "updated" &&
+        event.action.type === "invalidate" &&
+        typeof firstKey === "string" &&
+        GRID_MUTATION_WATCHED_KEYS.has(firstKey)
+      ) {
+        // Debounce: a single mutation invalidates several matching queries in one tick.
+        clearTimeout(timeoutId);
+        // eslint-disable-next-line max-nested-callbacks
+        timeoutId = setTimeout(() => setRefreshTick((tick) => tick + 1), 0);
+      }
+    });
+
+    return () => {
+      unsubscribe();
+      clearTimeout(timeoutId);
+    };
+  }, [queryClient]);
+
   return { summariesByRunId };
 };
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
index 6b78396f891..c032926dc95 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
@@ -23,11 +23,14 @@ import {
   UseDagRunServiceGetDagRunKeyFn,
   useDagRunServiceGetDagRunsKey,
   useDagRunServicePatchDagRun,
+  UseGanttServiceGetGanttDataKeyFn,
+  useTaskInstanceServiceGetMappedTaskInstanceKey,
+  useTaskInstanceServiceGetTaskInstanceKey,
   useTaskInstanceServiceGetTaskInstancesKey,
 } from "openapi/queries";
 import { createErrorToaster } from "src/utils";
 
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
 import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";
 
 export const usePatchDagRun = ({
@@ -58,7 +61,11 @@ export const usePatchDagRun = ({
       UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
       [useDagRunServiceGetDagRunsKey],
       [useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }],
+      [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId }],
+      [useTaskInstanceServiceGetMappedTaskInstanceKey, { dagId, dagRunId }],
       [useClearDagRunDryRunKey, dagId],
+      UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+      ...tiPerAttemptQueryKeys,
     ];
 
     await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts b/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts
index f3e95c5dd8d..dcdcaca8c52 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts
@@ -25,7 +25,7 @@ import {
 } from "openapi/queries";
 import { createErrorToaster } from "src/utils";
 
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
 import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
 import { usePatchTaskGroupDryRunKey } from "./usePatchTaskGroupDryRun";
 
@@ -59,6 +59,7 @@ export const usePatchTaskGroup = ({
       [useTaskInstanceServiceGetTaskInstancesKey],
       [usePatchTaskGroupDryRunKey, dagId, dagRunId, groupId],
       [useClearTaskInstancesDryRunKey, dagId],
+      ...tiPerAttemptQueryKeys,
     ];
 
     await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
index 33e1f689e4b..0f0a5988ed8 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
@@ -28,7 +28,7 @@ import {
 } from "openapi/queries";
 import { createErrorToaster } from "src/utils";
 
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
 import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
 import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";
 
@@ -65,6 +65,7 @@ export const usePatchTaskInstance = ({
       [useTaskInstanceServiceGetTaskInstancesKey],
       [usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
       [useClearTaskInstancesDryRunKey, dagId],
+      ...tiPerAttemptQueryKeys,
     ];
 
     if (mapIndex !== undefined) {
diff --git a/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts b/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts
index 656ba03f82e..358b8b87bbc 100644
--- a/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts
@@ -23,6 +23,7 @@ import { useTranslation } from "react-i18next";
 import {
   UseDagRunServiceGetDagRunKeyFn,
   useDagRunServiceGetDagRunsKey,
+  UseGanttServiceGetGanttDataKeyFn,
   useTaskInstanceServiceGetHitlDetailsKey,
   useTaskInstanceServiceGetHitlDetailKey,
   useTaskInstanceServiceUpdateHitlDetail,
@@ -33,6 +34,8 @@ import { toaster } from "src/components/ui/Toaster";
 import { createErrorToaster } from "src/utils";
 import type { HITLResponseParams } from "src/utils/hitl";
 
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
 export const useUpdateHITLDetail = ({
   dagId,
   dagRunId,
@@ -55,9 +58,14 @@ export const useUpdateHITLDetail = ({
       [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, taskId }],
       [useTaskInstanceServiceGetHitlDetailsKey, { dagIdPrefixPattern: dagId, dagRunId }],
       [useTaskInstanceServiceGetHitlDetailKey, { dagId, dagRunId }],
+      UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+      ...tiPerAttemptQueryKeys,
     ];
 
-    await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
+    await Promise.all([
+      ...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
+      ...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })),
+    ]);
 
     toaster.create({
       title: translate("response.success", { taskId }),

Reply via email to