This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ed51aff0562 Add DagRunType for operator (#63733)
ed51aff0562 is described below
commit ed51aff0562556fa1dd7d1ff6152018e70db03c2
Author: Henry Chen <[email protected]>
AuthorDate: Fri Apr 3 04:31:50 2026 +0800
Add DagRunType for operator (#63733)
* Execution API: Enforce OPERATOR run type for operator-triggered DAG runs
* Name the operator run type operator_triggered
* Fix trigger_dag syntax error after rebase
* Address unrelated changes
---
.../api_fastapi/core_api/openapi/_private_ui.yaml | 1 +
.../core_api/openapi/v2-rest-api-generated.yaml | 1 +
.../api_fastapi/execution_api/routes/dag_runs.py | 7 ++---
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 2 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 2 +-
.../src/airflow/ui/src/components/RunTypeIcon.tsx | 3 ++
airflow-core/src/airflow/utils/types.py | 1 +
.../tests/unit/api/common/test_trigger_dag.py | 19 +++++++++++++
.../execution_api/versions/head/test_dag_runs.py | 32 ++++++++++++++++++++--
.../src/airflowctl/api/datamodels/generated.py | 1 +
.../src/airflow/sdk/api/datamodels/_generated.py | 1 +
11 files changed, 62 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 0e58f9cba26..0ae06e44046 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -2191,6 +2191,7 @@ components:
- backfill
- scheduled
- manual
+ - operator_triggered
- asset_triggered
- asset_materialization
title: DagRunType
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 6a8b2a3d7e1..ace51229702 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11175,6 +11175,7 @@ components:
- backfill
- scheduled
- manual
+ - operator_triggered
- asset_triggered
- asset_materialization
title: DagRunType
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 69a06eca9b5..448d0b6945a 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -112,14 +112,12 @@ def trigger_dag_run(
},
)
- # TODO: TriggerDagRunOperator also calls this route but creates MANUAL runs.
- # Consider a dedicated run type for operator-triggered runs.
- if dm.allowed_run_types is not None and DagRunType.MANUAL not in dm.allowed_run_types:
+ if dm.allowed_run_types is not None and DagRunType.OPERATOR_TRIGGERED not in dm.allowed_run_types:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
detail={
"reason": "denied_run_type",
- "message": f"Dag with dag_id '{dag_id}' does not allow manual runs",
+ "message": f"Dag with dag_id '{dag_id}' does not allow operator-triggered runs",
},
)
@@ -127,6 +125,7 @@ def trigger_dag_run(
trigger_dag(
dag_id=dag_id,
run_id=run_id,
+ run_type=DagRunType.OPERATOR_TRIGGERED,
conf=payload.conf,
logical_date=payload.logical_date,
triggered_by=DagRunTriggeredByType.OPERATOR,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index cbc0ecaa172..c37bb840017 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3323,7 +3323,7 @@ export const $DagRunTriggeredByType = {
export const $DagRunType = {
type: 'string',
- enum: ['backfill', 'scheduled', 'manual', 'asset_triggered', 'asset_materialization'],
+ enum: ['backfill', 'scheduled', 'manual', 'operator_triggered', 'asset_triggered', 'asset_materialization'],
title: 'DagRunType',
description: 'Class with DagRun types.'
} as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 2567a22bacc..9a6c9fb7935 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -818,7 +818,7 @@ export type DagRunTriggeredByType = 'cli' | 'operator' | 'rest_api' | 'ui' | 'te
/**
* Class with DagRun types.
*/
-export type DagRunType = 'backfill' | 'scheduled' | 'manual' | 'asset_triggered' | 'asset_materialization';
+export type DagRunType = 'backfill' | 'scheduled' | 'manual' | 'operator_triggered' | 'asset_triggered' | 'asset_materialization';
/**
* DAG schedule reference serializer for assets.
diff --git a/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx b/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx
index 1382a55b549..9eef6066a0b 100644
--- a/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx
+++ b/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx
@@ -18,6 +18,7 @@
*/
import type { IconBaseProps } from "react-icons";
import { HiDatabase } from "react-icons/hi";
+import { HiLightningBolt } from "react-icons/hi";
import { MdPlayArrow, MdOutlineSchedule } from "react-icons/md";
import { RiArrowGoBackFill } from "react-icons/ri";
@@ -41,6 +42,8 @@ export const RunTypeIcon = ({ runType, ...rest }: Props) => {
return <RiArrowGoBackFill style={iconStyle} {...rest} />;
case "manual":
return <MdPlayArrow style={iconStyle} {...rest} />;
+ case "operator_triggered":
+ return <HiLightningBolt style={iconStyle} {...rest} />;
case "scheduled":
return <MdOutlineSchedule style={iconStyle} {...rest} />;
default:
diff --git a/airflow-core/src/airflow/utils/types.py b/airflow-core/src/airflow/utils/types.py
index 391fbe5f71c..46fb18339e1 100644
--- a/airflow-core/src/airflow/utils/types.py
+++ b/airflow-core/src/airflow/utils/types.py
@@ -28,6 +28,7 @@ class DagRunType(str, enum.Enum):
BACKFILL_JOB = "backfill"
SCHEDULED = "scheduled"
MANUAL = "manual"
+ OPERATOR_TRIGGERED = "operator_triggered"
ASSET_TRIGGERED = "asset_triggered"
ASSET_MATERIALIZATION = "asset_materialization"
diff --git a/airflow-core/tests/unit/api/common/test_trigger_dag.py b/airflow-core/tests/unit/api/common/test_trigger_dag.py
index b94b42e95af..49a385865f4 100644
--- a/airflow-core/tests/unit/api/common/test_trigger_dag.py
+++ b/airflow-core/tests/unit/api/common/test_trigger_dag.py
@@ -81,3 +81,22 @@ def test_trigger_dag_with_custom_run_type(dag_maker, session):
)
assert dag_run.run_type == DagRunType.ASSET_MATERIALIZATION
+
+
+def test_trigger_dag_operator_denied_when_only_manual_allowed(dag_maker, session):
+ with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *"):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+ dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == "TEST_DAG_1"))
+ dag_model.allowed_run_types = ["manual"]
+ session.commit()
+
+ with pytest.raises(
+ ValueError, match="Dag with dag_id: 'TEST_DAG_1' does not allow operator_triggered runs"
+ ):
+ trigger_dag(
+ dag_id="TEST_DAG_1",
+ run_type=DagRunType.OPERATOR_TRIGGERED,
+ triggered_by=DagRunTriggeredByType.OPERATOR,
+ session=session,
+ )
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index a2910313951..8b1fa606694 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -60,6 +60,7 @@ class TestDagRunTrigger:
dag_run = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
assert dag_run.conf == {"key1": "value1"}
assert dag_run.logical_date == logical_date
+ assert dag_run.run_type == DagRunType.OPERATOR_TRIGGERED
def test_trigger_dag_run_with_partition_key(self, client, session, dag_maker):
dag_id = "test_trigger_dag_run_partition_key"
@@ -130,7 +131,7 @@ class TestDagRunTrigger:
}
def test_trigger_dag_run_denied_run_type(self, client, session, dag_maker):
- """Test that a Dag with allowed_run_types excluding 'manual' cannot be triggered."""
+ """Test that a Dag with denied operator run type cannot be triggered."""
dag_id = "test_trigger_dag_run_denied"
run_id = "test_run_id"
logical_date = timezone.datetime(2025, 2, 20)
@@ -151,7 +152,34 @@ class TestDagRunTrigger:
assert response.status_code == 400
assert response.json() == {
"detail": {
- "message": f"Dag with dag_id '{dag_id}' does not allow manual runs",
+ "message": f"Dag with dag_id '{dag_id}' does not allow operator-triggered runs",
+ "reason": "denied_run_type",
+ }
+ }
+
+ def test_trigger_dag_run_manual_denied_for_operator(self, client, session, dag_maker):
+ """Test that MANUAL-only allowed_run_types rejects operator-triggered runs."""
+ dag_id = "test_trigger_dag_run_manual_allowed"
+ run_id = "test_run_id"
+ logical_date = timezone.datetime(2025, 2, 20)
+
+ with dag_maker(dag_id=dag_id, session=session, serialized=True):
+ EmptyOperator(task_id="test_task")
+
+ session.execute(
+ update(DagModel).where(DagModel.dag_id == dag_id).values(allowed_run_types=["manual"])
+ )
+ session.commit()
+
+ response = client.post(
+ f"/execution/dag-runs/{dag_id}/{run_id}",
+ json={"logical_date": logical_date.isoformat()},
+ )
+
+ assert response.status_code == 400
+ assert response.json() == {
+ "detail": {
+ "message": f"Dag with dag_id '{dag_id}' does not allow operator-triggered runs",
"reason": "denied_run_type",
}
}
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 17aa78431a0..a0b32b7fbdd 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -395,6 +395,7 @@ class DagRunType(str, Enum):
BACKFILL = "backfill"
SCHEDULED = "scheduled"
MANUAL = "manual"
+ OPERATOR_TRIGGERED = "operator_triggered"
ASSET_TRIGGERED = "asset_triggered"
ASSET_MATERIALIZATION = "asset_materialization"
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index e08f00562d3..d45f8239306 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -143,6 +143,7 @@ class DagRunType(str, Enum):
BACKFILL = "backfill"
SCHEDULED = "scheduled"
MANUAL = "manual"
+ OPERATOR_TRIGGERED = "operator_triggered"
ASSET_TRIGGERED = "asset_triggered"
ASSET_MATERIALIZATION = "asset_materialization"