This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 497e0d4aad6 feat: add task upstream/downstream filter to Graph & Grid  
(#57237)
497e0d4aad6 is described below

commit 497e0d4aad6227aed04e23689886cee8c9bf41cf
Author: Oscar Ligthart <[email protected]>
AuthorDate: Tue Nov 18 21:09:22 2025 +0100

    feat: add task upstream/downstream filter to Graph & Grid  (#57237)
    
    * feat: add DAG graph lineage filter
    
    * feat: propagate task filter to grid and graph view
    
    * fix: rename to task stream filter and small improvements
    
    * fix: bug in selecting root task in grid view
    
    * refactor: use searchParams instead of LocalStorage hook
    
    * fix: lint issues and improve light mode visuals
---
 .../src/airflow/ui/public/i18n/locales/en/dag.json |  12 ++
 .../src/context/openGroups/OpenGroupsProvider.tsx  |  13 +-
 .../airflow/ui/src/layouts/Details/Graph/Graph.tsx |  12 +-
 .../airflow/ui/src/layouts/Details/Grid/Grid.tsx   |  65 ++++++-
 .../ui/src/layouts/Details/PanelButtons.tsx        |   2 +
 .../ui/src/layouts/Details/TaskStreamFilter.tsx    | 200 +++++++++++++++++++++
 6 files changed, 298 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json 
b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json
index 8085fcb942d..f101d9eb044 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json
@@ -114,6 +114,18 @@
     },
     "graphDirection": {
       "label": "Graph Direction"
+    },
+    "taskStreamFilter": {
+      "activeFilter": "Active filter",
+      "clearFilter": "Clear Filter",
+      "clickTask": "Click a task to select it as the filter root",
+      "label": "Filter",
+      "options": {
+        "both": "Both upstream & downstream",
+        "downstream": "Downstream",
+        "upstream": "Upstream"
+      },
+      "selectedTask": "Selected task"
     }
   },
   "paramsFailed": "Failed to load params",
diff --git 
a/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx 
b/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx
index c65e9a9e1c3..188b79a84d7 100644
--- a/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx
+++ b/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import { useCallback, useMemo, useEffect, type PropsWithChildren } from 
"react";
+import { useCallback, useMemo, useEffect, useRef, type PropsWithChildren } 
from "react";
 import { useDebouncedCallback } from "use-debounce";
 import { useLocalStorage } from "usehooks-ts";
 
@@ -36,6 +36,13 @@ export const OpenGroupsProvider = ({ children, dagId }: 
Props) => {
   const [openGroupIds, setOpenGroupIds] = 
useLocalStorage<Array<string>>(openGroupsKey, []);
   const [allGroupIds, setAllGroupIds] = 
useLocalStorage<Array<string>>(allGroupsKey, []);
 
+  // use a ref to track the current allGroupIds without causing re-renders
+  const allGroupIdsRef = useRef(allGroupIds);
+
+  useEffect(() => {
+    allGroupIdsRef.current = allGroupIds;
+  }, [allGroupIds]);
+
   // For Graph view support: dependencies + selected version
   const selectedVersion = useSelectedVersion();
   const [dependencies] = useLocalStorage<"all" | "immediate" | 
"tasks">(`dependencies-${dagId}`, "tasks");
@@ -55,10 +62,10 @@ export const OpenGroupsProvider = ({ children, dagId }: 
Props) => {
   useEffect(() => {
     const observedGroupIds = flattenGraphNodes(structure.nodes).allGroupIds;
 
-    if (JSON.stringify(observedGroupIds) !== JSON.stringify(allGroupIds)) {
+    if (JSON.stringify(observedGroupIds) !== 
JSON.stringify(allGroupIdsRef.current)) {
       setAllGroupIds(observedGroupIds);
     }
-  }, [structure.nodes, allGroupIds, setAllGroupIds]);
+  }, [structure.nodes, setAllGroupIds]);
 
   const debouncedSetOpenGroupIds = useDebouncedCallback(
     (newGroupIds: Array<string>) => {
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
index 34698872fbd..bd26ef5d5ee 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
@@ -20,7 +20,7 @@ import { useToken } from "@chakra-ui/react";
 import { ReactFlow, Controls, Background, MiniMap, type Node as ReactFlowNode 
} from "@xyflow/react";
 import "@xyflow/react/dist/style.css";
 import { useEffect } from "react";
-import { useParams } from "react-router-dom";
+import { useParams, useSearchParams } from "react-router-dom";
 import { useLocalStorage } from "usehooks-ts";
 
 import { useStructureServiceStructureData } from "openapi/queries";
@@ -61,9 +61,16 @@ const nodeColor = (
 export const Graph = () => {
   const { colorMode = "light" } = useColorMode();
   const { dagId = "", groupId, runId = "", taskId } = useParams();
+  const [searchParams] = useSearchParams();
 
   const selectedVersion = useSelectedVersion();
 
+  const filterRoot = searchParams.get("root") ?? undefined;
+  const includeUpstream = searchParams.get("upstream") === "true";
+  const includeDownstream = searchParams.get("downstream") === "true";
+
+  const hasActiveFilter = includeUpstream || includeDownstream;
+
   // corresponds to the "bg", "bg.emphasized", "border.inverted" semantic 
tokens
   const [oddLight, oddDark, evenLight, evenDark, selectedDarkColor, 
selectedLightColor] = useToken("colors", [
     "bg",
@@ -84,6 +91,9 @@ export const Graph = () => {
     {
       dagId,
       externalDependencies: dependencies === "immediate",
+      includeDownstream,
+      includeUpstream,
+      root: hasActiveFilter && filterRoot !== undefined ? filterRoot : 
undefined,
       versionNumber: selectedVersion,
     },
     undefined,
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
index 0943f12f938..3690542e00b 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
@@ -22,11 +22,13 @@ import dayjsDuration from "dayjs/plugin/duration";
 import { useEffect, useMemo, useRef, useState } from "react";
 import { useTranslation } from "react-i18next";
 import { FiChevronsRight } from "react-icons/fi";
-import { Link, useParams } from "react-router-dom";
+import { Link, useParams, useSearchParams } from "react-router-dom";
 
+import { useStructureServiceStructureData } from "openapi/queries";
 import type { DagRunState, DagRunType, GridRunsResponse } from 
"openapi/requests";
 import { useOpenGroups } from "src/context/openGroups";
 import { useNavigation } from "src/hooks/navigation";
+import useSelectedVersion from "src/hooks/useSelectedVersion";
 import { useGridRuns } from "src/queries/useGridRuns.ts";
 import { useGridStructure } from "src/queries/useGridStructure.ts";
 import { isStatePending } from "src/utils";
@@ -54,6 +56,11 @@ export const Grid = ({ dagRunState, limit, runType, 
showGantt, triggeringUser }:
   const [selectedIsVisible, setSelectedIsVisible] = useState<boolean | 
undefined>();
   const { openGroupIds, toggleGroupId } = useOpenGroups();
   const { dagId = "", runId = "" } = useParams();
+  const [searchParams] = useSearchParams();
+
+  const filterRoot = searchParams.get("root") ?? undefined;
+  const includeUpstream = searchParams.get("upstream") === "true";
+  const includeDownstream = searchParams.get("downstream") === "true";
 
   const { data: gridRuns, isLoading } = useGridRuns({ dagRunState, limit, 
runType, triggeringUser });
 
@@ -77,6 +84,48 @@ export const Grid = ({ dagRunState, limit, runType, 
showGantt, triggeringUser }:
     triggeringUser,
   });
 
+  const selectedVersion = useSelectedVersion();
+
+  const hasActiveFilter = includeUpstream || includeDownstream;
+
+  // fetch filtered structure when filter is active
+  const { data: taskStructure } = useStructureServiceStructureData(
+    {
+      dagId,
+      externalDependencies: false,
+      includeDownstream,
+      includeUpstream,
+      root: hasActiveFilter && filterRoot !== undefined ? filterRoot : 
undefined,
+      versionNumber: selectedVersion,
+    },
+    undefined,
+    {
+      enabled: selectedVersion !== undefined && hasActiveFilter && filterRoot 
!== undefined,
+    },
+  );
+
+  // extract allowed task IDs from task structure when filter is active
+  const allowedTaskIds = useMemo(() => {
+    if (!hasActiveFilter || filterRoot === undefined || taskStructure === 
undefined) {
+      return undefined;
+    }
+
+    const taskIds = new Set<string>();
+
+    const addNodeAndChildren = <T extends { children?: Array<T> | null; id: 
string }>(currentNode: T) => {
+      taskIds.add(currentNode.id);
+      if (currentNode.children) {
+        currentNode.children.forEach((child) => addNodeAndChildren(child));
+      }
+    };
+
+    taskStructure.nodes.forEach((node) => {
+      addNodeAndChildren(node);
+    });
+
+    return taskIds;
+  }, [hasActiveFilter, filterRoot, taskStructure]);
+
   // calculate dag run bar heights relative to max
   const max = Math.max.apply(
     undefined,
@@ -87,7 +136,19 @@ export const Grid = ({ dagRunState, limit, runType, 
showGantt, triggeringUser }:
           .filter((duration: number | null): duration is number => duration 
!== null),
   );
 
-  const { flatNodes } = useMemo(() => flattenNodes(dagStructure, 
openGroupIds), [dagStructure, openGroupIds]);
+  const { flatNodes } = useMemo(() => {
+    const nodes = flattenNodes(dagStructure, openGroupIds);
+
+    // filter nodes based on task stream filter if active
+    if (allowedTaskIds !== undefined) {
+      return {
+        ...nodes,
+        flatNodes: nodes.flatNodes.filter((node) => 
allowedTaskIds.has(node.id)),
+      };
+    }
+
+    return nodes;
+  }, [dagStructure, openGroupIds, allowedTaskIds]);
 
   const { setMode } = useNavigation({
     onToggleGroup: toggleGroupId,
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx
index 1d8fe5303bc..41019e53c16 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx
@@ -53,6 +53,7 @@ import { dagRunTypeOptions, dagRunStateOptions } from 
"src/constants/stateOption
 import { useContainerWidth } from "src/utils/useContainerWidth";
 
 import { DagRunSelect } from "./DagRunSelect";
+import { TaskStreamFilter } from "./TaskStreamFilter";
 import { ToggleGroups } from "./ToggleGroups";
 
 type Props = {
@@ -259,6 +260,7 @@ export const PanelButtons = ({
         </ButtonGroup>
         <Flex alignItems="center" gap={1} justifyContent="space-between" 
pl={2} pr={6}>
           <ToggleGroups />
+          <TaskStreamFilter />
           {/* eslint-disable-next-line jsx-a11y/no-autofocus */}
           <Popover.Root autoFocus={false} positioning={{ placement: 
"bottom-end" }}>
             <Popover.Trigger asChild>
diff --git 
a/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx
new file mode 100644
index 00000000000..ce136433ce8
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx
@@ -0,0 +1,200 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Portal, Text, VStack } from "@chakra-ui/react";
+import { useTranslation } from "react-i18next";
+import { FiChevronDown, FiFilter } from "react-icons/fi";
+import { Link, useParams, useSearchParams } from "react-router-dom";
+
+import { Button } from "src/components/ui";
+import { Menu } from "src/components/ui/Menu";
+
+export const TaskStreamFilter = () => {
+  const { t: translate } = useTranslation(["components", "dag"]);
+  const { taskId: currentTaskId } = useParams();
+  const [searchParams] = useSearchParams();
+
+  const filterRoot = searchParams.get("root") ?? undefined;
+  const includeUpstream = searchParams.get("upstream") === "true";
+  const includeDownstream = searchParams.get("downstream") === "true";
+
+  const isCurrentTaskTheRoot = currentTaskId === filterRoot;
+  const bothActive = isCurrentTaskTheRoot && includeUpstream && 
includeDownstream;
+  const activeUpstream = isCurrentTaskTheRoot && includeUpstream && 
!includeDownstream;
+  const activeDownstream = isCurrentTaskTheRoot && includeDownstream && 
!includeUpstream;
+  const hasActiveFilter = includeUpstream || includeDownstream;
+
+  const buildFilterSearch = (upstream: boolean, downstream: boolean, root?: 
string) => {
+    const newParams = new URLSearchParams(searchParams);
+
+    if (upstream) {
+      newParams.set("upstream", "true");
+    } else {
+      newParams.delete("upstream");
+    }
+
+    if (downstream) {
+      newParams.set("downstream", "true");
+    } else {
+      newParams.delete("downstream");
+    }
+
+    if (root !== undefined && root !== "" && (upstream || downstream)) {
+      newParams.set("root", root);
+    } else {
+      newParams.delete("root");
+    }
+
+    return newParams.toString();
+  };
+
+  return (
+    <Menu.Root positioning={{ placement: "bottom-end" }}>
+      <Menu.Trigger asChild>
+        <Button bg="bg.subtle" size="sm" variant="outline">
+          <FiFilter />
+          {filterRoot === undefined || !hasActiveFilter
+            ? translate("dag:panel.taskStreamFilter.label")
+            : `${filterRoot}: ${
+                includeUpstream && includeDownstream
+                  ? translate("dag:panel.taskStreamFilter.options.both")
+                  : includeUpstream
+                    ? translate("dag:panel.taskStreamFilter.options.upstream")
+                    : 
translate("dag:panel.taskStreamFilter.options.downstream")
+              }`}
+          <FiChevronDown size={8} />
+        </Button>
+      </Menu.Trigger>
+      <Portal>
+        <Menu.Positioner>
+          <Menu.Content alignItems="start" display="flex" 
flexDirection="column" gap={2} p={4}>
+            <Text fontSize="sm" fontWeight="semibold">
+              {translate("dag:panel.taskStreamFilter.label")}
+            </Text>
+
+            {filterRoot !== undefined && hasActiveFilter ? (
+              <Text color="brand.solid" fontSize="xs" fontWeight="medium">
+                {translate("dag:panel.taskStreamFilter.activeFilter")}: 
{filterRoot} -{" "}
+                {includeUpstream && includeDownstream
+                  ? translate("dag:panel.taskStreamFilter.options.both")
+                  : includeUpstream
+                    ? translate("dag:panel.taskStreamFilter.options.upstream")
+                    : 
translate("dag:panel.taskStreamFilter.options.downstream")}
+              </Text>
+            ) : undefined}
+
+            {currentTaskId === undefined ? (
+              <Text color="fg.muted" fontSize="xs">
+                {translate("dag:panel.taskStreamFilter.clickTask")}
+              </Text>
+            ) : (
+              <Text color="fg.muted" fontSize="xs">
+                {translate("dag:panel.taskStreamFilter.selectedTask")}: 
<strong>{currentTaskId}</strong>
+              </Text>
+            )}
+
+            <VStack align="stretch" gap={1} width="100%">
+              <Menu.Item asChild value="upstream">
+                <Button
+                  asChild
+                  color={activeUpstream ? "white" : undefined}
+                  colorPalette={activeUpstream ? "blue" : "gray"}
+                  disabled={currentTaskId === undefined}
+                  size="sm"
+                  variant={activeUpstream ? "solid" : "ghost"}
+                  width="100%"
+                >
+                  <Link
+                    replace
+                    style={{
+                      justifyContent: "flex-start",
+                      pointerEvents: currentTaskId === undefined ? "none" : 
"auto",
+                    }}
+                    to={{ search: buildFilterSearch(true, false, 
currentTaskId) }}
+                  >
+                    {translate("dag:panel.taskStreamFilter.options.upstream")}
+                  </Link>
+                </Button>
+              </Menu.Item>
+
+              <Menu.Item asChild value="downstream">
+                <Button
+                  asChild
+                  color={activeDownstream ? "white" : undefined}
+                  colorPalette={activeDownstream ? "blue" : "gray"}
+                  disabled={currentTaskId === undefined}
+                  size="sm"
+                  variant={activeDownstream ? "solid" : "ghost"}
+                  width="100%"
+                >
+                  <Link
+                    replace
+                    style={{
+                      justifyContent: "flex-start",
+                      pointerEvents: currentTaskId === undefined ? "none" : 
"auto",
+                    }}
+                    to={{ search: buildFilterSearch(false, true, 
currentTaskId) }}
+                  >
+                    
{translate("dag:panel.taskStreamFilter.options.downstream")}
+                  </Link>
+                </Button>
+              </Menu.Item>
+
+              <Menu.Item asChild value="both">
+                <Button
+                  asChild
+                  color={bothActive ? "white" : undefined}
+                  colorPalette={bothActive ? "blue" : "gray"}
+                  disabled={currentTaskId === undefined}
+                  size="sm"
+                  variant={bothActive ? "solid" : "ghost"}
+                  width="100%"
+                >
+                  <Link
+                    replace
+                    style={{
+                      justifyContent: "flex-start",
+                      pointerEvents: currentTaskId === undefined ? "none" : 
"auto",
+                    }}
+                    to={{ search: buildFilterSearch(true, true, currentTaskId) 
}}
+                  >
+                    {translate("dag:panel.taskStreamFilter.options.both")}
+                  </Link>
+                </Button>
+              </Menu.Item>
+            </VStack>
+
+            {hasActiveFilter && filterRoot !== undefined ? (
+              <Menu.Item asChild value="clear">
+                <Button asChild size="sm" variant="outline" width="100%">
+                  <Link
+                    replace
+                    style={{ justifyContent: "center" }}
+                    to={{ search: buildFilterSearch(false, false) }}
+                  >
+                    {translate("dag:panel.taskStreamFilter.clearFilter")}
+                  </Link>
+                </Button>
+              </Menu.Item>
+            ) : undefined}
+          </Menu.Content>
+        </Menu.Positioner>
+      </Portal>
+    </Menu.Root>
+  );
+};

Reply via email to