This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ca401d00cfd Fix CLI asset materialization to use correct dag run type 
(#63815)
ca401d00cfd is described below

commit ca401d00cfd33d3ce81c212093d9f999ab9d1a57
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Mar 17 20:14:25 2026 +0530

    Fix CLI asset materialization to use correct dag run type (#63815)
---
 airflow-core/src/airflow/api/common/trigger_dag.py | 13 +++++++++----
 .../src/airflow/cli/commands/asset_command.py      |  8 ++++++--
 .../tests/unit/api/common/test_trigger_dag.py      | 22 +++++++++++++++++++++-
 .../tests/unit/cli/commands/test_asset_command.py  |  4 ++--
 4 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py 
b/airflow-core/src/airflow/api/common/trigger_dag.py
index 93e867fc5c8..3e8e1774ba2 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -44,6 +44,7 @@ def _trigger_dag(
     dag_bag: DBDagBag,
     *,
     triggered_by: DagRunTriggeredByType,
+    run_type: DagRunType = DagRunType.MANUAL,
     triggering_user_name: str | None = None,
     run_after: datetime | None = None,
     run_id: str | None = None,
@@ -60,6 +61,7 @@ def _trigger_dag(
     :param dag_id: DAG ID
     :param dag_bag: DAG Bag model
     :param triggered_by: the entity which triggers the dag_run
+    :param run_type: the type of dag run (default: MANUAL)
     :param triggering_user_name: the user name who triggers the dag_run
     :param run_after: the datetime before which dag cannot run
     :param run_id: ID of the run
@@ -95,7 +97,7 @@ def _trigger_dag(
         data_interval = None
 
     run_id = run_id or dag.timetable.generate_run_id(
-        run_type=DagRunType.MANUAL,
+        run_type=run_type,
         run_after=timezone.coerce_datetime(run_after),
         data_interval=data_interval,
     )
@@ -116,7 +118,7 @@ def _trigger_dag(
         data_interval=data_interval,
         run_after=run_after,
         conf=run_conf,
-        run_type=DagRunType.MANUAL,
+        run_type=run_type,
         triggered_by=triggered_by,
         triggering_user_name=triggering_user_name,
         note=note,
@@ -133,6 +135,7 @@ def trigger_dag(
     dag_id: str,
     *,
     triggered_by: DagRunTriggeredByType,
+    run_type: DagRunType = DagRunType.MANUAL,
     triggering_user_name: str | None = None,
     run_after: datetime | None = None,
     run_id: str | None = None,
@@ -148,6 +151,7 @@ def trigger_dag(
 
     :param dag_id: DAG ID
     :param triggered_by: the entity which triggers the dag_run
+    :param run_type: the type of dag run (default: MANUAL)
     :param triggering_user_name: the user name who triggers the dag_run
     :param run_after: the datetime before which dag won't run
     :param run_id: ID of the dag_run
@@ -161,14 +165,15 @@ def trigger_dag(
     if dag_model is None:
         raise DagNotFound(f"Dag id {dag_id} not found in DagModel")
 
-    if dag_model.allowed_run_types is not None and DagRunType.MANUAL not in 
dag_model.allowed_run_types:
-        raise ValueError(f"Dag with dag_id: '{dag_id}' does not allow manual 
runs")
+    if dag_model.allowed_run_types is not None and run_type not in 
dag_model.allowed_run_types:
+        raise ValueError(f"Dag with dag_id: '{dag_id}' does not allow 
{run_type} runs")
 
     dagbag = DBDagBag()
     dr = _trigger_dag(
         dag_id=dag_id,
         dag_bag=dagbag,
         run_id=run_id,
+        run_type=run_type,
         run_after=run_after or timezone.utcnow(),
         conf=conf,
         logical_date=logical_date,
diff --git a/airflow-core/src/airflow/cli/commands/asset_command.py 
b/airflow-core/src/airflow/cli/commands/asset_command.py
index 3b4685a0d1b..15430b76488 100644
--- a/airflow-core/src/airflow/cli/commands/asset_command.py
+++ b/airflow-core/src/airflow/cli/commands/asset_command.py
@@ -31,7 +31,7 @@ from airflow.models.asset import AssetAliasModel, AssetModel, 
TaskOutletAssetRef
 from airflow.utils import cli as cli_utils
 from airflow.utils.platform import getuser
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import DagRunTriggeredByType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 if typing.TYPE_CHECKING:
     from typing import Any
@@ -157,7 +157,11 @@ def asset_materialize(args, *, session: Session = 
NEW_SESSION) -> None:
         log.warning("Failed to get user name from os: %s, not setting the 
triggering user", e)
         user = None
     dagrun = trigger_dag(
-        dag_id=dag_id, triggered_by=DagRunTriggeredByType.CLI, 
triggering_user_name=user, session=session
+        dag_id=dag_id,
+        triggered_by=DagRunTriggeredByType.CLI,
+        run_type=DagRunType.ASSET_MATERIALIZATION,
+        triggering_user_name=user,
+        session=session,
     )
     if dagrun is not None:
         data = [DAGRunResponse.model_validate(dagrun).model_dump(mode="json")]
diff --git a/airflow-core/tests/unit/api/common/test_trigger_dag.py 
b/airflow-core/tests/unit/api/common/test_trigger_dag.py
index 184be45ff89..b94b42e95af 100644
--- a/airflow-core/tests/unit/api/common/test_trigger_dag.py
+++ b/airflow-core/tests/unit/api/common/test_trigger_dag.py
@@ -22,7 +22,7 @@ from sqlalchemy import select
 from airflow.api.common.trigger_dag import trigger_dag
 from airflow.models import DagModel
 from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils.types import DagRunTriggeredByType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.db import (
     clear_db_dag_bundles,
@@ -61,3 +61,23 @@ def 
test_trigger_dag_raises_error_if_manual_runs_denied(dag_maker, session):
             triggered_by=DagRunTriggeredByType.REST_API,
             session=session,
         )
+
+
+def test_trigger_dag_with_custom_run_type(dag_maker, session):
+    """Test that trigger_dag accepts and uses custom run_type parameter."""
+    with dag_maker(session=session, dag_id="TEST_DAG_2", schedule=None):
+        EmptyOperator(task_id="mytask")
+    session.commit()
+
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == 
"TEST_DAG_2"))
+    dag_model.allowed_run_types = ["manual", "asset_materialization"]
+    session.commit()
+
+    dag_run = trigger_dag(
+        dag_id="TEST_DAG_2",
+        triggered_by=DagRunTriggeredByType.CLI,
+        run_type=DagRunType.ASSET_MATERIALIZATION,
+        session=session,
+    )
+
+    assert dag_run.run_type == DagRunType.ASSET_MATERIALIZATION
diff --git a/airflow-core/tests/unit/cli/commands/test_asset_command.py 
b/airflow-core/tests/unit/cli/commands/test_asset_command.py
index d57b4bf15b7..7b17f2a5cea 100644
--- a/airflow-core/tests/unit/cli/commands/test_asset_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_asset_command.py
@@ -158,7 +158,7 @@ def test_cli_assets_materialize(mock_hasattr, parser: 
ArgumentParser, stdout_cap
         "last_scheduling_decision": None,
         "note": None,
         "partition_key": None,
-        "run_type": "manual",
+        "run_type": "asset_materialization",
         "start_date": None,
         "state": "queued",
         "triggered_by": "cli",
@@ -197,7 +197,7 @@ def 
test_cli_assets_materialize_with_view_url_template(parser: ArgumentParser, s
         "last_scheduling_decision": None,
         "note": None,
         "partition_key": None,
-        "run_type": "manual",
+        "run_type": "asset_materialization",
         "start_date": None,
         "state": "queued",
         "triggered_by": "cli",

Reply via email to