This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 497e0d4aad6 feat: add task upstream/downstream filter to Graph & Grid (#57237)
497e0d4aad6 is described below
commit 497e0d4aad6227aed04e23689886cee8c9bf41cf
Author: Oscar Ligthart <[email protected]>
AuthorDate: Tue Nov 18 21:09:22 2025 +0100
feat: add task upstream/downstream filter to Graph & Grid (#57237)
* feat: add DAG graph lineage filter
* feat: propagate task filter to grid and graph view
* fix: rename to task stream filter and small improvements
* fix: bug in selecting root task in grid view
* refactor: use searchParams instead of LocalStorage hook
* fix: lint issues and improve light mode visuals
---
.../src/airflow/ui/public/i18n/locales/en/dag.json | 12 ++
.../src/context/openGroups/OpenGroupsProvider.tsx | 13 +-
.../airflow/ui/src/layouts/Details/Graph/Graph.tsx | 12 +-
.../airflow/ui/src/layouts/Details/Grid/Grid.tsx | 65 ++++++-
.../ui/src/layouts/Details/PanelButtons.tsx | 2 +
.../ui/src/layouts/Details/TaskStreamFilter.tsx | 200 +++++++++++++++++++++
6 files changed, 298 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json
index 8085fcb942d..f101d9eb044 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json
@@ -114,6 +114,18 @@
},
"graphDirection": {
"label": "Graph Direction"
+ },
+ "taskStreamFilter": {
+ "activeFilter": "Active filter",
+ "clearFilter": "Clear Filter",
+ "clickTask": "Click a task to select it as the filter root",
+ "label": "Filter",
+ "options": {
+ "both": "Both upstream & downstream",
+ "downstream": "Downstream",
+ "upstream": "Upstream"
+ },
+ "selectedTask": "Selected task"
}
},
"paramsFailed": "Failed to load params",
diff --git a/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx b/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx
index c65e9a9e1c3..188b79a84d7 100644
--- a/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx
+++ b/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { useCallback, useMemo, useEffect, type PropsWithChildren } from
"react";
+import { useCallback, useMemo, useEffect, useRef, type PropsWithChildren }
from "react";
import { useDebouncedCallback } from "use-debounce";
import { useLocalStorage } from "usehooks-ts";
@@ -36,6 +36,13 @@ export const OpenGroupsProvider = ({ children, dagId }:
Props) => {
const [openGroupIds, setOpenGroupIds] =
useLocalStorage<Array<string>>(openGroupsKey, []);
const [allGroupIds, setAllGroupIds] =
useLocalStorage<Array<string>>(allGroupsKey, []);
+ // use a ref to track the current allGroupIds without causing re-renders
+ const allGroupIdsRef = useRef(allGroupIds);
+
+ useEffect(() => {
+ allGroupIdsRef.current = allGroupIds;
+ }, [allGroupIds]);
+
// For Graph view support: dependencies + selected version
const selectedVersion = useSelectedVersion();
const [dependencies] = useLocalStorage<"all" | "immediate" |
"tasks">(`dependencies-${dagId}`, "tasks");
@@ -55,10 +62,10 @@ export const OpenGroupsProvider = ({ children, dagId }:
Props) => {
useEffect(() => {
const observedGroupIds = flattenGraphNodes(structure.nodes).allGroupIds;
- if (JSON.stringify(observedGroupIds) !== JSON.stringify(allGroupIds)) {
+ if (JSON.stringify(observedGroupIds) !==
JSON.stringify(allGroupIdsRef.current)) {
setAllGroupIds(observedGroupIds);
}
- }, [structure.nodes, allGroupIds, setAllGroupIds]);
+ }, [structure.nodes, setAllGroupIds]);
const debouncedSetOpenGroupIds = useDebouncedCallback(
(newGroupIds: Array<string>) => {
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
index 34698872fbd..bd26ef5d5ee 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
@@ -20,7 +20,7 @@ import { useToken } from "@chakra-ui/react";
import { ReactFlow, Controls, Background, MiniMap, type Node as ReactFlowNode
} from "@xyflow/react";
import "@xyflow/react/dist/style.css";
import { useEffect } from "react";
-import { useParams } from "react-router-dom";
+import { useParams, useSearchParams } from "react-router-dom";
import { useLocalStorage } from "usehooks-ts";
import { useStructureServiceStructureData } from "openapi/queries";
@@ -61,9 +61,16 @@ const nodeColor = (
export const Graph = () => {
const { colorMode = "light" } = useColorMode();
const { dagId = "", groupId, runId = "", taskId } = useParams();
+ const [searchParams] = useSearchParams();
const selectedVersion = useSelectedVersion();
+ const filterRoot = searchParams.get("root") ?? undefined;
+ const includeUpstream = searchParams.get("upstream") === "true";
+ const includeDownstream = searchParams.get("downstream") === "true";
+
+ const hasActiveFilter = includeUpstream || includeDownstream;
+
// corresponds to the "bg", "bg.emphasized", "border.inverted" semantic
tokens
const [oddLight, oddDark, evenLight, evenDark, selectedDarkColor,
selectedLightColor] = useToken("colors", [
"bg",
@@ -84,6 +91,9 @@ export const Graph = () => {
{
dagId,
externalDependencies: dependencies === "immediate",
+ includeDownstream,
+ includeUpstream,
+ root: hasActiveFilter && filterRoot !== undefined ? filterRoot :
undefined,
versionNumber: selectedVersion,
},
undefined,
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
index 0943f12f938..3690542e00b 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
@@ -22,11 +22,13 @@ import dayjsDuration from "dayjs/plugin/duration";
import { useEffect, useMemo, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import { FiChevronsRight } from "react-icons/fi";
-import { Link, useParams } from "react-router-dom";
+import { Link, useParams, useSearchParams } from "react-router-dom";
+import { useStructureServiceStructureData } from "openapi/queries";
import type { DagRunState, DagRunType, GridRunsResponse } from
"openapi/requests";
import { useOpenGroups } from "src/context/openGroups";
import { useNavigation } from "src/hooks/navigation";
+import useSelectedVersion from "src/hooks/useSelectedVersion";
import { useGridRuns } from "src/queries/useGridRuns.ts";
import { useGridStructure } from "src/queries/useGridStructure.ts";
import { isStatePending } from "src/utils";
@@ -54,6 +56,11 @@ export const Grid = ({ dagRunState, limit, runType,
showGantt, triggeringUser }:
const [selectedIsVisible, setSelectedIsVisible] = useState<boolean |
undefined>();
const { openGroupIds, toggleGroupId } = useOpenGroups();
const { dagId = "", runId = "" } = useParams();
+ const [searchParams] = useSearchParams();
+
+ const filterRoot = searchParams.get("root") ?? undefined;
+ const includeUpstream = searchParams.get("upstream") === "true";
+ const includeDownstream = searchParams.get("downstream") === "true";
const { data: gridRuns, isLoading } = useGridRuns({ dagRunState, limit,
runType, triggeringUser });
@@ -77,6 +84,48 @@ export const Grid = ({ dagRunState, limit, runType,
showGantt, triggeringUser }:
triggeringUser,
});
+ const selectedVersion = useSelectedVersion();
+
+ const hasActiveFilter = includeUpstream || includeDownstream;
+
+ // fetch filtered structure when filter is active
+ const { data: taskStructure } = useStructureServiceStructureData(
+ {
+ dagId,
+ externalDependencies: false,
+ includeDownstream,
+ includeUpstream,
+ root: hasActiveFilter && filterRoot !== undefined ? filterRoot :
undefined,
+ versionNumber: selectedVersion,
+ },
+ undefined,
+ {
+ enabled: selectedVersion !== undefined && hasActiveFilter && filterRoot
!== undefined,
+ },
+ );
+
+ // extract allowed task IDs from task structure when filter is active
+ const allowedTaskIds = useMemo(() => {
+ if (!hasActiveFilter || filterRoot === undefined || taskStructure ===
undefined) {
+ return undefined;
+ }
+
+ const taskIds = new Set<string>();
+
+ const addNodeAndChildren = <T extends { children?: Array<T> | null; id:
string }>(currentNode: T) => {
+ taskIds.add(currentNode.id);
+ if (currentNode.children) {
+ currentNode.children.forEach((child) => addNodeAndChildren(child));
+ }
+ };
+
+ taskStructure.nodes.forEach((node) => {
+ addNodeAndChildren(node);
+ });
+
+ return taskIds;
+ }, [hasActiveFilter, filterRoot, taskStructure]);
+
// calculate dag run bar heights relative to max
const max = Math.max.apply(
undefined,
@@ -87,7 +136,19 @@ export const Grid = ({ dagRunState, limit, runType,
showGantt, triggeringUser }:
.filter((duration: number | null): duration is number => duration
!== null),
);
- const { flatNodes } = useMemo(() => flattenNodes(dagStructure,
openGroupIds), [dagStructure, openGroupIds]);
+ const { flatNodes } = useMemo(() => {
+ const nodes = flattenNodes(dagStructure, openGroupIds);
+
+ // filter nodes based on task stream filter if active
+ if (allowedTaskIds !== undefined) {
+ return {
+ ...nodes,
+ flatNodes: nodes.flatNodes.filter((node) =>
allowedTaskIds.has(node.id)),
+ };
+ }
+
+ return nodes;
+ }, [dagStructure, openGroupIds, allowedTaskIds]);
const { setMode } = useNavigation({
onToggleGroup: toggleGroupId,
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx
index 1d8fe5303bc..41019e53c16 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx
@@ -53,6 +53,7 @@ import { dagRunTypeOptions, dagRunStateOptions } from
"src/constants/stateOption
import { useContainerWidth } from "src/utils/useContainerWidth";
import { DagRunSelect } from "./DagRunSelect";
+import { TaskStreamFilter } from "./TaskStreamFilter";
import { ToggleGroups } from "./ToggleGroups";
type Props = {
@@ -259,6 +260,7 @@ export const PanelButtons = ({
</ButtonGroup>
<Flex alignItems="center" gap={1} justifyContent="space-between"
pl={2} pr={6}>
<ToggleGroups />
+ <TaskStreamFilter />
{/* eslint-disable-next-line jsx-a11y/no-autofocus */}
<Popover.Root autoFocus={false} positioning={{ placement:
"bottom-end" }}>
<Popover.Trigger asChild>
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx
new file mode 100644
index 00000000000..ce136433ce8
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx
@@ -0,0 +1,200 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Portal, Text, VStack } from "@chakra-ui/react";
+import { useTranslation } from "react-i18next";
+import { FiChevronDown, FiFilter } from "react-icons/fi";
+import { Link, useParams, useSearchParams } from "react-router-dom";
+
+import { Button } from "src/components/ui";
+import { Menu } from "src/components/ui/Menu";
+
+export const TaskStreamFilter = () => {
+ const { t: translate } = useTranslation(["components", "dag"]);
+ const { taskId: currentTaskId } = useParams();
+ const [searchParams] = useSearchParams();
+
+ const filterRoot = searchParams.get("root") ?? undefined;
+ const includeUpstream = searchParams.get("upstream") === "true";
+ const includeDownstream = searchParams.get("downstream") === "true";
+
+ const isCurrentTaskTheRoot = currentTaskId === filterRoot;
+ const bothActive = isCurrentTaskTheRoot && includeUpstream &&
includeDownstream;
+ const activeUpstream = isCurrentTaskTheRoot && includeUpstream &&
!includeDownstream;
+ const activeDownstream = isCurrentTaskTheRoot && includeDownstream &&
!includeUpstream;
+ const hasActiveFilter = includeUpstream || includeDownstream;
+
+ const buildFilterSearch = (upstream: boolean, downstream: boolean, root?:
string) => {
+ const newParams = new URLSearchParams(searchParams);
+
+ if (upstream) {
+ newParams.set("upstream", "true");
+ } else {
+ newParams.delete("upstream");
+ }
+
+ if (downstream) {
+ newParams.set("downstream", "true");
+ } else {
+ newParams.delete("downstream");
+ }
+
+ if (root !== undefined && root !== "" && (upstream || downstream)) {
+ newParams.set("root", root);
+ } else {
+ newParams.delete("root");
+ }
+
+ return newParams.toString();
+ };
+
+ return (
+ <Menu.Root positioning={{ placement: "bottom-end" }}>
+ <Menu.Trigger asChild>
+ <Button bg="bg.subtle" size="sm" variant="outline">
+ <FiFilter />
+ {filterRoot === undefined || !hasActiveFilter
+ ? translate("dag:panel.taskStreamFilter.label")
+ : `${filterRoot}: ${
+ includeUpstream && includeDownstream
+ ? translate("dag:panel.taskStreamFilter.options.both")
+ : includeUpstream
+ ? translate("dag:panel.taskStreamFilter.options.upstream")
+ :
translate("dag:panel.taskStreamFilter.options.downstream")
+ }`}
+ <FiChevronDown size={8} />
+ </Button>
+ </Menu.Trigger>
+ <Portal>
+ <Menu.Positioner>
+ <Menu.Content alignItems="start" display="flex"
flexDirection="column" gap={2} p={4}>
+ <Text fontSize="sm" fontWeight="semibold">
+ {translate("dag:panel.taskStreamFilter.label")}
+ </Text>
+
+ {filterRoot !== undefined && hasActiveFilter ? (
+ <Text color="brand.solid" fontSize="xs" fontWeight="medium">
+ {translate("dag:panel.taskStreamFilter.activeFilter")}:
{filterRoot} -{" "}
+ {includeUpstream && includeDownstream
+ ? translate("dag:panel.taskStreamFilter.options.both")
+ : includeUpstream
+ ? translate("dag:panel.taskStreamFilter.options.upstream")
+ :
translate("dag:panel.taskStreamFilter.options.downstream")}
+ </Text>
+ ) : undefined}
+
+ {currentTaskId === undefined ? (
+ <Text color="fg.muted" fontSize="xs">
+ {translate("dag:panel.taskStreamFilter.clickTask")}
+ </Text>
+ ) : (
+ <Text color="fg.muted" fontSize="xs">
+ {translate("dag:panel.taskStreamFilter.selectedTask")}:
<strong>{currentTaskId}</strong>
+ </Text>
+ )}
+
+ <VStack align="stretch" gap={1} width="100%">
+ <Menu.Item asChild value="upstream">
+ <Button
+ asChild
+ color={activeUpstream ? "white" : undefined}
+ colorPalette={activeUpstream ? "blue" : "gray"}
+ disabled={currentTaskId === undefined}
+ size="sm"
+ variant={activeUpstream ? "solid" : "ghost"}
+ width="100%"
+ >
+ <Link
+ replace
+ style={{
+ justifyContent: "flex-start",
+ pointerEvents: currentTaskId === undefined ? "none" :
"auto",
+ }}
+ to={{ search: buildFilterSearch(true, false,
currentTaskId) }}
+ >
+ {translate("dag:panel.taskStreamFilter.options.upstream")}
+ </Link>
+ </Button>
+ </Menu.Item>
+
+ <Menu.Item asChild value="downstream">
+ <Button
+ asChild
+ color={activeDownstream ? "white" : undefined}
+ colorPalette={activeDownstream ? "blue" : "gray"}
+ disabled={currentTaskId === undefined}
+ size="sm"
+ variant={activeDownstream ? "solid" : "ghost"}
+ width="100%"
+ >
+ <Link
+ replace
+ style={{
+ justifyContent: "flex-start",
+ pointerEvents: currentTaskId === undefined ? "none" :
"auto",
+ }}
+ to={{ search: buildFilterSearch(false, true,
currentTaskId) }}
+ >
+
{translate("dag:panel.taskStreamFilter.options.downstream")}
+ </Link>
+ </Button>
+ </Menu.Item>
+
+ <Menu.Item asChild value="both">
+ <Button
+ asChild
+ color={bothActive ? "white" : undefined}
+ colorPalette={bothActive ? "blue" : "gray"}
+ disabled={currentTaskId === undefined}
+ size="sm"
+ variant={bothActive ? "solid" : "ghost"}
+ width="100%"
+ >
+ <Link
+ replace
+ style={{
+ justifyContent: "flex-start",
+ pointerEvents: currentTaskId === undefined ? "none" :
"auto",
+ }}
+ to={{ search: buildFilterSearch(true, true, currentTaskId)
}}
+ >
+ {translate("dag:panel.taskStreamFilter.options.both")}
+ </Link>
+ </Button>
+ </Menu.Item>
+ </VStack>
+
+ {hasActiveFilter && filterRoot !== undefined ? (
+ <Menu.Item asChild value="clear">
+ <Button asChild size="sm" variant="outline" width="100%">
+ <Link
+ replace
+ style={{ justifyContent: "center" }}
+ to={{ search: buildFilterSearch(false, false) }}
+ >
+ {translate("dag:panel.taskStreamFilter.clearFilter")}
+ </Link>
+ </Button>
+ </Menu.Item>
+ ) : undefined}
+ </Menu.Content>
+ </Menu.Positioner>
+ </Portal>
+ </Menu.Root>
+ );
+};