This is an automated email from the ASF dual-hosted git repository.

choo121600 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 21f5f9353f3 Add shared test fixtures and API-based data isolation 
(#64202)
21f5f9353f3 is described below

commit 21f5f9353f37db61d4765605c81dec4faf0640e4
Author: Yeonguk Choo <[email protected]>
AuthorDate: Sun Mar 29 22:31:02 2026 +0900

    Add shared test fixtures and API-based data isolation (#64202)
---
 airflow-core/src/airflow/ui/playwright.config.ts   |   1 +
 .../airflow/ui/tests/e2e/fixtures/asset-data.ts    |  66 ++
 .../ui/tests/e2e/fixtures/audit-log-data.ts        |  69 ++
 .../airflow/ui/tests/e2e/fixtures/calendar-data.ts |  89 +++
 .../airflow/ui/tests/e2e/fixtures/dag-runs-data.ts |  91 +++
 .../ui/tests/e2e/fixtures/dashboard-data.ts        |  42 ++
 .../src/airflow/ui/tests/e2e/fixtures/data.ts      | 210 ++++++
 .../src/airflow/ui/tests/e2e/fixtures/index.ts     |  30 +
 .../src/airflow/ui/tests/e2e/fixtures/pom.ts       | 156 ++++
 .../ui/tests/e2e/fixtures/task-instances-data.ts   | 112 +++
 .../src/airflow/ui/tests/e2e/fixtures/xcom-data.ts |  79 ++
 .../src/airflow/ui/tests/e2e/global-teardown.ts    |  60 ++
 .../src/airflow/ui/tests/e2e/utils/test-helpers.ts | 797 +++++++++++++++++++++
 13 files changed, 1802 insertions(+)

diff --git a/airflow-core/src/airflow/ui/playwright.config.ts 
b/airflow-core/src/airflow/ui/playwright.config.ts
index 96373d2a097..02075f841f7 100644
--- a/airflow-core/src/airflow/ui/playwright.config.ts
+++ b/airflow-core/src/airflow/ui/playwright.config.ts
@@ -55,6 +55,7 @@ export default defineConfig({
   forbidOnly: process.env.CI !== undefined && process.env.CI !== "",
   fullyParallel: true,
   globalSetup: "./tests/e2e/global-setup.ts",
+  globalTeardown: "./tests/e2e/global-teardown.ts",
   projects: [
     {
       name: "chromium",
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
new file mode 100644
index 00000000000..79b4e993c67
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
@@ -0,0 +1,66 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Asset data fixture — triggers asset_produces_1 DAG and waits for success.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+import { test as base } from "tests/e2e/fixtures";
+import {
+  safeCleanupDagRun,
+  apiTriggerDagRun,
+  waitForDagReady,
+  waitForDagRunStatus,
+} from "tests/e2e/utils/test-helpers";
+
+export type AssetData = {
+  dagId: string;
+};
+
+export const test = base.extend<Record<never, never>, { assetData: AssetData 
}>({
+  assetData: [
+    async ({ authenticatedRequest }, use, _workerInfo) => {
+      const dagId = "asset_produces_1";
+      let createdRunId: string | undefined;
+
+      try {
+        await waitForDagReady(authenticatedRequest, dagId);
+        await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, { data: { 
is_paused: false } });
+        const { dagRunId } = await apiTriggerDagRun(authenticatedRequest, 
dagId);
+
+        createdRunId = dagRunId;
+
+        await waitForDagRunStatus(authenticatedRequest, {
+          dagId,
+          expectedState: "success",
+          runId: dagRunId,
+          timeout: 120_000,
+        });
+
+        await use({ dagId });
+      } finally {
+        if (createdRunId !== undefined) {
+          await safeCleanupDagRun(authenticatedRequest, dagId, createdRunId);
+        }
+      }
+    },
+    { scope: "worker", timeout: 180_000 },
+  ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
new file mode 100644
index 00000000000..c160958aec1
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
@@ -0,0 +1,69 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Audit log data fixture — triggers DAG runs to generate audit log entries.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+  safeCleanupDagRun,
+  apiTriggerDagRun,
+  waitForDagReady,
+  waitForDagRunStatus,
+} from "tests/e2e/utils/test-helpers";
+
+export type AuditLogData = {
+  dagId: string;
+};
+
+export const test = base.extend<Record<never, never>, { auditLogData: 
AuditLogData }>({
+  auditLogData: [
+    async ({ authenticatedRequest }, use, _workerInfo) => {
+      const dagId = testConfig.testDag.id;
+      const createdRunIds: Array<string> = [];
+
+      try {
+        await waitForDagReady(authenticatedRequest, dagId);
+        await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, { data: { 
is_paused: false } });
+
+        for (let i = 0; i < 3; i++) {
+          const { dagRunId } = await apiTriggerDagRun(authenticatedRequest, 
dagId);
+
+          createdRunIds.push(dagRunId);
+          await waitForDagRunStatus(authenticatedRequest, {
+            dagId,
+            expectedState: "success",
+            runId: dagRunId,
+            timeout: 60_000,
+          });
+        }
+
+        await use({ dagId });
+      } finally {
+        for (const runId of createdRunIds) {
+          await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+        }
+      }
+    },
+    { scope: "worker", timeout: 180_000 },
+  ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
new file mode 100644
index 00000000000..080f46be2f0
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
@@ -0,0 +1,89 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Calendar data fixture — creates success + failed DAG runs for calendar 
tests.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+import dayjs from "dayjs";
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+  apiCreateDagRun,
+  safeCleanupDagRun,
+  apiSetDagRunState,
+  uniqueRunId,
+  waitForDagReady,
+} from "tests/e2e/utils/test-helpers";
+
+export type CalendarRunsData = {
+  dagId: string;
+};
+
+export const test = base.extend<Record<never, never>, { calendarRunsData: 
CalendarRunsData }>({
+  calendarRunsData: [
+    async ({ authenticatedRequest }, use, workerInfo) => {
+      const dagId = testConfig.testDag.id;
+      const createdRunIds: Array<string> = [];
+
+      await waitForDagReady(authenticatedRequest, dagId);
+
+      const now = dayjs();
+      const yesterday = now.subtract(1, "day");
+      const baseDate = yesterday.isSame(now, "month") ? yesterday : now;
+
+      const workerHourOffset = workerInfo.parallelIndex * 2;
+      const successIso = baseDate
+        .startOf("day")
+        .hour(2 + workerHourOffset)
+        .toISOString();
+      const failedIso = baseDate
+        .startOf("day")
+        .hour(3 + workerHourOffset)
+        .toISOString();
+
+      const successRunId = uniqueRunId("cal_success");
+      const failedRunId = uniqueRunId("cal_failed");
+
+      try {
+        await apiCreateDagRun(authenticatedRequest, dagId, {
+          dag_run_id: successRunId,
+          logical_date: successIso,
+        });
+        createdRunIds.push(successRunId);
+        await apiSetDagRunState(authenticatedRequest, { dagId, runId: 
successRunId, state: "success" });
+
+        await apiCreateDagRun(authenticatedRequest, dagId, {
+          dag_run_id: failedRunId,
+          logical_date: failedIso,
+        });
+        createdRunIds.push(failedRunId);
+        await apiSetDagRunState(authenticatedRequest, { dagId, runId: 
failedRunId, state: "failed" });
+
+        await use({ dagId });
+      } finally {
+        for (const runId of createdRunIds) {
+          await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+        }
+      }
+    },
+    { scope: "worker", timeout: 180_000 },
+  ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
new file mode 100644
index 00000000000..3c07bbefd4d
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
@@ -0,0 +1,91 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * DAG Runs page data fixture — creates runs across two DAGs for filtering 
tests.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+  apiCreateDagRun,
+  safeCleanupDagRun,
+  apiSetDagRunState,
+  uniqueRunId,
+  waitForDagReady,
+} from "tests/e2e/utils/test-helpers";
+
+export type DagRunsPageData = {
+  dag1Id: string;
+  dag2Id: string;
+};
+
+export const test = base.extend<Record<never, never>, { dagRunsPageData: 
DagRunsPageData }>({
+  dagRunsPageData: [
+    async ({ authenticatedRequest }, use, workerInfo) => {
+      const dag1Id = testConfig.testDag.id;
+      const dag2Id = "example_python_operator";
+      const createdRuns: Array<{ dagId: string; runId: string }> = [];
+
+      try {
+        await Promise.all([
+          waitForDagReady(authenticatedRequest, dag1Id),
+          waitForDagReady(authenticatedRequest, dag2Id),
+        ]);
+
+        const baseOffset = workerInfo.parallelIndex * 7_200_000;
+        const timestamp = Date.now() - baseOffset;
+
+        const runId1 = uniqueRunId("dagrun_failed");
+
+        await apiCreateDagRun(authenticatedRequest, dag1Id, {
+          dag_run_id: runId1,
+          logical_date: new Date(timestamp).toISOString(),
+        });
+        createdRuns.push({ dagId: dag1Id, runId: runId1 });
+        await apiSetDagRunState(authenticatedRequest, { dagId: dag1Id, runId: 
runId1, state: "failed" });
+
+        const runId2 = uniqueRunId("dagrun_success");
+
+        await apiCreateDagRun(authenticatedRequest, dag1Id, {
+          dag_run_id: runId2,
+          logical_date: new Date(timestamp + 60_000).toISOString(),
+        });
+        createdRuns.push({ dagId: dag1Id, runId: runId2 });
+        await apiSetDagRunState(authenticatedRequest, { dagId: dag1Id, runId: 
runId2, state: "success" });
+
+        const runId3 = uniqueRunId("dagrun_other");
+
+        await apiCreateDagRun(authenticatedRequest, dag2Id, {
+          dag_run_id: runId3,
+          logical_date: new Date(timestamp + 120_000).toISOString(),
+        });
+        createdRuns.push({ dagId: dag2Id, runId: runId3 });
+
+        await use({ dag1Id, dag2Id });
+      } finally {
+        for (const { dagId, runId } of createdRuns) {
+          await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+        }
+      }
+    },
+    { scope: "worker", timeout: 120_000 },
+  ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
new file mode 100644
index 00000000000..f4431a2984e
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
@@ -0,0 +1,42 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Dashboard data fixture — tracks UI-triggered DAG runs for cleanup.
+ */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import { safeCleanupDagRun } from "tests/e2e/utils/test-helpers";
+
+export type DagRunCleanup = {
+  track: (runId: string) => void;
+};
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+export const test = base.extend<{ dagRunCleanup: DagRunCleanup }>({
+  dagRunCleanup: async ({ authenticatedRequest }, use) => {
+    const trackedRunIds: Array<string> = [];
+
+    await use({ track: (runId: string) => trackedRunIds.push(runId) });
+
+    for (const runId of trackedRunIds) {
+      await safeCleanupDagRun(authenticatedRequest, testConfig.testDag.id, 
runId);
+    }
+  },
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
new file mode 100644
index 00000000000..ad1e6d8b7b6
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
@@ -0,0 +1,210 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Data-isolation fixtures for E2E tests.
+ *
+ * Extends the POM fixtures with worker-scoped and test-scoped data
+ * fixtures that create API resources and guarantee cleanup — even on
+ * test failure, timeout, or retry.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+import type { APIRequestContext } from "@playwright/test";
+
+import { testConfig } from "../../../playwright.config";
+import {
+  apiCreateDagRun,
+  apiSetDagRunState,
+  apiTriggerDagRun,
+  safeCleanupDagRun,
+  uniqueRunId,
+  waitForDagReady,
+  waitForDagRunStatus,
+} from "../utils/test-helpers";
+import { test as base } from "./pom";
+
+/** Shape returned by single DAG run fixtures. */
+export type DagRunFixtureData = {
+  dagId: string;
+  logicalDate: string;
+  runId: string;
+};
+
+/** Shape returned by the successAndFailedRuns fixture. */
+export type SuccessAndFailedRunsData = {
+  dagId: string;
+  failedRun: DagRunFixtureData;
+  successRun: DagRunFixtureData;
+};
+
+export type DataWorkerFixtures = {
+  /** Ensures the default test DAG is parsed and ready. Worker-scoped, no 
cleanup needed. */
+  dagReady: string;
+  /** A DAG run triggered via scheduler and completed. Worker-scoped with 
auto-cleanup. */
+  executedDagRun: DagRunFixtureData;
+  /** Two DAG runs: one success, one failed. Worker-scoped with auto-cleanup. 
*/
+  successAndFailedRuns: SuccessAndFailedRunsData;
+  /** A DAG run in "success" state (API-only, no scheduler). Worker-scoped 
with auto-cleanup. */
+  successDagRun: DagRunFixtureData;
+};
+
+export type DataTestFixtures = Record<never, never>;
+
+async function createAndSetupDagRun(
+  request: APIRequestContext,
+  dagId: string,
+  options: {
+    logicalDate?: string;
+    parallelIndex: number;
+    prefix: string;
+    state: "failed" | "queued" | "success";
+  },
+): Promise<DagRunFixtureData> {
+  await waitForDagReady(request, dagId);
+
+  const runId = uniqueRunId(`${options.prefix}_w${options.parallelIndex}`);
+
+  // Offset logical_date by 2 hours per worker to avoid collisions.
+  const offsetMs = options.parallelIndex * 7_200_000;
+  const logicalDate = options.logicalDate ?? new Date(Date.now() - 
offsetMs).toISOString();
+
+  await apiCreateDagRun(request, dagId, {
+    dag_run_id: runId,
+    logical_date: logicalDate,
+  });
+  await apiSetDagRunState(request, { dagId, runId, state: options.state });
+
+  return { dagId, logicalDate, runId };
+}
+
+async function cleanupMultipleRuns(
+  request: APIRequestContext,
+  runs: Array<{ dagId: string; runId: string }>,
+): Promise<void> {
+  for (const { dagId, runId } of runs) {
+    await safeCleanupDagRun(request, dagId, runId);
+  }
+}
+
+export const test = base.extend<DataTestFixtures, DataWorkerFixtures>({
+  dagReady: [
+    async ({ authenticatedRequest }, use) => {
+      const dagId = testConfig.testDag.id;
+
+      await waitForDagReady(authenticatedRequest, dagId);
+      await use(dagId);
+    },
+    { scope: "worker", timeout: 120_000 },
+  ],
+
+  executedDagRun: [
+    async ({ authenticatedRequest }, use) => {
+      const dagId = testConfig.testDag.id;
+      let dagRunId: string | undefined;
+
+      try {
+        await waitForDagReady(authenticatedRequest, dagId);
+        await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, {
+          data: { is_paused: false },
+        });
+
+        const triggered = await apiTriggerDagRun(authenticatedRequest, dagId);
+
+        ({ dagRunId } = triggered);
+
+        await waitForDagRunStatus(authenticatedRequest, {
+          dagId,
+          expectedState: "success",
+          runId: dagRunId,
+          timeout: 120_000,
+        });
+
+        await use({ dagId, logicalDate: triggered.logicalDate, runId: dagRunId 
});
+      } finally {
+        if (dagRunId !== undefined) {
+          await safeCleanupDagRun(authenticatedRequest, dagId, dagRunId);
+        }
+        // Re-pause is handled by global-teardown.ts (globalTeardown in 
playwright.config.ts).
+      }
+    },
+    { scope: "worker", timeout: 180_000 },
+  ],
+
+  successAndFailedRuns: [
+    async ({ authenticatedRequest }, use, workerInfo) => {
+      const dagId = testConfig.testDag.id;
+      const createdRuns: Array<{ dagId: string; runId: string }> = [];
+
+      try {
+        await waitForDagReady(authenticatedRequest, dagId);
+
+        const baseOffset = workerInfo.parallelIndex * 7_200_000;
+        const timestamp = Date.now() - baseOffset;
+
+        const successRun = await createAndSetupDagRun(authenticatedRequest, 
dagId, {
+          logicalDate: new Date(timestamp).toISOString(),
+          parallelIndex: workerInfo.parallelIndex,
+          prefix: "sf_ok",
+          state: "success",
+        });
+
+        createdRuns.push({ dagId, runId: successRun.runId });
+
+        const failedRun = await createAndSetupDagRun(authenticatedRequest, 
dagId, {
+          logicalDate: new Date(timestamp + 60_000).toISOString(),
+          parallelIndex: workerInfo.parallelIndex,
+          prefix: "sf_fail",
+          state: "failed",
+        });
+
+        createdRuns.push({ dagId, runId: failedRun.runId });
+
+        await use({ dagId, failedRun, successRun });
+      } finally {
+        await cleanupMultipleRuns(authenticatedRequest, createdRuns);
+      }
+    },
+    { scope: "worker", timeout: 120_000 },
+  ],
+
+  successDagRun: [
+    async ({ authenticatedRequest }, use, workerInfo) => {
+      const dagId = testConfig.testDag.id;
+      let createdRunId: string | undefined;
+
+      try {
+        const data = await createAndSetupDagRun(authenticatedRequest, dagId, {
+          parallelIndex: workerInfo.parallelIndex,
+          prefix: "run",
+          state: "success",
+        });
+
+        createdRunId = data.runId;
+
+        await use(data);
+      } finally {
+        if (createdRunId !== undefined) {
+          await safeCleanupDagRun(authenticatedRequest, dagId, createdRunId);
+        }
+      }
+    },
+    { scope: "worker", timeout: 120_000 },
+  ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/index.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/index.ts
new file mode 100644
index 00000000000..bef87ad21a5
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/index.ts
@@ -0,0 +1,30 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Unified fixture entry point for E2E tests.
+ *
+ * Re-exports the fully extended `test` object (POM + data fixtures)
+ * so specs only need one import:
+ *
+ *   import { expect, test } from "tests/e2e/fixtures";
+ */
+export { test } from "./data";
+export type { DagRunFixtureData, SuccessAndFailedRunsData } from "./data";
+export { expect } from "@playwright/test";
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/pom.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/pom.ts
new file mode 100644
index 00000000000..d561be27faa
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/pom.ts
@@ -0,0 +1,156 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Page Object Model fixtures for E2E tests.
+ *
+ * Provides POM instances and the worker-scoped authenticatedRequest
+ * as test fixtures, eliminating manual `beforeEach` boilerplate.
+ */
+import { test as base, type APIRequestContext } from "@playwright/test";
+
+import { AUTH_FILE, testConfig } from "../../../playwright.config";
+import { AssetDetailPage } from "../pages/AssetDetailPage";
+import { AssetListPage } from "../pages/AssetListPage";
+import { BackfillPage } from "../pages/BackfillPage";
+import { ConfigurationPage } from "../pages/ConfigurationPage";
+import { ConnectionsPage } from "../pages/ConnectionsPage";
+import { DagCalendarTab } from "../pages/DagCalendarTab";
+import { DagCodePage } from "../pages/DagCodePage";
+import { DagRunsPage } from "../pages/DagRunsPage";
+import { DagRunsTabPage } from "../pages/DagRunsTabPage";
+import { DagsPage } from "../pages/DagsPage";
+import { EventsPage } from "../pages/EventsPage";
+import { GridPage } from "../pages/GridPage";
+import { HomePage } from "../pages/HomePage";
+import { PluginsPage } from "../pages/PluginsPage";
+import { PoolsPage } from "../pages/PoolsPage";
+import { ProvidersPage } from "../pages/ProvidersPage";
+import { RequiredActionsPage } from "../pages/RequiredActionsPage";
+import { TaskInstancePage } from "../pages/TaskInstancePage";
+import { TaskInstancesPage } from "../pages/TaskInstancesPage";
+import { VariablePage } from "../pages/VariablePage";
+import { XComsPage } from "../pages/XComsPage";
+
+export type PomWorkerFixtures = {
+  authenticatedRequest: APIRequestContext;
+};
+
+export type PomFixtures = {
+  assetDetailPage: AssetDetailPage;
+  assetListPage: AssetListPage;
+  backfillPage: BackfillPage;
+  configurationPage: ConfigurationPage;
+  connectionsPage: ConnectionsPage;
+  dagCalendarTab: DagCalendarTab;
+  dagCodePage: DagCodePage;
+  dagRunsPage: DagRunsPage;
+  dagRunsTabPage: DagRunsTabPage;
+  dagsPage: DagsPage;
+  eventsPage: EventsPage;
+  gridPage: GridPage;
+  homePage: HomePage;
+  pluginsPage: PluginsPage;
+  poolsPage: PoolsPage;
+  providersPage: ProvidersPage;
+  requiredActionsPage: RequiredActionsPage;
+  taskInstancePage: TaskInstancePage;
+  taskInstancesPage: TaskInstancesPage;
+  variablePage: VariablePage;
+  xcomsPage: XComsPage;
+};
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+export const test = base.extend<PomFixtures, PomWorkerFixtures>({
+  assetDetailPage: async ({ page }, use) => {
+    await use(new AssetDetailPage(page));
+  },
+  assetListPage: async ({ page }, use) => {
+    await use(new AssetListPage(page));
+  },
+  authenticatedRequest: [
+    async ({ playwright }, use) => {
+      const ctx = await playwright.request.newContext({
+        baseURL: testConfig.connection.baseUrl,
+        storageState: AUTH_FILE,
+      });
+
+      await use(ctx);
+      await ctx.dispose();
+    },
+    { scope: "worker" },
+  ],
+  backfillPage: async ({ page }, use) => {
+    await use(new BackfillPage(page));
+  },
+  configurationPage: async ({ page }, use) => {
+    await use(new ConfigurationPage(page));
+  },
+  connectionsPage: async ({ page }, use) => {
+    await use(new ConnectionsPage(page));
+  },
+  dagCalendarTab: async ({ page }, use) => {
+    await use(new DagCalendarTab(page));
+  },
+  dagCodePage: async ({ page }, use) => {
+    await use(new DagCodePage(page));
+  },
+  dagRunsPage: async ({ page }, use) => {
+    await use(new DagRunsPage(page));
+  },
+  dagRunsTabPage: async ({ page }, use) => {
+    await use(new DagRunsTabPage(page));
+  },
+  dagsPage: async ({ page }, use) => {
+    await use(new DagsPage(page));
+  },
+  eventsPage: async ({ page }, use) => {
+    await use(new EventsPage(page));
+  },
+  gridPage: async ({ page }, use) => {
+    await use(new GridPage(page));
+  },
+  homePage: async ({ page }, use) => {
+    await use(new HomePage(page));
+  },
+  pluginsPage: async ({ page }, use) => {
+    await use(new PluginsPage(page));
+  },
+  poolsPage: async ({ page }, use) => {
+    await use(new PoolsPage(page));
+  },
+  providersPage: async ({ page }, use) => {
+    await use(new ProvidersPage(page));
+  },
+  requiredActionsPage: async ({ page }, use) => {
+    await use(new RequiredActionsPage(page));
+  },
+  taskInstancePage: async ({ page }, use) => {
+    await use(new TaskInstancePage(page));
+  },
+  taskInstancesPage: async ({ page }, use) => {
+    await use(new TaskInstancesPage(page));
+  },
+  variablePage: async ({ page }, use) => {
+    await use(new VariablePage(page));
+  },
+  xcomsPage: async ({ page }, use) => {
+    await use(new XComsPage(page));
+  },
+});
diff --git 
a/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
new file mode 100644
index 00000000000..cde2f4e6aa2
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
@@ -0,0 +1,112 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Task instances data fixture — creates runs with success/failed task 
instances.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+import { expect, type APIRequestContext } from "@playwright/test";
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+  apiCreateDagRun,
+  safeCleanupDagRun,
+  uniqueRunId,
+  waitForDagReady,
+} from "tests/e2e/utils/test-helpers";
+
/** Handle exposed to tests: the DAG whose runs/task-instance states were seeded. */
export type TaskInstancesData = {
  dagId: string;
};
+
/**
 * Force every task instance of a DAG run into the given state via the REST API.
 *
 * Lists the run's task instances, then PATCHes each with `new_state`. Each
 * PATCH is polled (2s interval, 30s cap) until it returns 2xx.
 *
 * @param request Authenticated API request context.
 * @param options.dagId DAG containing the run.
 * @param options.runId DAG run whose task instances are updated.
 * @param options.state Target state (e.g. "success", "failed").
 */
async function setAllTaskInstanceStates(
  request: APIRequestContext,
  options: { dagId: string; runId: string; state: string },
): Promise<void> {
  const { dagId, runId, state } = options;

  // NOTE(review): relative path — assumes `request` was created with a
  // baseURL pointing at the webserver; confirm against the fixture setup.
  const tasksResponse = await request.get(`/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances`);

  expect(tasksResponse.ok()).toBeTruthy();

  const tasksData = (await tasksResponse.json()) as {
    task_instances: Array<{ task_id: string }>;
  };

  for (const task of tasksData.task_instances) {
    // Poll each PATCH until it succeeds, so transient non-2xx responses
    // do not fail the fixture outright.
    await expect
      .poll(
        async () => {
          const resp = await request.patch(
            `/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${task.task_id}`,
            {
              data: { new_state: state },
              headers: { "Content-Type": "application/json" },
              timeout: 30_000,
            },
          );

          return resp.ok();
        },
        { intervals: [2000], timeout: 30_000 },
      )
      .toBe(true);
  }
}
+
export const test = base.extend<Record<never, never>, { taskInstancesData: TaskInstancesData }>({
  // Worker-scoped fixture: seeds one all-success run and one all-failed run,
  // then deletes both runs after the worker's tests finish.
  taskInstancesData: [
    async ({ authenticatedRequest }, use, workerInfo) => {
      const dagId = testConfig.testDag.id;
      const createdRunIds: Array<string> = [];
      // Per-worker 2h logical_date offset so parallel workers do not collide.
      const baseOffset = workerInfo.parallelIndex * 7_200_000;
      const timestamp = Date.now() - baseOffset;

      try {
        await waitForDagReady(authenticatedRequest, dagId);

        // Run 1: every task instance forced to "success".
        const runId1 = uniqueRunId("ti_success");

        await apiCreateDagRun(authenticatedRequest, dagId, {
          dag_run_id: runId1,
          logical_date: new Date(timestamp).toISOString(),
        });
        createdRunIds.push(runId1);
        await setAllTaskInstanceStates(authenticatedRequest, { dagId, runId: runId1, state: "success" });

        // Run 2: every task instance forced to "failed", 1 minute later.
        const runId2 = uniqueRunId("ti_failed");

        await apiCreateDagRun(authenticatedRequest, dagId, {
          dag_run_id: runId2,
          logical_date: new Date(timestamp + 60_000).toISOString(),
        });
        createdRunIds.push(runId2);
        await setAllTaskInstanceStates(authenticatedRequest, { dagId, runId: runId2, state: "failed" });

        await use({ dagId });
      } finally {
        // Best-effort cleanup: safeCleanupDagRun logs instead of throwing,
        // so one failure cannot abort the loop.
        for (const runId of createdRunIds) {
          await safeCleanupDagRun(authenticatedRequest, dagId, runId);
        }
      }
    },
    { scope: "worker", timeout: 120_000 },
  ],
});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts 
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
new file mode 100644
index 00000000000..81dab06f8d0
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
@@ -0,0 +1,79 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * XCom data fixture — triggers example_xcom DAG runs to generate XCom entries.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a 
React Hook. */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+  apiCreateDagRun,
+  safeCleanupDagRun,
+  uniqueRunId,
+  waitForDagReady,
+  waitForDagRunStatus,
+} from "tests/e2e/utils/test-helpers";
+
/** Handle exposed to tests: the XCom-producing DAG and the XCom key to query. */
export type XcomRunsData = {
  dagId: string;
  xcomKey: string;
};
+
export const test = base.extend<Record<never, never>, { xcomRunsData: XcomRunsData }>({
  // Worker-scoped fixture: triggers the XCom example DAG twice and waits for
  // both runs to succeed so XCom entries exist, then deletes the runs.
  xcomRunsData: [
    async ({ authenticatedRequest }, use, workerInfo) => {
      const dagId = testConfig.xcomDag.id;
      const createdRunIds: Array<string> = [];
      const triggerCount = 2;
      // Per-worker 2h logical_date offset so parallel workers do not collide.
      const baseOffset = workerInfo.parallelIndex * 7_200_000;

      try {
        await waitForDagReady(authenticatedRequest, dagId);
        // Unpause so the scheduler actually executes the created runs.
        // NOTE(review): the PATCH response is not checked — a failed unpause
        // would only surface later as a run-status timeout; confirm intended.
        await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, {
          data: { is_paused: false },
        });

        for (let i = 0; i < triggerCount; i++) {
          const runId = uniqueRunId(`xcom_run_${i}`);

          await apiCreateDagRun(authenticatedRequest, dagId, {
            dag_run_id: runId,
            logical_date: new Date(Date.now() - baseOffset + i * 60_000).toISOString(),
          });
          createdRunIds.push(runId);
          // The run must finish before its XCom values exist to assert on.
          await waitForDagRunStatus(authenticatedRequest, {
            dagId,
            expectedState: "success",
            runId,
            timeout: 120_000,
          });
        }

        await use({ dagId, xcomKey: "return_value" });
      } finally {
        // Best-effort cleanup; individual failures are logged, not thrown.
        for (const runId of createdRunIds) {
          await safeCleanupDagRun(authenticatedRequest, dagId, runId);
        }
      }
    },
    { scope: "worker", timeout: 180_000 },
  ],
});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts 
b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
new file mode 100644
index 00000000000..aae6cc588aa
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
@@ -0,0 +1,60 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { request } from "@playwright/test";
+
+import { AUTH_FILE, testConfig } from "../../playwright.config";
+
+/**
+ * Re-pause all DAGs that E2E fixtures may have unpaused during the test run.
+ * Runs once after all workers have finished, preventing the scheduler from
+ * creating unbounded DAG runs in shared environments.
+ */
+async function globalTeardown() {
+  const baseURL = testConfig.connection.baseUrl;
+
+  const context = await request.newContext({
+    baseURL,
+    storageState: AUTH_FILE,
+  });
+
+  const dagIds = [
+    testConfig.testDag.id,
+    testConfig.testDag.hitlId,
+    testConfig.xcomDag.id,
+    "asset_produces_1",
+  ];
+
+  for (const dagId of dagIds) {
+    try {
+      await context.patch(`/api/v2/dags/${dagId}`, {
+        data: { is_paused: true },
+        headers: { "Content-Type": "application/json" },
+        timeout: 10_000,
+      });
+    } catch (error) {
+      console.warn(
+        `[e2e teardown] Failed to re-pause DAG ${dagId}: ${error instanceof 
Error ? error.message : String(error)}`,
+      );
+    }
+  }
+
+  await context.dispose();
+}
+
+export default globalTeardown;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts 
b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
new file mode 100644
index 00000000000..3f651289567
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
@@ -0,0 +1,797 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Shared E2E test utilities for data isolation and API-based setup.
+ */
+import { expect, type APIRequestContext, type Locator, type Page } from 
"@playwright/test";
+import { randomUUID } from "node:crypto";
+import { testConfig } from "playwright.config";
+
/** Either a raw APIRequestContext or a Page whose `.request` context is used. */
type RequestLike = APIRequestContext | Page;

/** Payload for POST /api/v2/dags/{dagId}/dagRuns (snake_case matches the API). */
type DagRunData = {
  conf?: Record<string, unknown>;
  dag_run_id: string;
  logical_date: string;
  note?: string;
};

/** Subset of the DAG-run API response these helpers consume. */
type DagRunResponse = {
  dag_run_id: string;
  state: string;
};
+
+function getRequestContext(source: RequestLike): APIRequestContext {
+  if ("request" in source) {
+    return source.request;
+  }
+
+  return source;
+}
+
// Webserver base URL shared by every API helper below.
const { baseUrl } = testConfig.connection;
+
+/** Generate a unique run ID: `{prefix}_{uuid8}`. */
+export function uniqueRunId(prefix: string): string {
+  return `${prefix}_${randomUUID().slice(0, 8)}`;
+}
+
/**
 * Poll GET /api/v2/dags/{dagId} until 200 — waits for the DAG to be parsed.
 *
 * @param source API context (or Page) used to issue requests.
 * @param dagId DAG to wait for.
 * @param options.timeout Overall polling budget in ms (default 120s).
 */
export async function waitForDagReady(
  source: RequestLike,
  dagId: string,
  options?: { timeout?: number },
): Promise<void> {
  const request = getRequestContext(source);

  const timeout = options?.timeout ?? 120_000;

  await expect
    .poll(
      async () => {
        try {
          const response = await request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 30_000 });

          return response.ok();
        } catch {
          // Transient network errors: keep polling.
          return false;
        }
      },
      { intervals: [2000], timeout },
    )
    .toBe(true);
}
+
/**
 * Trigger a DAG run with an auto-generated unique run ID.
 * Returns the dagRunId and logicalDate for targeted polling.
 *
 * Any non-OK response is retried via `toPass`; 409 collisions regenerate
 * identifiers as described inline.
 */
export async function apiTriggerDagRun(
  source: RequestLike,
  dagId: string,
  options?: { runId?: string },
): Promise<{ dagRunId: string; logicalDate: string }> {
  const request = getRequestContext(source);

  let dagRunId = options?.runId ?? uniqueRunId(dagId);
  let resultLogicalDate = new Date().toISOString();

  await expect(async () => {
    // Generate a fresh logicalDate on each attempt so that a 409
    // (logical_date collision from a parallel worker) is recoverable.
    const logicalDate = new Date().toISOString();

    const response = await request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
      data: {
        dag_run_id: dagRunId,
        logical_date: logicalDate,
        note: "e2e test",
      },
      headers: { "Content-Type": "application/json" },
      timeout: 30_000,
    });

    if (!response.ok()) {
      // On 409, regenerate dag_run_id (unless caller pinned it) so the
      // next attempt doesn't collide on either dag_run_id or logical_date.
      if (response.status() === 409 && options?.runId === undefined) {
        dagRunId = uniqueRunId(dagId);
      }

      throw new Error(`DAG run trigger failed (${response.status()})`);
    }

    const json = (await response.json()) as { logical_date?: string } & DagRunResponse;

    // Prefer the server-normalized logical_date when returned.
    resultLogicalDate = json.logical_date ?? logicalDate;
  }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });

  return { dagRunId, logicalDate: resultLogicalDate };
}
+
/**
 * Create a DAG run via the API.
 *
 * @returns The server-assigned dag_run_id — may differ from the requested one
 *   after a 409-driven regeneration.
 */
export async function apiCreateDagRun(source: RequestLike, dagId: string, data: DagRunData): Promise<string> {
  const request = getRequestContext(source);
  let resultRunId = data.dag_run_id;

  // Track fallback values that are regenerated on 409 collisions,
  // without mutating the caller's `data` parameter.
  let retryRunId: string | undefined;
  let retryLogicalDate: string | undefined;

  await expect(async () => {
    const runId = retryRunId ?? data.dag_run_id;
    const logicalDate = retryLogicalDate ?? data.logical_date;

    const response = await request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
      data: {
        conf: data.conf ?? {},
        dag_run_id: runId,
        logical_date: logicalDate,
        note: data.note ?? "e2e test",
      },
      headers: { "Content-Type": "application/json" },
      timeout: 30_000,
    });

    if (!response.ok()) {
      // On 409, generate fresh dag_run_id and logical_date for the next retry.
      if (response.status() === 409) {
        retryRunId = uniqueRunId(dagId);
        retryLogicalDate = new Date().toISOString();
      }

      throw new Error(`DAG run creation failed (${response.status()})`);
    }

    const json = (await response.json()) as DagRunResponse;

    resultRunId = json.dag_run_id;
  }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });

  return resultRunId;
}
+
/**
 * Set a DAG run's state via the API.
 *
 * A 409 response is tolerated (treated as success); any other non-2xx is
 * retried via `toPass`.
 */
export async function apiSetDagRunState(
  source: RequestLike,
  options: { dagId: string; runId: string; state: "failed" | "queued" | "success" },
): Promise<void> {
  const { dagId, runId, state } = options;
  const request = getRequestContext(source);

  await expect(async () => {
    const response = await request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
      data: { state },
      headers: { "Content-Type": "application/json" },
      timeout: 30_000,
    });

    // 409 is deliberately accepted; only other failures trigger a retry.
    if (response.status() !== 409 && !response.ok()) {
      throw new Error(`Set DAG run state failed (${response.status()})`);
    }
  }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
}
+
/**
 * Poll the API until the DAG run reaches the expected state.
 * Throws early if the run reaches "failed" while a different state is expected,
 * since a failed run will never transition further.
 */
export async function waitForDagRunStatus(
  source: RequestLike,
  options: { dagId: string; expectedState: string; runId: string; timeout?: number },
): Promise<void> {
  const { dagId, expectedState, runId } = options;
  const request = getRequestContext(source);

  const timeout = options.timeout ?? 120_000;

  await expect
    .poll(
      async () => {
        try {
          const response = await request.get(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
            timeout: 30_000,
          });

          if (!response.ok()) {
            // Non-2xx: report a non-matching value so polling continues.
            return `unknown (HTTP ${response.status()})`;
          }

          const data = (await response.json()) as DagRunResponse;

          // Fail fast on an unexpected terminal failure.
          if (data.state === "failed" && expectedState !== "failed") {
            throw new Error(`DAG run ${runId} failed unexpectedly`);
          }

          return data.state;
        } catch (error) {
          // Re-throw intentional failures (unexpected "failed" state).
          if (error instanceof Error && error.message.includes("failed unexpectedly")) {
            throw error;
          }

          // Transient network/timeout errors — retry on next interval.
          return `unknown (${error instanceof Error ? error.message : "network error"})`;
        }
      },
      {
        intervals: [5000],
        message: `DAG run ${runId} did not reach state "${expectedState}" within ${timeout}ms`,
        timeout,
      },
    )
    .toBe(expectedState);
}
+
/**
 * Poll the API until a task instance reaches the expected state.
 * Unlike waitForDagRunStatus, this targets a specific task within a DAG run.
 * Throws early if the task lands in a different terminal state, since waiting
 * further would be futile.
 */
export async function waitForTaskInstanceState(
  source: RequestLike,
  options: {
    dagId: string;
    expectedState: string;
    runId: string;
    taskId: string;
    timeout?: number;
  },
): Promise<void> {
  const { dagId, expectedState, runId, taskId } = options;
  const request = getRequestContext(source);

  const timeout = options.timeout ?? 120_000;
  // States a task instance cannot leave on its own.
  const terminalStates = new Set(["success", "failed", "skipped", "removed", "upstream_failed"]);

  await expect
    .poll(
      async () => {
        try {
          const response = await request.get(
            `${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}`,
            { timeout: 30_000 },
          );

          if (!response.ok()) {
            return "unknown";
          }

          const data = (await response.json()) as { state: string };
          const { state } = data;
          const expected = expectedState.toLowerCase();

          if (state !== expected && terminalStates.has(state)) {
            throw new Error(`Task ${taskId} reached terminal state "${state}" instead of "${expected}"`);
          }

          return state;
        } catch (error) {
          // Re-throw the intentional wrong-terminal-state failure above.
          if (error instanceof Error && error.message.includes("terminal state")) {
            throw error;
          }

          // Transient network errors — retry on next interval.
          return "unknown";
        }
      },
      {
        intervals: [3000, 5000],
        message: `Task ${taskId} did not reach state "${expectedState}" within ${timeout}ms`,
        timeout,
      },
    )
    .toBe(expectedState.toLowerCase());
}
+
/**
 * Respond to a HITL (Human-in-the-Loop) task via the API.
 * 409 is treated as success (already responded).
 *
 * @param options.chosenOptions The option label(s) to submit.
 * @param options.mapIndex Defaults to -1 — presumably the API's marker for a
 *   non-mapped task instance; confirm against the HITL endpoint spec.
 * @param options.paramsInput Optional free-form parameters for the response.
 */
export async function apiRespondToHITL(
  source: RequestLike,
  options: {
    chosenOptions: Array<string>;
    dagId: string;
    mapIndex?: number;
    paramsInput?: Record<string, unknown>;
    runId: string;
    taskId: string;
  },
): Promise<void> {
  const { chosenOptions, dagId, runId, taskId } = options;
  const mapIndex = options.mapIndex ?? -1;
  const paramsInput = options.paramsInput ?? {};
  const request = getRequestContext(source);

  await expect(async () => {
    const response = await request.patch(
      `${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}/${mapIndex}/hitlDetails`,
      {
        data: { chosen_options: chosenOptions, params_input: paramsInput },
        headers: { "Content-Type": "application/json" },
        timeout: 30_000,
      },
    );

    // 409 = already responded; acceptable.
    if (response.status() !== 409 && !response.ok()) {
      throw new Error(`HITL response failed (${response.status()})`);
    }
  }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
}
+
/**
 * Run the full HITL flow entirely via API — no browser needed.
 *
 * The example_hitl_operator DAG has 4 parallel HITL tasks, then an approval
 * task, then a branch task. This function triggers the DAG, responds to each
 * task via the API, and waits for the DAG run to complete.
 *
 * @param approve When true, chooses "Approve" and also answers the branch
 *   task; when false, chooses "Reject" (no branch task).
 * @returns The dag_run_id of the completed run.
 */
export async function setupHITLFlowViaAPI(
  source: RequestLike,
  dagId: string,
  approve: boolean,
): Promise<string> {
  const request = getRequestContext(source);

  await waitForDagReady(request, dagId);
  // Unpause so the scheduler actually executes the triggered run.
  await request.patch(`${baseUrl}/api/v2/dags/${dagId}`, { data: { is_paused: false } });

  const { dagRunId } = await apiTriggerDagRun(request, dagId);

  // wait_for_default_option auto-resolves (1s timeout, defaults=["option 7"]).
  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "success",
    runId: dagRunId,
    taskId: "wait_for_default_option",
  });

  // Each HITL task sits in "deferred" until a response arrives; wait for the
  // deferral, then answer it via the API.
  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "deferred",
    runId: dagRunId,
    taskId: "wait_for_input",
  });
  await apiRespondToHITL(request, {
    chosenOptions: ["OK"],
    dagId,
    paramsInput: { information: "Approved by test" },
    runId: dagRunId,
    taskId: "wait_for_input",
  });

  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "deferred",
    runId: dagRunId,
    taskId: "wait_for_option",
  });
  await apiRespondToHITL(request, {
    chosenOptions: ["option 1"],
    dagId,
    runId: dagRunId,
    taskId: "wait_for_option",
  });

  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "deferred",
    runId: dagRunId,
    taskId: "wait_for_multiple_options",
  });
  await apiRespondToHITL(request, {
    chosenOptions: ["option 4", "option 5"],
    dagId,
    runId: dagRunId,
    taskId: "wait_for_multiple_options",
  });

  // Wait for all parallel tasks to succeed before the approval task starts.
  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "success",
    runId: dagRunId,
    taskId: "wait_for_input",
  });
  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "success",
    runId: dagRunId,
    taskId: "wait_for_option",
  });
  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "success",
    runId: dagRunId,
    taskId: "wait_for_multiple_options",
  });

  // Approval step: choose Approve or Reject per the caller's flag.
  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "deferred",
    runId: dagRunId,
    taskId: "valid_input_and_options",
  });
  await apiRespondToHITL(request, {
    chosenOptions: [approve ? "Approve" : "Reject"],
    dagId,
    runId: dagRunId,
    taskId: "valid_input_and_options",
  });
  await waitForTaskInstanceState(request, {
    dagId,
    expectedState: "success",
    runId: dagRunId,
    taskId: "valid_input_and_options",
  });

  // The branch-choice task only runs on the approval path.
  if (approve) {
    await waitForTaskInstanceState(request, {
      dagId,
      expectedState: "deferred",
      runId: dagRunId,
      taskId: "choose_a_branch_to_run",
    });
    await apiRespondToHITL(request, {
      chosenOptions: ["task_1"],
      dagId,
      runId: dagRunId,
      taskId: "choose_a_branch_to_run",
    });
  }

  await waitForDagRunStatus(request, {
    dagId,
    expectedState: "success",
    runId: dagRunId,
    timeout: 120_000,
  });

  return dagRunId;
}
+
+/** Delete a DAG run via the API. 404 is treated as success. */
+export async function apiDeleteDagRun(source: RequestLike, dagId: string, 
runId: string): Promise<void> {
+  const request = getRequestContext(source);
+
+  const response = await 
request.delete(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+    timeout: 30_000,
+  });
+
+  // 404 = already deleted by another worker or cleanup; acceptable.
+  if (response.status() === 404) {
+    return;
+  }
+
+  if (!response.ok()) {
+    const body = await response.text();
+
+    throw new Error(`DAG run deletion failed (${response.status()}): ${body}`);
+  }
+}
+
/**
 * Delete a DAG run, logging (not throwing) unexpected errors.
 * Use this in fixture teardown where cleanup must not abort the loop.
 * 404 is already handled inside `apiDeleteDagRun`.
 * 409 (running state) is handled by force-failing the run first, then retrying.
 */
export async function safeCleanupDagRun(source: RequestLike, dagId: string, runId: string): Promise<void> {
  try {
    await apiDeleteDagRun(source, dagId, runId);
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);

    // 409 = DAG run is still running — force-fail, then retry deletion.
    // NOTE(review): this matches the substring "409" anywhere in the error
    // message (which embeds the response body), so it could false-positive
    // on unrelated text containing "409" — confirm acceptable.
    if (message.includes("409")) {
      try {
        await apiSetDagRunState(source, { dagId, runId, state: "failed" });
        await apiDeleteDagRun(source, dagId, runId);
      } catch (retryError) {
        console.warn(
          `[e2e cleanup] Retry failed for DAG run ${dagId}/${runId}: ${retryError instanceof Error ? retryError.message : String(retryError)}`,
        );
      }

      return;
    }

    // Anything else: log and continue — teardown must not throw.
    console.warn(`[e2e cleanup] Failed to delete DAG run ${dagId}/${runId}: ${message}`);
  }
}
+
/** Create a variable via the API. 409 (already exists) is treated as success. */
export async function apiCreateVariable(
  source: RequestLike,
  options: { description?: string; key: string; value: string },
): Promise<void> {
  const { description, key, value } = options;
  const request = getRequestContext(source);

  await expect(async () => {
    const response = await request.post(`${baseUrl}/api/v2/variables`, {
      data: { description: description ?? "", key, value },
      headers: { "Content-Type": "application/json" },
      timeout: 10_000,
    });

    // 409 is accepted; any other non-2xx triggers a retry via toPass.
    if (response.status() !== 409 && !response.ok()) {
      throw new Error(`Variable creation failed (${response.status()})`);
    }
  }).toPass({ intervals: [2000, 3000, 5000], timeout: 90_000 });
}
+
+/** Delete a variable via the API. 404 is treated as success. */
+export async function apiDeleteVariable(source: RequestLike, key: string): 
Promise<void> {
+  const request = getRequestContext(source);
+
+  const response = await 
request.delete(`${baseUrl}/api/v2/variables/${encodeURIComponent(key)}`, {
+    timeout: 30_000,
+  });
+
+  // 404 = already deleted by another worker or cleanup; acceptable.
+  if (response.status() === 404) {
+    return;
+  }
+
+  if (!response.ok()) {
+    const body = await response.text();
+
+    throw new Error(`Variable deletion failed (${response.status()}): 
${body}`);
+  }
+}
+
+/** Cancel a single backfill via the API. 409 (already completed) is treated 
as success. */
+export async function apiCancelBackfill(source: RequestLike, backfillId: 
number): Promise<void> {
+  const request = getRequestContext(source);
+
+  const response = await 
request.put(`${baseUrl}/api/v2/backfills/${backfillId}/cancel`, {
+    timeout: 30_000,
+  });
+
+  if (response.status() !== 200 && response.status() !== 409) {
+    throw new Error(`Cancel backfill failed (${response.status()})`);
+  }
+}
+
/** Cancel all active (non-completed) backfills for a DAG. */
export async function apiCancelAllActiveBackfills(source: RequestLike, dagId: string): Promise<void> {
  const request = getRequestContext(source);

  const response = await request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
    timeout: 30_000,
  });

  if (!response.ok()) {
    throw new Error(`List backfills failed (${response.status()})`);
  }

  const data = (await response.json()) as { backfills: Array<{ completed_at: string | null; id: number }> };

  // `completed_at === null` marks a backfill that has not finished yet.
  for (const backfill of data.backfills) {
    if (backfill.completed_at === null) {
      await apiCancelBackfill(source, backfill.id);
    }
  }
}
+
/** Poll until all backfills for a DAG are completed. */
export async function apiWaitForNoActiveBackfill(
  source: RequestLike,
  dagId: string,
  timeout: number = 120_000,
): Promise<void> {
  const request = getRequestContext(source);

  await expect
    .poll(
      async () => {
        try {
          const response = await request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
            timeout: 30_000,
          });

          if (!response.ok()) {
            return false;
          }

          const data = (await response.json()) as {
            backfills: Array<{ completed_at: string | null }>;
          };

          // Done when no backfill is missing a completion timestamp.
          return data.backfills.every((b) => b.completed_at !== null);
        } catch {
          // Transient errors: keep polling.
          return false;
        }
      },
      {
        intervals: [2000, 5000, 10_000],
        message: `Active backfills for DAG ${dagId} did not clear within ${timeout}ms`,
        timeout,
      },
    )
    .toBeTruthy();
}
+
/** Poll until a backfill reaches completed state. */
export async function apiWaitForBackfillComplete(
  source: RequestLike,
  backfillId: number,
  timeout: number = 120_000,
): Promise<void> {
  const request = getRequestContext(source);

  await expect
    .poll(
      async () => {
        try {
          const response = await request.get(`${baseUrl}/api/v2/backfills/${backfillId}`, {
            timeout: 30_000,
          });

          if (!response.ok()) {
            return false;
          }

          const data = (await response.json()) as { completed_at: string | null };

          // A non-null completed_at marks the backfill as finished.
          return data.completed_at !== null;
        } catch {
          // Transient errors: keep polling.
          return false;
        }
      },
      {
        intervals: [2000, 5000, 10_000],
        message: `Backfill ${backfillId} did not complete within ${timeout}ms`,
        timeout,
      },
    )
    .toBeTruthy();
}
+
/**
 * Create a backfill via the API. On 409, cancels active backfills and retries once.
 *
 * @returns The new backfill's numeric id.
 */
export async function apiCreateBackfill(
  source: RequestLike,
  dagId: string,
  options: {
    fromDate: string;
    maxActiveRuns?: number;
    reprocessBehavior?: string;
    toDate: string;
  },
): Promise<number> {
  const request = getRequestContext(source);
  const { fromDate, maxActiveRuns, reprocessBehavior = "none", toDate } = options;

  const body: Record<string, unknown> = {
    dag_id: dagId,
    from_date: fromDate,
    reprocess_behavior: reprocessBehavior,
    to_date: toDate,
  };

  // Only send max_active_runs when the caller specified it, so the server
  // default applies otherwise.
  if (maxActiveRuns !== undefined) {
    body.max_active_runs = maxActiveRuns;
  }

  const response = await request.post(`${baseUrl}/api/v2/backfills`, {
    data: body,
    headers: { "Content-Type": "application/json" },
    timeout: 30_000,
  });

  // 409: clear any active backfills for this DAG, then retry exactly once.
  if (response.status() === 409) {
    await apiCancelAllActiveBackfills(source, dagId);
    await apiWaitForNoActiveBackfill(source, dagId, 30_000);

    const retryResponse = await request.post(`${baseUrl}/api/v2/backfills`, {
      data: body,
      headers: { "Content-Type": "application/json" },
      timeout: 30_000,
    });

    if (!retryResponse.ok()) {
      throw new Error(`Backfill creation retry failed (${retryResponse.status()})`);
    }

    return ((await retryResponse.json()) as { id: number }).id;
  }

  if (!response.ok()) {
    throw new Error(`Backfill creation failed (${response.status()})`);
  }

  return ((await response.json()) as { id: number }).id;
}
+
+/**
+ * Wait for a table (by testId) to load and show at least one row or an empty 
message.
+ */
+export async function waitForTableLoad(
+  page: Page,
+  options?: { checkSkeletons?: boolean; testId?: string; timeout?: number },
+): Promise<void> {
+  const testId = options?.testId ?? "table-list";
+  const timeout = options?.timeout ?? 30_000;
+
+  const table = page.getByTestId(testId);
+
+  await expect(table).toBeVisible({ timeout });
+
+  // Skip skeleton check by default — XComs uses [data-scope="skeleton"] for
+  // lazy-loaded cell content, not table loading, which would cause false 
waits.
+  if (options?.checkSkeletons) {
+    await expect(table.locator('[data-testid="skeleton"], 
[data-scope="skeleton"]')).toHaveCount(0, {
+      timeout,
+    });
+  }
+
+  const firstRow = table.locator("tbody tr").first();
+  const emptyMessage = page.getByText(/no .* found/i);
+
+  await expect(firstRow.or(emptyMessage)).toBeVisible({ timeout });
+}
+
/**
 * Wait for DOM row count to stabilize across consecutive measurements.
 * Uses 500ms intervals to give React concurrent rendering time to settle.
 *
 * NOTE(review): stability also requires the count to be > 0, so an empty
 * table will time out — confirm callers only use this where at least one
 * row is expected.
 *
 * @returns The stabilized row count.
 */
export async function waitForStableRowCount(
  rowLocator: Locator,
  options?: { timeout?: number },
): Promise<number> {
  const timeout = options?.timeout ?? 10_000;
  const requiredStableChecks = 2;
  let lastStableCount = 0;

  await expect
    .poll(
      async () => {
        const counts: Array<number> = [];

        // Take requiredStableChecks + 1 samples, 500ms apart.
        for (let i = 0; i < requiredStableChecks + 1; i++) {
          counts.push(await rowLocator.count());

          if (i < requiredStableChecks) {
            await new Promise((resolve) => setTimeout(resolve, 500));
          }
        }

        const first = counts[0] ?? 0;
        const allSame = counts.length > 0 && first > 0 && counts.every((c) => c === first);

        if (allSame) {
          lastStableCount = first;
        }

        return allSame;
      },
      { intervals: [500], timeout },
    )
    .toBe(true);

  return lastStableCount;
}

Reply via email to