This is an automated email from the ASF dual-hosted git repository.
choo121600 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 21f5f9353f3 Add shared test fixtures and API-based data isolation
(#64202)
21f5f9353f3 is described below
commit 21f5f9353f37db61d4765605c81dec4faf0640e4
Author: Yeonguk Choo <[email protected]>
AuthorDate: Sun Mar 29 22:31:02 2026 +0900
Add shared test fixtures and API-based data isolation (#64202)
---
airflow-core/src/airflow/ui/playwright.config.ts | 1 +
.../airflow/ui/tests/e2e/fixtures/asset-data.ts | 66 ++
.../ui/tests/e2e/fixtures/audit-log-data.ts | 69 ++
.../airflow/ui/tests/e2e/fixtures/calendar-data.ts | 89 +++
.../airflow/ui/tests/e2e/fixtures/dag-runs-data.ts | 91 +++
.../ui/tests/e2e/fixtures/dashboard-data.ts | 42 ++
.../src/airflow/ui/tests/e2e/fixtures/data.ts | 210 ++++++
.../src/airflow/ui/tests/e2e/fixtures/index.ts | 30 +
.../src/airflow/ui/tests/e2e/fixtures/pom.ts | 156 ++++
.../ui/tests/e2e/fixtures/task-instances-data.ts | 112 +++
.../src/airflow/ui/tests/e2e/fixtures/xcom-data.ts | 79 ++
.../src/airflow/ui/tests/e2e/global-teardown.ts | 60 ++
.../src/airflow/ui/tests/e2e/utils/test-helpers.ts | 797 +++++++++++++++++++++
13 files changed, 1802 insertions(+)
diff --git a/airflow-core/src/airflow/ui/playwright.config.ts
b/airflow-core/src/airflow/ui/playwright.config.ts
index 96373d2a097..02075f841f7 100644
--- a/airflow-core/src/airflow/ui/playwright.config.ts
+++ b/airflow-core/src/airflow/ui/playwright.config.ts
@@ -55,6 +55,7 @@ export default defineConfig({
forbidOnly: process.env.CI !== undefined && process.env.CI !== "",
fullyParallel: true,
globalSetup: "./tests/e2e/global-setup.ts",
+ globalTeardown: "./tests/e2e/global-teardown.ts",
projects: [
{
name: "chromium",
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
new file mode 100644
index 00000000000..79b4e993c67
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
@@ -0,0 +1,66 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Asset data fixture — triggers asset_produces_1 DAG and waits for success.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+import { test as base } from "tests/e2e/fixtures";
+import {
+ safeCleanupDagRun,
+ apiTriggerDagRun,
+ waitForDagReady,
+ waitForDagRunStatus,
+} from "tests/e2e/utils/test-helpers";
+
+export type AssetData = {
+ dagId: string;
+};
+
+export const test = base.extend<Record<never, never>, { assetData: AssetData
}>({
+ assetData: [
+ async ({ authenticatedRequest }, use, _workerInfo) => {
+ const dagId = "asset_produces_1";
+ let createdRunId: string | undefined;
+
+ try {
+ await waitForDagReady(authenticatedRequest, dagId);
+ await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, { data: {
is_paused: false } });
+ const { dagRunId } = await apiTriggerDagRun(authenticatedRequest,
dagId);
+
+ createdRunId = dagRunId;
+
+ await waitForDagRunStatus(authenticatedRequest, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ timeout: 120_000,
+ });
+
+ await use({ dagId });
+ } finally {
+ if (createdRunId !== undefined) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, createdRunId);
+ }
+ }
+ },
+ { scope: "worker", timeout: 180_000 },
+ ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
new file mode 100644
index 00000000000..c160958aec1
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
@@ -0,0 +1,69 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Audit log data fixture — triggers DAG runs to generate audit log entries.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+ safeCleanupDagRun,
+ apiTriggerDagRun,
+ waitForDagReady,
+ waitForDagRunStatus,
+} from "tests/e2e/utils/test-helpers";
+
+export type AuditLogData = {
+ dagId: string;
+};
+
+export const test = base.extend<Record<never, never>, { auditLogData:
AuditLogData }>({
+ auditLogData: [
+ async ({ authenticatedRequest }, use, _workerInfo) => {
+ const dagId = testConfig.testDag.id;
+ const createdRunIds: Array<string> = [];
+
+ try {
+ await waitForDagReady(authenticatedRequest, dagId);
+ await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, { data: {
is_paused: false } });
+
+ for (let i = 0; i < 3; i++) {
+ const { dagRunId } = await apiTriggerDagRun(authenticatedRequest,
dagId);
+
+ createdRunIds.push(dagRunId);
+ await waitForDagRunStatus(authenticatedRequest, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ timeout: 60_000,
+ });
+ }
+
+ await use({ dagId });
+ } finally {
+ for (const runId of createdRunIds) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+ }
+ }
+ },
+ { scope: "worker", timeout: 180_000 },
+ ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
new file mode 100644
index 00000000000..080f46be2f0
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
@@ -0,0 +1,89 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Calendar data fixture — creates success + failed DAG runs for calendar
tests.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+import dayjs from "dayjs";
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+ apiCreateDagRun,
+ safeCleanupDagRun,
+ apiSetDagRunState,
+ uniqueRunId,
+ waitForDagReady,
+} from "tests/e2e/utils/test-helpers";
+
+export type CalendarRunsData = {
+ dagId: string;
+};
+
+export const test = base.extend<Record<never, never>, { calendarRunsData:
CalendarRunsData }>({
+ calendarRunsData: [
+ async ({ authenticatedRequest }, use, workerInfo) => {
+ const dagId = testConfig.testDag.id;
+ const createdRunIds: Array<string> = [];
+
+ await waitForDagReady(authenticatedRequest, dagId);
+
+ const now = dayjs();
+ const yesterday = now.subtract(1, "day");
+ const baseDate = yesterday.isSame(now, "month") ? yesterday : now;
+
+ const workerHourOffset = workerInfo.parallelIndex * 2;
+ const successIso = baseDate
+ .startOf("day")
+ .hour(2 + workerHourOffset)
+ .toISOString();
+ const failedIso = baseDate
+ .startOf("day")
+ .hour(3 + workerHourOffset)
+ .toISOString();
+
+ const successRunId = uniqueRunId("cal_success");
+ const failedRunId = uniqueRunId("cal_failed");
+
+ try {
+ await apiCreateDagRun(authenticatedRequest, dagId, {
+ dag_run_id: successRunId,
+ logical_date: successIso,
+ });
+ createdRunIds.push(successRunId);
+ await apiSetDagRunState(authenticatedRequest, { dagId, runId:
successRunId, state: "success" });
+
+ await apiCreateDagRun(authenticatedRequest, dagId, {
+ dag_run_id: failedRunId,
+ logical_date: failedIso,
+ });
+ createdRunIds.push(failedRunId);
+ await apiSetDagRunState(authenticatedRequest, { dagId, runId:
failedRunId, state: "failed" });
+
+ await use({ dagId });
+ } finally {
+ for (const runId of createdRunIds) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+ }
+ }
+ },
+ { scope: "worker", timeout: 180_000 },
+ ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
new file mode 100644
index 00000000000..3c07bbefd4d
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
@@ -0,0 +1,91 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * DAG Runs page data fixture — creates runs across two DAGs for filtering
tests.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+ apiCreateDagRun,
+ safeCleanupDagRun,
+ apiSetDagRunState,
+ uniqueRunId,
+ waitForDagReady,
+} from "tests/e2e/utils/test-helpers";
+
+export type DagRunsPageData = {
+ dag1Id: string;
+ dag2Id: string;
+};
+
+export const test = base.extend<Record<never, never>, { dagRunsPageData:
DagRunsPageData }>({
+ dagRunsPageData: [
+ async ({ authenticatedRequest }, use, workerInfo) => {
+ const dag1Id = testConfig.testDag.id;
+ const dag2Id = "example_python_operator";
+ const createdRuns: Array<{ dagId: string; runId: string }> = [];
+
+ try {
+ await Promise.all([
+ waitForDagReady(authenticatedRequest, dag1Id),
+ waitForDagReady(authenticatedRequest, dag2Id),
+ ]);
+
+ const baseOffset = workerInfo.parallelIndex * 7_200_000;
+ const timestamp = Date.now() - baseOffset;
+
+ const runId1 = uniqueRunId("dagrun_failed");
+
+ await apiCreateDagRun(authenticatedRequest, dag1Id, {
+ dag_run_id: runId1,
+ logical_date: new Date(timestamp).toISOString(),
+ });
+ createdRuns.push({ dagId: dag1Id, runId: runId1 });
+ await apiSetDagRunState(authenticatedRequest, { dagId: dag1Id, runId:
runId1, state: "failed" });
+
+ const runId2 = uniqueRunId("dagrun_success");
+
+ await apiCreateDagRun(authenticatedRequest, dag1Id, {
+ dag_run_id: runId2,
+ logical_date: new Date(timestamp + 60_000).toISOString(),
+ });
+ createdRuns.push({ dagId: dag1Id, runId: runId2 });
+ await apiSetDagRunState(authenticatedRequest, { dagId: dag1Id, runId:
runId2, state: "success" });
+
+ const runId3 = uniqueRunId("dagrun_other");
+
+ await apiCreateDagRun(authenticatedRequest, dag2Id, {
+ dag_run_id: runId3,
+ logical_date: new Date(timestamp + 120_000).toISOString(),
+ });
+ createdRuns.push({ dagId: dag2Id, runId: runId3 });
+
+ await use({ dag1Id, dag2Id });
+ } finally {
+ for (const { dagId, runId } of createdRuns) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+ }
+ }
+ },
+ { scope: "worker", timeout: 120_000 },
+ ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
new file mode 100644
index 00000000000..f4431a2984e
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
@@ -0,0 +1,42 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Dashboard data fixture — tracks UI-triggered DAG runs for cleanup.
+ */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import { safeCleanupDagRun } from "tests/e2e/utils/test-helpers";
+
+export type DagRunCleanup = {
+ track: (runId: string) => void;
+};
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+export const test = base.extend<{ dagRunCleanup: DagRunCleanup }>({
+ dagRunCleanup: async ({ authenticatedRequest }, use) => {
+ const trackedRunIds: Array<string> = [];
+
+ await use({ track: (runId: string) => trackedRunIds.push(runId) });
+
+ for (const runId of trackedRunIds) {
+ await safeCleanupDagRun(authenticatedRequest, testConfig.testDag.id,
runId);
+ }
+ },
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
new file mode 100644
index 00000000000..ad1e6d8b7b6
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
@@ -0,0 +1,210 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Data-isolation fixtures for E2E tests.
+ *
+ * Extends the POM fixtures with worker-scoped and test-scoped data
+ * fixtures that create API resources and guarantee cleanup — even on
+ * test failure, timeout, or retry.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+import type { APIRequestContext } from "@playwright/test";
+
+import { testConfig } from "../../../playwright.config";
+import {
+ apiCreateDagRun,
+ apiSetDagRunState,
+ apiTriggerDagRun,
+ safeCleanupDagRun,
+ uniqueRunId,
+ waitForDagReady,
+ waitForDagRunStatus,
+} from "../utils/test-helpers";
+import { test as base } from "./pom";
+
+/** Shape returned by single DAG run fixtures. */
+export type DagRunFixtureData = {
+ dagId: string;
+ logicalDate: string;
+ runId: string;
+};
+
+/** Shape returned by the successAndFailedRuns fixture. */
+export type SuccessAndFailedRunsData = {
+ dagId: string;
+ failedRun: DagRunFixtureData;
+ successRun: DagRunFixtureData;
+};
+
+export type DataWorkerFixtures = {
+ /** Ensures the default test DAG is parsed and ready. Worker-scoped, no
cleanup needed. */
+ dagReady: string;
+ /** A DAG run triggered via scheduler and completed. Worker-scoped with
auto-cleanup. */
+ executedDagRun: DagRunFixtureData;
+ /** Two DAG runs: one success, one failed. Worker-scoped with auto-cleanup.
*/
+ successAndFailedRuns: SuccessAndFailedRunsData;
+ /** A DAG run in "success" state (API-only, no scheduler). Worker-scoped
with auto-cleanup. */
+ successDagRun: DagRunFixtureData;
+};
+
+export type DataTestFixtures = Record<never, never>;
+
+async function createAndSetupDagRun(
+ request: APIRequestContext,
+ dagId: string,
+ options: {
+ logicalDate?: string;
+ parallelIndex: number;
+ prefix: string;
+ state: "failed" | "queued" | "success";
+ },
+): Promise<DagRunFixtureData> {
+ await waitForDagReady(request, dagId);
+
+ const runId = uniqueRunId(`${options.prefix}_w${options.parallelIndex}`);
+
+ // Offset logical_date by 2 hours per worker to avoid collisions.
+ const offsetMs = options.parallelIndex * 7_200_000;
+ const logicalDate = options.logicalDate ?? new Date(Date.now() -
offsetMs).toISOString();
+
+ await apiCreateDagRun(request, dagId, {
+ dag_run_id: runId,
+ logical_date: logicalDate,
+ });
+ await apiSetDagRunState(request, { dagId, runId, state: options.state });
+
+ return { dagId, logicalDate, runId };
+}
+
+async function cleanupMultipleRuns(
+ request: APIRequestContext,
+ runs: Array<{ dagId: string; runId: string }>,
+): Promise<void> {
+ for (const { dagId, runId } of runs) {
+ await safeCleanupDagRun(request, dagId, runId);
+ }
+}
+
+export const test = base.extend<DataTestFixtures, DataWorkerFixtures>({
+ dagReady: [
+ async ({ authenticatedRequest }, use) => {
+ const dagId = testConfig.testDag.id;
+
+ await waitForDagReady(authenticatedRequest, dagId);
+ await use(dagId);
+ },
+ { scope: "worker", timeout: 120_000 },
+ ],
+
+ executedDagRun: [
+ async ({ authenticatedRequest }, use) => {
+ const dagId = testConfig.testDag.id;
+ let dagRunId: string | undefined;
+
+ try {
+ await waitForDagReady(authenticatedRequest, dagId);
+ await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, {
+ data: { is_paused: false },
+ });
+
+ const triggered = await apiTriggerDagRun(authenticatedRequest, dagId);
+
+ ({ dagRunId } = triggered);
+
+ await waitForDagRunStatus(authenticatedRequest, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ timeout: 120_000,
+ });
+
+ await use({ dagId, logicalDate: triggered.logicalDate, runId: dagRunId
});
+ } finally {
+ if (dagRunId !== undefined) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, dagRunId);
+ }
+ // Re-pause is handled by global-teardown.ts (globalTeardown in
playwright.config.ts).
+ }
+ },
+ { scope: "worker", timeout: 180_000 },
+ ],
+
+ successAndFailedRuns: [
+ async ({ authenticatedRequest }, use, workerInfo) => {
+ const dagId = testConfig.testDag.id;
+ const createdRuns: Array<{ dagId: string; runId: string }> = [];
+
+ try {
+ await waitForDagReady(authenticatedRequest, dagId);
+
+ const baseOffset = workerInfo.parallelIndex * 7_200_000;
+ const timestamp = Date.now() - baseOffset;
+
+ const successRun = await createAndSetupDagRun(authenticatedRequest,
dagId, {
+ logicalDate: new Date(timestamp).toISOString(),
+ parallelIndex: workerInfo.parallelIndex,
+ prefix: "sf_ok",
+ state: "success",
+ });
+
+ createdRuns.push({ dagId, runId: successRun.runId });
+
+ const failedRun = await createAndSetupDagRun(authenticatedRequest,
dagId, {
+ logicalDate: new Date(timestamp + 60_000).toISOString(),
+ parallelIndex: workerInfo.parallelIndex,
+ prefix: "sf_fail",
+ state: "failed",
+ });
+
+ createdRuns.push({ dagId, runId: failedRun.runId });
+
+ await use({ dagId, failedRun, successRun });
+ } finally {
+ await cleanupMultipleRuns(authenticatedRequest, createdRuns);
+ }
+ },
+ { scope: "worker", timeout: 120_000 },
+ ],
+
+ successDagRun: [
+ async ({ authenticatedRequest }, use, workerInfo) => {
+ const dagId = testConfig.testDag.id;
+ let createdRunId: string | undefined;
+
+ try {
+ const data = await createAndSetupDagRun(authenticatedRequest, dagId, {
+ parallelIndex: workerInfo.parallelIndex,
+ prefix: "run",
+ state: "success",
+ });
+
+ createdRunId = data.runId;
+
+ await use(data);
+ } finally {
+ if (createdRunId !== undefined) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, createdRunId);
+ }
+ }
+ },
+ { scope: "worker", timeout: 120_000 },
+ ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/index.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/index.ts
new file mode 100644
index 00000000000..bef87ad21a5
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/index.ts
@@ -0,0 +1,30 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Unified fixture entry point for E2E tests.
+ *
+ * Re-exports the fully extended `test` object (POM + data fixtures)
+ * so specs only need one import:
+ *
+ * import { expect, test } from "tests/e2e/fixtures";
+ */
+export { test } from "./data";
+export type { DagRunFixtureData, SuccessAndFailedRunsData } from "./data";
+export { expect } from "@playwright/test";
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/pom.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/pom.ts
new file mode 100644
index 00000000000..d561be27faa
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/pom.ts
@@ -0,0 +1,156 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Page Object Model fixtures for E2E tests.
+ *
+ * Provides POM instances and the worker-scoped authenticatedRequest
+ * as test fixtures, eliminating manual `beforeEach` boilerplate.
+ */
+import { test as base, type APIRequestContext } from "@playwright/test";
+
+import { AUTH_FILE, testConfig } from "../../../playwright.config";
+import { AssetDetailPage } from "../pages/AssetDetailPage";
+import { AssetListPage } from "../pages/AssetListPage";
+import { BackfillPage } from "../pages/BackfillPage";
+import { ConfigurationPage } from "../pages/ConfigurationPage";
+import { ConnectionsPage } from "../pages/ConnectionsPage";
+import { DagCalendarTab } from "../pages/DagCalendarTab";
+import { DagCodePage } from "../pages/DagCodePage";
+import { DagRunsPage } from "../pages/DagRunsPage";
+import { DagRunsTabPage } from "../pages/DagRunsTabPage";
+import { DagsPage } from "../pages/DagsPage";
+import { EventsPage } from "../pages/EventsPage";
+import { GridPage } from "../pages/GridPage";
+import { HomePage } from "../pages/HomePage";
+import { PluginsPage } from "../pages/PluginsPage";
+import { PoolsPage } from "../pages/PoolsPage";
+import { ProvidersPage } from "../pages/ProvidersPage";
+import { RequiredActionsPage } from "../pages/RequiredActionsPage";
+import { TaskInstancePage } from "../pages/TaskInstancePage";
+import { TaskInstancesPage } from "../pages/TaskInstancesPage";
+import { VariablePage } from "../pages/VariablePage";
+import { XComsPage } from "../pages/XComsPage";
+
+export type PomWorkerFixtures = {
+ authenticatedRequest: APIRequestContext;
+};
+
+export type PomFixtures = {
+ assetDetailPage: AssetDetailPage;
+ assetListPage: AssetListPage;
+ backfillPage: BackfillPage;
+ configurationPage: ConfigurationPage;
+ connectionsPage: ConnectionsPage;
+ dagCalendarTab: DagCalendarTab;
+ dagCodePage: DagCodePage;
+ dagRunsPage: DagRunsPage;
+ dagRunsTabPage: DagRunsTabPage;
+ dagsPage: DagsPage;
+ eventsPage: EventsPage;
+ gridPage: GridPage;
+ homePage: HomePage;
+ pluginsPage: PluginsPage;
+ poolsPage: PoolsPage;
+ providersPage: ProvidersPage;
+ requiredActionsPage: RequiredActionsPage;
+ taskInstancePage: TaskInstancePage;
+ taskInstancesPage: TaskInstancesPage;
+ variablePage: VariablePage;
+ xcomsPage: XComsPage;
+};
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+export const test = base.extend<PomFixtures, PomWorkerFixtures>({
+ assetDetailPage: async ({ page }, use) => {
+ await use(new AssetDetailPage(page));
+ },
+ assetListPage: async ({ page }, use) => {
+ await use(new AssetListPage(page));
+ },
+ authenticatedRequest: [
+ async ({ playwright }, use) => {
+ const ctx = await playwright.request.newContext({
+ baseURL: testConfig.connection.baseUrl,
+ storageState: AUTH_FILE,
+ });
+
+ await use(ctx);
+ await ctx.dispose();
+ },
+ { scope: "worker" },
+ ],
+ backfillPage: async ({ page }, use) => {
+ await use(new BackfillPage(page));
+ },
+ configurationPage: async ({ page }, use) => {
+ await use(new ConfigurationPage(page));
+ },
+ connectionsPage: async ({ page }, use) => {
+ await use(new ConnectionsPage(page));
+ },
+ dagCalendarTab: async ({ page }, use) => {
+ await use(new DagCalendarTab(page));
+ },
+ dagCodePage: async ({ page }, use) => {
+ await use(new DagCodePage(page));
+ },
+ dagRunsPage: async ({ page }, use) => {
+ await use(new DagRunsPage(page));
+ },
+ dagRunsTabPage: async ({ page }, use) => {
+ await use(new DagRunsTabPage(page));
+ },
+ dagsPage: async ({ page }, use) => {
+ await use(new DagsPage(page));
+ },
+ eventsPage: async ({ page }, use) => {
+ await use(new EventsPage(page));
+ },
+ gridPage: async ({ page }, use) => {
+ await use(new GridPage(page));
+ },
+ homePage: async ({ page }, use) => {
+ await use(new HomePage(page));
+ },
+ pluginsPage: async ({ page }, use) => {
+ await use(new PluginsPage(page));
+ },
+ poolsPage: async ({ page }, use) => {
+ await use(new PoolsPage(page));
+ },
+ providersPage: async ({ page }, use) => {
+ await use(new ProvidersPage(page));
+ },
+ requiredActionsPage: async ({ page }, use) => {
+ await use(new RequiredActionsPage(page));
+ },
+ taskInstancePage: async ({ page }, use) => {
+ await use(new TaskInstancePage(page));
+ },
+ taskInstancesPage: async ({ page }, use) => {
+ await use(new TaskInstancesPage(page));
+ },
+ variablePage: async ({ page }, use) => {
+ await use(new VariablePage(page));
+ },
+ xcomsPage: async ({ page }, use) => {
+ await use(new XComsPage(page));
+ },
+});
diff --git
a/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
new file mode 100644
index 00000000000..cde2f4e6aa2
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
@@ -0,0 +1,112 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Task instances data fixture — creates runs with success/failed task
instances.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+import { expect, type APIRequestContext } from "@playwright/test";
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+ apiCreateDagRun,
+ safeCleanupDagRun,
+ uniqueRunId,
+ waitForDagReady,
+} from "tests/e2e/utils/test-helpers";
+
+export type TaskInstancesData = {
+ dagId: string;
+};
+
+async function setAllTaskInstanceStates(
+ request: APIRequestContext,
+ options: { dagId: string; runId: string; state: string },
+): Promise<void> {
+ const { dagId, runId, state } = options;
+
+ const tasksResponse = await
request.get(`/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances`);
+
+ expect(tasksResponse.ok()).toBeTruthy();
+
+ const tasksData = (await tasksResponse.json()) as {
+ task_instances: Array<{ task_id: string }>;
+ };
+
+ for (const task of tasksData.task_instances) {
+ await expect
+ .poll(
+ async () => {
+ const resp = await request.patch(
+
`/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${task.task_id}`,
+ {
+ data: { new_state: state },
+ headers: { "Content-Type": "application/json" },
+ timeout: 30_000,
+ },
+ );
+
+ return resp.ok();
+ },
+ { intervals: [2000], timeout: 30_000 },
+ )
+ .toBe(true);
+ }
+}
+
+export const test = base.extend<Record<never, never>, { taskInstancesData:
TaskInstancesData }>({
+ taskInstancesData: [
+ async ({ authenticatedRequest }, use, workerInfo) => {
+ const dagId = testConfig.testDag.id;
+ const createdRunIds: Array<string> = [];
+ const baseOffset = workerInfo.parallelIndex * 7_200_000;
+ const timestamp = Date.now() - baseOffset;
+
+ try {
+ await waitForDagReady(authenticatedRequest, dagId);
+
+ const runId1 = uniqueRunId("ti_success");
+
+ await apiCreateDagRun(authenticatedRequest, dagId, {
+ dag_run_id: runId1,
+ logical_date: new Date(timestamp).toISOString(),
+ });
+ createdRunIds.push(runId1);
+ await setAllTaskInstanceStates(authenticatedRequest, { dagId, runId:
runId1, state: "success" });
+
+ const runId2 = uniqueRunId("ti_failed");
+
+ await apiCreateDagRun(authenticatedRequest, dagId, {
+ dag_run_id: runId2,
+ logical_date: new Date(timestamp + 60_000).toISOString(),
+ });
+ createdRunIds.push(runId2);
+ await setAllTaskInstanceStates(authenticatedRequest, { dagId, runId:
runId2, state: "failed" });
+
+ await use({ dagId });
+ } finally {
+ for (const runId of createdRunIds) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+ }
+ }
+ },
+ { scope: "worker", timeout: 120_000 },
+ ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
new file mode 100644
index 00000000000..81dab06f8d0
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
@@ -0,0 +1,79 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * XCom data fixture — triggers example_xcom DAG runs to generate XCom entries.
+ */
+
+/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
+import { testConfig } from "playwright.config";
+import { test as base } from "tests/e2e/fixtures";
+import {
+ apiCreateDagRun,
+ safeCleanupDagRun,
+ uniqueRunId,
+ waitForDagReady,
+ waitForDagRunStatus,
+} from "tests/e2e/utils/test-helpers";
+
+export type XcomRunsData = {
+ dagId: string;
+ xcomKey: string;
+};
+
+export const test = base.extend<Record<never, never>, { xcomRunsData:
XcomRunsData }>({
+ xcomRunsData: [
+ async ({ authenticatedRequest }, use, workerInfo) => {
+ const dagId = testConfig.xcomDag.id;
+ const createdRunIds: Array<string> = [];
+ const triggerCount = 2;
+ const baseOffset = workerInfo.parallelIndex * 7_200_000;
+
+ try {
+ await waitForDagReady(authenticatedRequest, dagId);
+ await authenticatedRequest.patch(`/api/v2/dags/${dagId}`, {
+ data: { is_paused: false },
+ });
+
+ for (let i = 0; i < triggerCount; i++) {
+ const runId = uniqueRunId(`xcom_run_${i}`);
+
+ await apiCreateDagRun(authenticatedRequest, dagId, {
+ dag_run_id: runId,
+ logical_date: new Date(Date.now() - baseOffset + i *
60_000).toISOString(),
+ });
+ createdRunIds.push(runId);
+ await waitForDagRunStatus(authenticatedRequest, {
+ dagId,
+ expectedState: "success",
+ runId,
+ timeout: 120_000,
+ });
+ }
+
+ await use({ dagId, xcomKey: "return_value" });
+ } finally {
+ for (const runId of createdRunIds) {
+ await safeCleanupDagRun(authenticatedRequest, dagId, runId);
+ }
+ }
+ },
+ { scope: "worker", timeout: 180_000 },
+ ],
+});
diff --git a/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
new file mode 100644
index 00000000000..aae6cc588aa
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
@@ -0,0 +1,60 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { request } from "@playwright/test";
+
+import { AUTH_FILE, testConfig } from "../../playwright.config";
+
+/**
+ * Re-pause all DAGs that E2E fixtures may have unpaused during the test run.
+ * Runs once after all workers have finished, preventing the scheduler from
+ * creating unbounded DAG runs in shared environments.
+ */
+async function globalTeardown() {
+ const baseURL = testConfig.connection.baseUrl;
+
+ const context = await request.newContext({
+ baseURL,
+ storageState: AUTH_FILE,
+ });
+
+ const dagIds = [
+ testConfig.testDag.id,
+ testConfig.testDag.hitlId,
+ testConfig.xcomDag.id,
+ "asset_produces_1",
+ ];
+
+ for (const dagId of dagIds) {
+ try {
+ await context.patch(`/api/v2/dags/${dagId}`, {
+ data: { is_paused: true },
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+ } catch (error) {
+ console.warn(
+ `[e2e teardown] Failed to re-pause DAG ${dagId}: ${error instanceof
Error ? error.message : String(error)}`,
+ );
+ }
+ }
+
+ await context.dispose();
+}
+
+export default globalTeardown;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
new file mode 100644
index 00000000000..3f651289567
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
@@ -0,0 +1,797 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Shared E2E test utilities for data isolation and API-based setup.
+ */
+import { expect, type APIRequestContext, type Locator, type Page } from
"@playwright/test";
+import { randomUUID } from "node:crypto";
+import { testConfig } from "playwright.config";
+
+type RequestLike = APIRequestContext | Page;
+
+type DagRunData = {
+ conf?: Record<string, unknown>;
+ dag_run_id: string;
+ logical_date: string;
+ note?: string;
+};
+
+type DagRunResponse = {
+ dag_run_id: string;
+ state: string;
+};
+
+function getRequestContext(source: RequestLike): APIRequestContext {
+ if ("request" in source) {
+ return source.request;
+ }
+
+ return source;
+}
+
+const { baseUrl } = testConfig.connection;
+
+/** Generate a unique run ID: `{prefix}_{uuid8}`. */
+export function uniqueRunId(prefix: string): string {
+ return `${prefix}_${randomUUID().slice(0, 8)}`;
+}
+
+/**
+ * Poll GET /api/v2/dags/{dagId} until 200 — waits for DAG to be parsed.
+ */
+export async function waitForDagReady(
+ source: RequestLike,
+ dagId: string,
+ options?: { timeout?: number },
+): Promise<void> {
+ const request = getRequestContext(source);
+
+ const timeout = options?.timeout ?? 120_000;
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 30_000 });
+
+ return response.ok();
+ } catch {
+ return false;
+ }
+ },
+ { intervals: [2000], timeout },
+ )
+ .toBe(true);
+}
+
+/**
+ * Trigger a DAG run with an auto-generated unique run ID.
+ * Returns the dagRunId and logicalDate for targeted polling.
+ */
+export async function apiTriggerDagRun(
+ source: RequestLike,
+ dagId: string,
+ options?: { runId?: string },
+): Promise<{ dagRunId: string; logicalDate: string }> {
+ const request = getRequestContext(source);
+
+ let dagRunId = options?.runId ?? uniqueRunId(dagId);
+ let resultLogicalDate = new Date().toISOString();
+
+ await expect(async () => {
+ // Generate a fresh logicalDate on each attempt so that a 409
+ // (logical_date collision from a parallel worker) is recoverable.
+ const logicalDate = new Date().toISOString();
+
+ const response = await
request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
+ data: {
+ dag_run_id: dagRunId,
+ logical_date: logicalDate,
+ note: "e2e test",
+ },
+ headers: { "Content-Type": "application/json" },
+ timeout: 30_000,
+ });
+
+ if (!response.ok()) {
+ // On 409, regenerate dag_run_id (unless caller pinned it) so the
+ // next attempt doesn't collide on either dag_run_id or logical_date.
+ if (response.status() === 409 && options?.runId === undefined) {
+ dagRunId = uniqueRunId(dagId);
+ }
+
+ throw new Error(`DAG run trigger failed (${response.status()})`);
+ }
+
+ const json = (await response.json()) as { logical_date?: string } &
DagRunResponse;
+
+ resultLogicalDate = json.logical_date ?? logicalDate;
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+
+ return { dagRunId, logicalDate: resultLogicalDate };
+}
+
+/**
+ * Create a DAG run via the API.
+ */
+export async function apiCreateDagRun(source: RequestLike, dagId: string,
data: DagRunData): Promise<string> {
+ const request = getRequestContext(source);
+ let resultRunId = data.dag_run_id;
+
+ // Track fallback values that are regenerated on 409 collisions,
+ // without mutating the caller's `data` parameter.
+ let retryRunId: string | undefined;
+ let retryLogicalDate: string | undefined;
+
+ await expect(async () => {
+ const runId = retryRunId ?? data.dag_run_id;
+ const logicalDate = retryLogicalDate ?? data.logical_date;
+
+ const response = await
request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
+ data: {
+ conf: data.conf ?? {},
+ dag_run_id: runId,
+ logical_date: logicalDate,
+ note: data.note ?? "e2e test",
+ },
+ headers: { "Content-Type": "application/json" },
+ timeout: 30_000,
+ });
+
+ if (!response.ok()) {
+ // On 409, generate fresh dag_run_id and logical_date for the next retry.
+ if (response.status() === 409) {
+ retryRunId = uniqueRunId(dagId);
+ retryLogicalDate = new Date().toISOString();
+ }
+
+ throw new Error(`DAG run creation failed (${response.status()})`);
+ }
+
+ const json = (await response.json()) as DagRunResponse;
+
+ resultRunId = json.dag_run_id;
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+
+ return resultRunId;
+}
+
+/**
+ * Set a DAG run's state via the API.
+ */
+export async function apiSetDagRunState(
+ source: RequestLike,
+ options: { dagId: string; runId: string; state: "failed" | "queued" |
"success" },
+): Promise<void> {
+ const { dagId, runId, state } = options;
+ const request = getRequestContext(source);
+
+ await expect(async () => {
+ const response = await
request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+ data: { state },
+ headers: { "Content-Type": "application/json" },
+ timeout: 30_000,
+ });
+
+ if (response.status() !== 409 && !response.ok()) {
+ throw new Error(`Set DAG run state failed (${response.status()})`);
+ }
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+}
+
+/**
+ * Poll the API until the DAG run reaches the expected state.
+ */
+export async function waitForDagRunStatus(
+ source: RequestLike,
+ options: { dagId: string; expectedState: string; runId: string; timeout?:
number },
+): Promise<void> {
+ const { dagId, expectedState, runId } = options;
+ const request = getRequestContext(source);
+
+ const timeout = options.timeout ?? 120_000;
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+ timeout: 30_000,
+ });
+
+ if (!response.ok()) {
+ return `unknown (HTTP ${response.status()})`;
+ }
+
+ const data = (await response.json()) as DagRunResponse;
+
+ if (data.state === "failed" && expectedState !== "failed") {
+ throw new Error(`DAG run ${runId} failed unexpectedly`);
+ }
+
+ return data.state;
+ } catch (error) {
+ // Re-throw intentional failures (unexpected "failed" state).
+ if (error instanceof Error && error.message.includes("failed
unexpectedly")) {
+ throw error;
+ }
+
+ // Transient network/timeout errors — retry on next interval.
+ return `unknown (${error instanceof Error ? error.message : "network
error"})`;
+ }
+ },
+ {
+ intervals: [5000],
+ message: `DAG run ${runId} did not reach state "${expectedState}"
within ${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBe(expectedState);
+}
+
+/**
+ * Poll the API until a task instance reaches the expected state.
+ * Unlike waitForDagRunStatus, this targets a specific task within a DAG run.
+ */
+export async function waitForTaskInstanceState(
+ source: RequestLike,
+ options: {
+ dagId: string;
+ expectedState: string;
+ runId: string;
+ taskId: string;
+ timeout?: number;
+ },
+): Promise<void> {
+ const { dagId, expectedState, runId, taskId } = options;
+ const request = getRequestContext(source);
+
+ const timeout = options.timeout ?? 120_000;
+ const terminalStates = new Set(["success", "failed", "skipped", "removed",
"upstream_failed"]);
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await request.get(
+
`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}`,
+ { timeout: 30_000 },
+ );
+
+ if (!response.ok()) {
+ return "unknown";
+ }
+
+ const data = (await response.json()) as { state: string };
+ const { state } = data;
+ const expected = expectedState.toLowerCase();
+
+ if (state !== expected && terminalStates.has(state)) {
+ throw new Error(`Task ${taskId} reached terminal state "${state}"
instead of "${expected}"`);
+ }
+
+ return state;
+ } catch (error) {
+ if (error instanceof Error && error.message.includes("terminal
state")) {
+ throw error;
+ }
+
+ return "unknown";
+ }
+ },
+ {
+ intervals: [3000, 5000],
+ message: `Task ${taskId} did not reach state "${expectedState}" within
${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBe(expectedState.toLowerCase());
+}
+
+/**
+ * Respond to a HITL (Human-in-the-Loop) task via the API.
+ * 409 is treated as success (already responded).
+ */
+export async function apiRespondToHITL(
+ source: RequestLike,
+ options: {
+ chosenOptions: Array<string>;
+ dagId: string;
+ mapIndex?: number;
+ paramsInput?: Record<string, unknown>;
+ runId: string;
+ taskId: string;
+ },
+): Promise<void> {
+ const { chosenOptions, dagId, runId, taskId } = options;
+ const mapIndex = options.mapIndex ?? -1;
+ const paramsInput = options.paramsInput ?? {};
+ const request = getRequestContext(source);
+
+ await expect(async () => {
+ const response = await request.patch(
+
`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}/${mapIndex}/hitlDetails`,
+ {
+ data: { chosen_options: chosenOptions, params_input: paramsInput },
+ headers: { "Content-Type": "application/json" },
+ timeout: 30_000,
+ },
+ );
+
+ // 409 = already responded; acceptable.
+ if (response.status() !== 409 && !response.ok()) {
+ throw new Error(`HITL response failed (${response.status()})`);
+ }
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+}
+
+/**
+ * Run the full HITL flow entirely via API — no browser needed.
+ *
+ * The example_hitl_operator DAG has 4 parallel HITL tasks, then an approval
+ * task, then a branch task. This function triggers the DAG, responds to each
+ * task via the API, and waits for the DAG run to complete.
+ */
+export async function setupHITLFlowViaAPI(
+ source: RequestLike,
+ dagId: string,
+ approve: boolean,
+): Promise<string> {
+ const request = getRequestContext(source);
+
+ await waitForDagReady(request, dagId);
+ await request.patch(`${baseUrl}/api/v2/dags/${dagId}`, { data: { is_paused:
false } });
+
+ const { dagRunId } = await apiTriggerDagRun(request, dagId);
+
+ // wait_for_default_option auto-resolves (1s timeout, defaults=["option 7"]).
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ taskId: "wait_for_default_option",
+ });
+
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "deferred",
+ runId: dagRunId,
+ taskId: "wait_for_input",
+ });
+ await apiRespondToHITL(request, {
+ chosenOptions: ["OK"],
+ dagId,
+ paramsInput: { information: "Approved by test" },
+ runId: dagRunId,
+ taskId: "wait_for_input",
+ });
+
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "deferred",
+ runId: dagRunId,
+ taskId: "wait_for_option",
+ });
+ await apiRespondToHITL(request, {
+ chosenOptions: ["option 1"],
+ dagId,
+ runId: dagRunId,
+ taskId: "wait_for_option",
+ });
+
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "deferred",
+ runId: dagRunId,
+ taskId: "wait_for_multiple_options",
+ });
+ await apiRespondToHITL(request, {
+ chosenOptions: ["option 4", "option 5"],
+ dagId,
+ runId: dagRunId,
+ taskId: "wait_for_multiple_options",
+ });
+
+ // Wait for all parallel tasks to succeed before the approval task starts.
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ taskId: "wait_for_input",
+ });
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ taskId: "wait_for_option",
+ });
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ taskId: "wait_for_multiple_options",
+ });
+
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "deferred",
+ runId: dagRunId,
+ taskId: "valid_input_and_options",
+ });
+ await apiRespondToHITL(request, {
+ chosenOptions: [approve ? "Approve" : "Reject"],
+ dagId,
+ runId: dagRunId,
+ taskId: "valid_input_and_options",
+ });
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ taskId: "valid_input_and_options",
+ });
+
+ if (approve) {
+ await waitForTaskInstanceState(request, {
+ dagId,
+ expectedState: "deferred",
+ runId: dagRunId,
+ taskId: "choose_a_branch_to_run",
+ });
+ await apiRespondToHITL(request, {
+ chosenOptions: ["task_1"],
+ dagId,
+ runId: dagRunId,
+ taskId: "choose_a_branch_to_run",
+ });
+ }
+
+ await waitForDagRunStatus(request, {
+ dagId,
+ expectedState: "success",
+ runId: dagRunId,
+ timeout: 120_000,
+ });
+
+ return dagRunId;
+}
+
+/** Delete a DAG run via the API. 404 is treated as success. */
+export async function apiDeleteDagRun(source: RequestLike, dagId: string,
runId: string): Promise<void> {
+ const request = getRequestContext(source);
+
+ const response = await
request.delete(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+ timeout: 30_000,
+ });
+
+ // 404 = already deleted by another worker or cleanup; acceptable.
+ if (response.status() === 404) {
+ return;
+ }
+
+ if (!response.ok()) {
+ const body = await response.text();
+
+ throw new Error(`DAG run deletion failed (${response.status()}): ${body}`);
+ }
+}
+
+/**
+ * Delete a DAG run, logging (not throwing) unexpected errors.
+ * Use this in fixture teardown where cleanup must not abort the loop.
+ * 404 is already handled inside `apiDeleteDagRun`.
+ * 409 (running state) is handled by force-failing the run first, then
retrying.
+ */
+export async function safeCleanupDagRun(source: RequestLike, dagId: string,
runId: string): Promise<void> {
+ try {
+ await apiDeleteDagRun(source, dagId, runId);
+ } catch (error) {
+ const message = error instanceof Error ? error.message : String(error);
+
+ // 409 = DAG run is still running — force-fail, then retry deletion.
+ if (message.includes("409")) {
+ try {
+ await apiSetDagRunState(source, { dagId, runId, state: "failed" });
+ await apiDeleteDagRun(source, dagId, runId);
+ } catch (retryError) {
+ console.warn(
+ `[e2e cleanup] Retry failed for DAG run ${dagId}/${runId}:
${retryError instanceof Error ? retryError.message : String(retryError)}`,
+ );
+ }
+
+ return;
+ }
+
+ console.warn(`[e2e cleanup] Failed to delete DAG run ${dagId}/${runId}:
${message}`);
+ }
+}
+
+/** Create a variable via the API. 409 is treated as success. */
+export async function apiCreateVariable(
+ source: RequestLike,
+ options: { description?: string; key: string; value: string },
+): Promise<void> {
+ const { description, key, value } = options;
+ const request = getRequestContext(source);
+
+ await expect(async () => {
+ const response = await request.post(`${baseUrl}/api/v2/variables`, {
+ data: { description: description ?? "", key, value },
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+
+ if (response.status() !== 409 && !response.ok()) {
+ throw new Error(`Variable creation failed (${response.status()})`);
+ }
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 90_000 });
+}
+
+/** Delete a variable via the API. 404 is treated as success. */
+export async function apiDeleteVariable(source: RequestLike, key: string):
Promise<void> {
+ const request = getRequestContext(source);
+
+ const response = await
request.delete(`${baseUrl}/api/v2/variables/${encodeURIComponent(key)}`, {
+ timeout: 30_000,
+ });
+
+ // 404 = already deleted by another worker or cleanup; acceptable.
+ if (response.status() === 404) {
+ return;
+ }
+
+ if (!response.ok()) {
+ const body = await response.text();
+
+ throw new Error(`Variable deletion failed (${response.status()}):
${body}`);
+ }
+}
+
+/** Cancel a single backfill via the API. 409 (already completed) is treated
as success. */
+export async function apiCancelBackfill(source: RequestLike, backfillId:
number): Promise<void> {
+ const request = getRequestContext(source);
+
+ const response = await
request.put(`${baseUrl}/api/v2/backfills/${backfillId}/cancel`, {
+ timeout: 30_000,
+ });
+
+ if (response.status() !== 200 && response.status() !== 409) {
+ throw new Error(`Cancel backfill failed (${response.status()})`);
+ }
+}
+
+/** Cancel all active (non-completed) backfills for a DAG. */
+export async function apiCancelAllActiveBackfills(source: RequestLike, dagId:
string): Promise<void> {
+ const request = getRequestContext(source);
+
+ const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
+ timeout: 30_000,
+ });
+
+ if (!response.ok()) {
+ throw new Error(`List backfills failed (${response.status()})`);
+ }
+
+ const data = (await response.json()) as { backfills: Array<{ completed_at:
string | null; id: number }> };
+
+ for (const backfill of data.backfills) {
+ if (backfill.completed_at === null) {
+ await apiCancelBackfill(source, backfill.id);
+ }
+ }
+}
+
+/** Poll until all backfills for a DAG are completed. */
+export async function apiWaitForNoActiveBackfill(
+ source: RequestLike,
+ dagId: string,
+ timeout: number = 120_000,
+): Promise<void> {
+ const request = getRequestContext(source);
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
+ timeout: 30_000,
+ });
+
+ if (!response.ok()) {
+ return false;
+ }
+
+ const data = (await response.json()) as {
+ backfills: Array<{ completed_at: string | null }>;
+ };
+
+ return data.backfills.every((b) => b.completed_at !== null);
+ } catch {
+ return false;
+ }
+ },
+ {
+ intervals: [2000, 5000, 10_000],
+ message: `Active backfills for DAG ${dagId} did not clear within
${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBeTruthy();
+}
+
+/** Poll until a backfill reaches completed state. */
+export async function apiWaitForBackfillComplete(
+ source: RequestLike,
+ backfillId: number,
+ timeout: number = 120_000,
+): Promise<void> {
+ const request = getRequestContext(source);
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/backfills/${backfillId}`, {
+ timeout: 30_000,
+ });
+
+ if (!response.ok()) {
+ return false;
+ }
+
+ const data = (await response.json()) as { completed_at: string |
null };
+
+ return data.completed_at !== null;
+ } catch {
+ return false;
+ }
+ },
+ {
+ intervals: [2000, 5000, 10_000],
+ message: `Backfill ${backfillId} did not complete within ${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBeTruthy();
+}
+
+/** Create a backfill via the API. On 409, cancels active backfills and
retries once. */
+export async function apiCreateBackfill(
+ source: RequestLike,
+ dagId: string,
+ options: {
+ fromDate: string;
+ maxActiveRuns?: number;
+ reprocessBehavior?: string;
+ toDate: string;
+ },
+): Promise<number> {
+ const request = getRequestContext(source);
+ const { fromDate, maxActiveRuns, reprocessBehavior = "none", toDate } =
options;
+
+ const body: Record<string, unknown> = {
+ dag_id: dagId,
+ from_date: fromDate,
+ reprocess_behavior: reprocessBehavior,
+ to_date: toDate,
+ };
+
+ if (maxActiveRuns !== undefined) {
+ body.max_active_runs = maxActiveRuns;
+ }
+
+ const response = await request.post(`${baseUrl}/api/v2/backfills`, {
+ data: body,
+ headers: { "Content-Type": "application/json" },
+ timeout: 30_000,
+ });
+
+ if (response.status() === 409) {
+ await apiCancelAllActiveBackfills(source, dagId);
+ await apiWaitForNoActiveBackfill(source, dagId, 30_000);
+
+ const retryResponse = await request.post(`${baseUrl}/api/v2/backfills`, {
+ data: body,
+ headers: { "Content-Type": "application/json" },
+ timeout: 30_000,
+ });
+
+ if (!retryResponse.ok()) {
+ throw new Error(`Backfill creation retry failed
(${retryResponse.status()})`);
+ }
+
+ return ((await retryResponse.json()) as { id: number }).id;
+ }
+
+ if (!response.ok()) {
+ throw new Error(`Backfill creation failed (${response.status()})`);
+ }
+
+ return ((await response.json()) as { id: number }).id;
+}
+
+/**
+ * Wait for a table (by testId) to load and show at least one row or an empty
message.
+ */
+export async function waitForTableLoad(
+ page: Page,
+ options?: { checkSkeletons?: boolean; testId?: string; timeout?: number },
+): Promise<void> {
+ const testId = options?.testId ?? "table-list";
+ const timeout = options?.timeout ?? 30_000;
+
+ const table = page.getByTestId(testId);
+
+ await expect(table).toBeVisible({ timeout });
+
+ // Skip skeleton check by default — XComs uses [data-scope="skeleton"] for
+ // lazy-loaded cell content, not table loading, which would cause false
waits.
+ if (options?.checkSkeletons) {
+ await expect(table.locator('[data-testid="skeleton"],
[data-scope="skeleton"]')).toHaveCount(0, {
+ timeout,
+ });
+ }
+
+ const firstRow = table.locator("tbody tr").first();
+ const emptyMessage = page.getByText(/no .* found/i);
+
+ await expect(firstRow.or(emptyMessage)).toBeVisible({ timeout });
+}
+
+/**
+ * Wait for DOM row count to stabilize across consecutive measurements.
+ * Uses 500ms intervals to give React concurrent rendering time to settle.
+ */
+export async function waitForStableRowCount(
+ rowLocator: Locator,
+ options?: { timeout?: number },
+): Promise<number> {
+ const timeout = options?.timeout ?? 10_000;
+ const requiredStableChecks = 2;
+ let lastStableCount = 0;
+
+ await expect
+ .poll(
+ async () => {
+ const counts: Array<number> = [];
+
+ for (let i = 0; i < requiredStableChecks + 1; i++) {
+ counts.push(await rowLocator.count());
+
+ if (i < requiredStableChecks) {
+ await new Promise((resolve) => setTimeout(resolve, 500));
+ }
+ }
+
+ const first = counts[0] ?? 0;
+ const allSame = counts.length > 0 && first > 0 && counts.every((c) =>
c === first);
+
+ if (allSame) {
+ lastStableCount = first;
+ }
+
+ return allSame;
+ },
+ { intervals: [500], timeout },
+ )
+ .toBe(true);
+
+ return lastStableCount;
+}