This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ca401d00cfd Fix CLI asset materialization to use correct dag run type
(#63815)
ca401d00cfd is described below
commit ca401d00cfd33d3ce81c212093d9f999ab9d1a57
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Mar 17 20:14:25 2026 +0530
Fix CLI asset materialization to use correct dag run type (#63815)
---
airflow-core/src/airflow/api/common/trigger_dag.py | 13 +++++++++----
.../src/airflow/cli/commands/asset_command.py | 8 ++++++--
.../tests/unit/api/common/test_trigger_dag.py | 22 +++++++++++++++++++++-
.../tests/unit/cli/commands/test_asset_command.py | 4 ++--
4 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py
b/airflow-core/src/airflow/api/common/trigger_dag.py
index 93e867fc5c8..3e8e1774ba2 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -44,6 +44,7 @@ def _trigger_dag(
dag_bag: DBDagBag,
*,
triggered_by: DagRunTriggeredByType,
+ run_type: DagRunType = DagRunType.MANUAL,
triggering_user_name: str | None = None,
run_after: datetime | None = None,
run_id: str | None = None,
@@ -60,6 +61,7 @@ def _trigger_dag(
:param dag_id: DAG ID
:param dag_bag: DAG Bag model
:param triggered_by: the entity which triggers the dag_run
+ :param run_type: the type of dag run (default: MANUAL)
:param triggering_user_name: the user name who triggers the dag_run
:param run_after: the datetime before which dag cannot run
:param run_id: ID of the run
@@ -95,7 +97,7 @@ def _trigger_dag(
data_interval = None
run_id = run_id or dag.timetable.generate_run_id(
- run_type=DagRunType.MANUAL,
+ run_type=run_type,
run_after=timezone.coerce_datetime(run_after),
data_interval=data_interval,
)
@@ -116,7 +118,7 @@ def _trigger_dag(
data_interval=data_interval,
run_after=run_after,
conf=run_conf,
- run_type=DagRunType.MANUAL,
+ run_type=run_type,
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
note=note,
@@ -133,6 +135,7 @@ def trigger_dag(
dag_id: str,
*,
triggered_by: DagRunTriggeredByType,
+ run_type: DagRunType = DagRunType.MANUAL,
triggering_user_name: str | None = None,
run_after: datetime | None = None,
run_id: str | None = None,
@@ -148,6 +151,7 @@ def trigger_dag(
:param dag_id: DAG ID
:param triggered_by: the entity which triggers the dag_run
+ :param run_type: the type of dag run (default: MANUAL)
:param triggering_user_name: the user name who triggers the dag_run
:param run_after: the datetime before which dag won't run
:param run_id: ID of the dag_run
@@ -161,14 +165,15 @@ def trigger_dag(
if dag_model is None:
raise DagNotFound(f"Dag id {dag_id} not found in DagModel")
- if dag_model.allowed_run_types is not None and DagRunType.MANUAL not in
dag_model.allowed_run_types:
- raise ValueError(f"Dag with dag_id: '{dag_id}' does not allow manual
runs")
+ if dag_model.allowed_run_types is not None and run_type not in
dag_model.allowed_run_types:
+ raise ValueError(f"Dag with dag_id: '{dag_id}' does not allow
{run_type} runs")
dagbag = DBDagBag()
dr = _trigger_dag(
dag_id=dag_id,
dag_bag=dagbag,
run_id=run_id,
+ run_type=run_type,
run_after=run_after or timezone.utcnow(),
conf=conf,
logical_date=logical_date,
diff --git a/airflow-core/src/airflow/cli/commands/asset_command.py
b/airflow-core/src/airflow/cli/commands/asset_command.py
index 3b4685a0d1b..15430b76488 100644
--- a/airflow-core/src/airflow/cli/commands/asset_command.py
+++ b/airflow-core/src/airflow/cli/commands/asset_command.py
@@ -31,7 +31,7 @@ from airflow.models.asset import AssetAliasModel, AssetModel,
TaskOutletAssetRef
from airflow.utils import cli as cli_utils
from airflow.utils.platform import getuser
from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import DagRunTriggeredByType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
if typing.TYPE_CHECKING:
from typing import Any
@@ -157,7 +157,11 @@ def asset_materialize(args, *, session: Session =
NEW_SESSION) -> None:
log.warning("Failed to get user name from os: %s, not setting the
triggering user", e)
user = None
dagrun = trigger_dag(
- dag_id=dag_id, triggered_by=DagRunTriggeredByType.CLI,
triggering_user_name=user, session=session
+ dag_id=dag_id,
+ triggered_by=DagRunTriggeredByType.CLI,
+ run_type=DagRunType.ASSET_MATERIALIZATION,
+ triggering_user_name=user,
+ session=session,
)
if dagrun is not None:
data = [DAGRunResponse.model_validate(dagrun).model_dump(mode="json")]
diff --git a/airflow-core/tests/unit/api/common/test_trigger_dag.py
b/airflow-core/tests/unit/api/common/test_trigger_dag.py
index 184be45ff89..b94b42e95af 100644
--- a/airflow-core/tests/unit/api/common/test_trigger_dag.py
+++ b/airflow-core/tests/unit/api/common/test_trigger_dag.py
@@ -22,7 +22,7 @@ from sqlalchemy import select
from airflow.api.common.trigger_dag import trigger_dag
from airflow.models import DagModel
from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils.types import DagRunTriggeredByType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.db import (
clear_db_dag_bundles,
@@ -61,3 +61,23 @@ def
test_trigger_dag_raises_error_if_manual_runs_denied(dag_maker, session):
triggered_by=DagRunTriggeredByType.REST_API,
session=session,
)
+
+
+def test_trigger_dag_with_custom_run_type(dag_maker, session):
+ """Test that trigger_dag accepts and uses custom run_type parameter."""
+ with dag_maker(session=session, dag_id="TEST_DAG_2", schedule=None):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ dag_model = session.scalar(select(DagModel).where(DagModel.dag_id ==
"TEST_DAG_2"))
+ dag_model.allowed_run_types = ["manual", "asset_materialization"]
+ session.commit()
+
+ dag_run = trigger_dag(
+ dag_id="TEST_DAG_2",
+ triggered_by=DagRunTriggeredByType.CLI,
+ run_type=DagRunType.ASSET_MATERIALIZATION,
+ session=session,
+ )
+
+ assert dag_run.run_type == DagRunType.ASSET_MATERIALIZATION
diff --git a/airflow-core/tests/unit/cli/commands/test_asset_command.py
b/airflow-core/tests/unit/cli/commands/test_asset_command.py
index d57b4bf15b7..7b17f2a5cea 100644
--- a/airflow-core/tests/unit/cli/commands/test_asset_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_asset_command.py
@@ -158,7 +158,7 @@ def test_cli_assets_materialize(mock_hasattr, parser:
ArgumentParser, stdout_cap
"last_scheduling_decision": None,
"note": None,
"partition_key": None,
- "run_type": "manual",
+ "run_type": "asset_materialization",
"start_date": None,
"state": "queued",
"triggered_by": "cli",
@@ -197,7 +197,7 @@ def
test_cli_assets_materialize_with_view_url_template(parser: ArgumentParser, s
"last_scheduling_decision": None,
"note": None,
"partition_key": None,
- "run_type": "manual",
+ "run_type": "asset_materialization",
"start_date": None,
"state": "queued",
"triggered_by": "cli",