Copilot commented on code in PR #64271:
URL: https://github.com/apache/airflow/pull/64271#discussion_r3025345944


##########
airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx:
##########
@@ -74,6 +78,22 @@ export const Graph = () => {
 
   const hasActiveFilter = includeUpstream || includeDownstream;
 
+  const graphFilters: GraphFilterValues = useMemo(() => {
+    const durationParam = searchParams.get(SearchParamsKeys.DURATION_GTE);
+    const mapIndexParam = searchParams.get(SearchParamsKeys.MAP_INDEX);
+    const durationVal = durationParam === null ? Number.NaN : 
Number(durationParam);
+    const mapIndexVal = mapIndexParam === null ? Number.NaN : 
Number(mapIndexParam);
+
+    return {
+      durationThreshold: Number.isNaN(durationVal) ? undefined : durationVal,
+      mapIndex: Number.isNaN(mapIndexVal) ? undefined : mapIndexVal,
+      mappedFilter: searchParams.get(SearchParamsKeys.MAPPED) ?? undefined,
+      selectedOperators: searchParams.getAll(SearchParamsKeys.OPERATOR),
+      selectedStates: searchParams.getAll(SearchParamsKeys.STATE),

Review Comment:
   This reads task-instance status selections from `SearchParamsKeys.STATE`. 
Given `STATE` is used for DAG run state filters in other pages and `TASK_STATE` 
exists for task-instance state (see 
`pages/Dashboard/HistoricalMetrics/MetricSection.tsx:53`), consider switching 
this graph filter to use `SearchParamsKeys.TASK_STATE` for clearer semantics 
and fewer query-param collisions.
   ```suggestion
         selectedStates: searchParams.getAll(SearchParamsKeys.TASK_STATE),
   ```



##########
airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx:
##########
@@ -24,31 +24,117 @@ import {
   HStack,
   IconButton,
   Input,
+  type NumberInputValueChangeDetails,
   Portal,
   Separator,
   Text,
   VStack,
 } from "@chakra-ui/react";
-import { useEffect } from "react";
+import { useEffect, useMemo } from "react";
 import { useTranslation } from "react-i18next";
 import { FiFilter, FiInfo } from "react-icons/fi";
 import { useParams, useSearchParams } from "react-router-dom";
 
+import { useStructureServiceStructureData } from "openapi/queries";
+import type { NodeResponse } from "openapi/requests/types.gen";
 import { Tooltip } from "src/components/ui";
 import { Menu } from "src/components/ui/Menu";
+import { NumberInputField, NumberInputRoot } from 
"src/components/ui/NumberInput";
+import { SearchParamsKeys } from "src/constants/searchParams";
+import { taskInstanceStateOptions } from "src/constants/stateOptions";
+import useSelectedVersion from "src/hooks/useSelectedVersion";
+import { AttrSelectFilter } from 
"src/pages/Dag/Tasks/TaskFilters/AttrSelectFilter";
+import { AttrSelectFilterMulti } from 
"src/pages/Dag/Tasks/TaskFilters/AttrSelectFilterMulti";
 
-export const TaskStreamFilter = () => {
-  const { t: translate } = useTranslation(["common", "components", "dag"]);
-  const { taskId: currentTaskId } = useParams();
+const collectOperators = (nodes: Array<NodeResponse>): Array<string> => {
+  const operators = new Set<string>();
+
+  const walk = (nodeList: Array<NodeResponse>) => {
+    for (const node of nodeList) {
+      if (node.operator !== undefined && node.operator !== null) {
+        operators.add(node.operator);
+      }
+      if (node.children) {
+        walk(node.children);
+      }
+    }
+  };
+
+  walk(nodes);
+
+  return [...operators].sort();
+};
+
+const collectTaskGroups = (nodes: Array<NodeResponse>): Array<string> => {
+  const groups: Array<string> = [];
+
+  const walk = (nodeList: Array<NodeResponse>) => {
+    for (const node of nodeList) {
+      if (node.children) {
+        groups.push(node.id);
+        walk(node.children);
+      }
+    }
+  };
+
+  walk(nodes);
+
+  return groups.sort();
+};
+
+type Props = {
+  readonly dagView: "graph" | "grid";
+};
+
+export const TaskStreamFilter = ({ dagView }: Props) => {
+  const { t: translate } = useTranslation(["common", "components", "dag", 
"tasks"]);
+  const { dagId = "", runId, taskId: currentTaskId } = useParams();
   const [searchParams, setSearchParams] = useSearchParams();
+  const selectedVersion = useSelectedVersion();
 
   const filterRoot = searchParams.get("root") ?? undefined;
   const includeUpstream = searchParams.get("upstream") === "true";
   const includeDownstream = searchParams.get("downstream") === "true";
   const depth = searchParams.get("depth") ?? undefined;
   const mode = searchParams.get("mode") ?? "static";
 
-  const hasActiveFilter = includeUpstream || includeDownstream;
+  // Graph task filters
+  const { data: graphData } = useStructureServiceStructureData(
+    { dagId, versionNumber: selectedVersion },
+    undefined,
+    { enabled: dagView === "graph" && selectedVersion !== undefined },
+  );
+  const graphNodes = useMemo(() => graphData?.nodes ?? [], [graphData?.nodes]);
+  const allOperators = useMemo(() => collectOperators(graphNodes), 
[graphNodes]);
+  const allTaskGroups = useMemo(() => collectTaskGroups(graphNodes), 
[graphNodes]);
+
+  const selectedOperators = searchParams.getAll(SearchParamsKeys.OPERATOR);
+  const selectedTaskGroups = searchParams.getAll(SearchParamsKeys.TASK_GROUP);
+  const selectedStates = searchParams.getAll(SearchParamsKeys.STATE);

Review Comment:
   For the graph’s task-instance status filter, the code uses the generic 
`SearchParamsKeys.STATE` query param. Elsewhere in the UI, `STATE` is used for 
DAG run state while task-instance state uses `SearchParamsKeys.TASK_STATE` 
(e.g., `pages/Dashboard/HistoricalMetrics/MetricSection.tsx:53`).
   
   Using `TASK_STATE` here would make the URL semantics clearer and avoid 
accidental cross-page coupling if users navigate with existing query strings.
   ```suggestion
     const selectedStates = searchParams.getAll(SearchParamsKeys.TASK_STATE);
   ```



##########
airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx:
##########
@@ -151,12 +171,27 @@ export const Graph = () => {
     };
   });
 
+  const nodes = useGraphFilteredNodes(nodesWithTI, graphFilters);
+
+  const filteredNodeIds = useMemo(() => {
+    const ids = new Set<string>();
+
+    for (const node of nodes ?? []) {
+      if (node.data.isFiltered) {
+        ids.add(node.id);
+      }
+    }
+
+    return ids;
+  }, [nodes]);
+
   const edges = (data?.edges ?? []).map((edge) => ({
     ...edge,
     data: {
       ...edge.data,
       rest: {
         ...edge.data?.rest,
+        isFiltered: filteredNodeIds.has(edge.source) && 
filteredNodeIds.has(edge.target),
         isSelected:

Review Comment:
   `rest.isFiltered` is being computed and written onto edges, but the edge 
renderer (`src/components/Graph/Edge.tsx`) never reads it (it only uses 
`rest.isSelected` / `rest.isSetupTeardown` / `rest.edgeType`). This makes the 
`filteredNodeIds` pass and the extra edge field effectively dead code.
   
   Either remove `rest.isFiltered` (and the associated `filteredNodeIds` 
computation) or update the edge component to actually use it (e.g., dim 
filtered edges).



##########
airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx:
##########
@@ -289,6 +411,91 @@ export const TaskStreamFilter = () => {
                 </Button>
               </Menu.Item>
             ) : undefined}
+
+            {dagView === "graph" ? (
+              <>
+                <Separator my={2} />
+                <Text fontSize="sm" fontWeight="semibold">
+                  {translate("dag:panel.graphFilters.title")}
+                </Text>

Review Comment:
   This uses new `dag:panel.graphFilters.*` translation keys. Airflow’s i18n 
lint rule requires every non-English locale to contain all keys present in `en` 
(see `ui/rules/i18n.js`). Currently, at least `public/i18n/locales/de/dag.json` 
has no `panel.graphFilters` section, so this will introduce missing-key 
warnings.
   
   Please add the new `panel.graphFilters` keys to all locale `dag.json` files 
(they can be English placeholders if translations aren’t available yet).



##########
airflow-core/src/airflow/ui/src/layouts/Details/Graph/useGraphFilteredNodes.ts:
##########
@@ -0,0 +1,145 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import type { Node as ReactFlowNode } from "@xyflow/react";
+import { useMemo } from "react";
+
+import type { CustomNodeProps } from "src/components/Graph/reactflowUtils";
+
+type GraphFilterValues = {
+  durationThreshold?: number;
+  mapIndex?: number;
+  mappedFilter?: string;
+  selectedOperators: Array<string>;
+  selectedStates: Array<string>;
+  selectedTaskGroups: Array<string>;
+};
+
+const hasActiveFilters = (filters: GraphFilterValues): boolean =>
+  filters.selectedOperators.length > 0 ||
+  filters.selectedTaskGroups.length > 0 ||
+  filters.selectedStates.length > 0 ||
+  filters.mappedFilter !== undefined ||
+  filters.durationThreshold !== undefined ||
+  filters.mapIndex !== undefined;
+
+const getTaskDurationSeconds = (
+  minStartDate: string | null | undefined,
+  maxEndDate: string | null | undefined,
+): number | undefined => {
+  if (
+    minStartDate === null ||
+    minStartDate === undefined ||
+    maxEndDate === null ||
+    maxEndDate === undefined
+  ) {
+    return undefined;
+  }
+  const start = new Date(minStartDate).getTime();
+  const end = new Date(maxEndDate).getTime();
+
+  if (Number.isNaN(start) || Number.isNaN(end)) {
+    return undefined;
+  }
+
+  return (end - start) / 1000;
+};
+
+const isNodeFiltered = (node: ReactFlowNode<CustomNodeProps>, filters: 
GraphFilterValues): boolean => {
+  if (node.type !== "task") {
+    return false;
+  }
+
+  const { isMapped, operator, taskInstance } = node.data;
+
+  if (
+    filters.selectedOperators.length > 0 &&
+    (operator === undefined || operator === null || 
!filters.selectedOperators.includes(operator))
+  ) {
+    return true;
+  }
+
+  if (filters.selectedTaskGroups.length > 0) {
+    const matchesGroup = filters.selectedTaskGroups.some(
+      (group) => node.id === group || node.id.startsWith(`${group}.`),
+    );
+
+    if (!matchesGroup) {
+      return true;
+    }
+  }
+
+  if (filters.selectedStates.length > 0) {
+    const state = taskInstance?.state ?? "none";
+
+    if (!filters.selectedStates.includes(state)) {
+      return true;
+    }
+  }
+
+  if (filters.mappedFilter !== undefined) {
+    const isMappedBool = Boolean(isMapped);
+
+    if (filters.mappedFilter === "true" && !isMappedBool) {
+      return true;
+    }
+    if (filters.mappedFilter === "false" && isMappedBool) {
+      return true;
+    }
+  }
+
+  if (filters.mapIndex !== undefined) {
+    if (!isMapped) {
+      return true;
+    }
+  }

Review Comment:
   The `mapIndex` filter currently doesn’t filter by the selected index value. 
When `filters.mapIndex` is set, the logic only checks whether the node is 
mapped (`!isMapped => filtered`) and never compares against the requested 
index, so all mapped tasks remain visible regardless of the input.
   
   Either remove this filter until the graph nodes carry per-index information, 
or extend `CustomNodeProps.taskInstance`/node data to include `map_index` (or 
equivalent) and apply an equality check against `filters.mapIndex`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to