This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c28b21178b9 Fix custom timetable generate_run_id not called for manual triggers (#56373)
c28b21178b9 is described below

commit c28b21178b9c8c299a8ec5c74eff39d7e1da99b9
Author: Nils Werner <[email protected]>
AuthorDate: Thu Oct 16 01:51:23 2025 -0400

    Fix custom timetable generate_run_id not called for manual triggers (#56373)
---
 airflow-core/src/airflow/api/common/trigger_dag.py |  4 +-
 .../api_fastapi/core_api/datamodels/dag_run.py     | 17 ++---
 .../core_api/routes/public/test_dag_run.py         | 74 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 15 deletions(-)

diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py b/airflow-core/src/airflow/api/common/trigger_dag.py
index fa793d221ec..614140b58c4 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -92,10 +92,10 @@ def _trigger_dag(
     else:
         data_interval = None
 
-    run_id = run_id or DagRun.generate_run_id(
+    run_id = run_id or dag.timetable.generate_run_id(
         run_type=DagRunType.MANUAL,
-        logical_date=coerced_logical_date,
         run_after=timezone.coerce_datetime(run_after),
+        data_interval=data_interval,
     )
 
     # This intentionally does not use 'session' in the current scope because it
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 2c1818ae067..24a64987019 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -26,7 +26,6 @@ from pydantic import AliasPath, AwareDatetime, Field, NonNegativeInt, model_vali
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
-from airflow.models import DagRun
 from airflow.timetables.base import DataInterval
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -129,10 +128,10 @@ class TriggerDAGRunPostBody(StrictBaseModel):
                 )
                 run_after = data_interval.end
 
-        run_id = self.dag_run_id or DagRun.generate_run_id(
-            run_type=DagRunType.SCHEDULED,
-            logical_date=coerced_logical_date,
-            run_after=run_after,
+        run_id = self.dag_run_id or dag.timetable.generate_run_id(
+            run_type=DagRunType.MANUAL,
+            run_after=timezone.coerce_datetime(run_after),
+            data_interval=data_interval,
         )
         return {
             "run_id": run_id,
@@ -143,14 +142,6 @@ class TriggerDAGRunPostBody(StrictBaseModel):
             "note": self.note,
         }
 
-    @model_validator(mode="after")
-    def validate_dag_run_id(self):
-        if not self.dag_run_id:
-            self.dag_run_id = DagRun.generate_run_id(
-                run_type=DagRunType.MANUAL, logical_date=self.logical_date, run_after=self.run_after
-            )
-        return self
-
 
 class DAGRunsBatchBody(StrictBaseModel):
     """List DAG Runs body for batch endpoint."""
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 4f36edd1550..eee3faa73f3 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -33,6 +33,7 @@ from airflow.models.asset import AssetEvent, AssetModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.asset import Asset
 from airflow.sdk.definitions.param import Param
+from airflow.timetables.interval import CronDataIntervalTimetable
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -50,9 +51,44 @@ from tests_common.test_utils.format_datetime import from_datetime_to_zulu, from_
 
 if TYPE_CHECKING:
     from airflow.models.dag_version import DagVersion
+    from airflow.timetables.base import DataInterval
 
 pytestmark = pytest.mark.db_test
 
+
+class CustomTimetable(CronDataIntervalTimetable):
+    """Custom timetable that generates custom run IDs."""
+
+    def generate_run_id(
+        self,
+        *,
+        run_type: DagRunType,
+        run_after,
+        data_interval: DataInterval | None,
+        **kwargs,
+    ) -> str:
+        if data_interval:
+            return f"custom_{data_interval.start.strftime('%Y%m%d%H%M%S')}"
+        return f"custom_manual_{run_after.strftime('%Y%m%d%H%M%S')}"
+
+
+@pytest.fixture
+def custom_timetable_plugin(monkeypatch):
+    """Fixture to register CustomTimetable for serialization."""
+    from airflow import plugins_manager
+    from airflow.utils.module_loading import qualname
+
+    timetable_class_name = qualname(CustomTimetable)
+    existing_timetables = getattr(plugins_manager, "timetable_classes", None) or {}
+
+    monkeypatch.setattr(plugins_manager, "initialize_timetables_plugins", lambda: None)
+    monkeypatch.setattr(
+        plugins_manager,
+        "timetable_classes",
+        {**existing_timetables, timetable_class_name: CustomTimetable},
+    )
+
+
 DAG1_ID = "test_dag1"
 DAG1_DISPLAY_NAME = "test_dag1"
 DAG2_ID = "test_dag2"
@@ -1827,6 +1863,44 @@ class TestTriggerDagRun:
             "note": None,
         }
 
+    @time_machine.travel("2025-10-02 12:00:00", tick=False)
+    @pytest.mark.usefixtures("custom_timetable_plugin")
+    def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, test_client, session):
+        """Test that custom timetable's generate_run_id is used for manual triggers (issue #55908)."""
+        custom_dag_id = "test_custom_timetable_dag"
+        with dag_maker(
+            dag_id=custom_dag_id,
+            schedule=CustomTimetable("0 0 * * *", timezone="UTC"),
+            session=session,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="test_task")
+
+        session.commit()
+
+        logical_date = datetime(2025, 10, 1, 0, 0, 0, tzinfo=timezone.utc)
+        response = test_client.post(
+            f"/dags/{custom_dag_id}/dagRuns",
+            json={"logical_date": logical_date.isoformat()},
+        )
+        assert response.status_code == 200
+        run_id_with_logical_date = response.json()["dag_run_id"]
+        assert run_id_with_logical_date.startswith("custom_")
+
+        run = session.query(DagRun).filter(DagRun.run_id == run_id_with_logical_date).one()
+        assert run.dag_id == custom_dag_id
+
+        response = test_client.post(
+            f"/dags/{custom_dag_id}/dagRuns",
+            json={"logical_date": None},
+        )
+        assert response.status_code == 200
+        run_id_without_logical_date = response.json()["dag_run_id"]
+        assert run_id_without_logical_date.startswith("custom_manual_")
+
+        run = session.query(DagRun).filter(DagRun.run_id == run_id_without_logical_date).one()
+        assert run.dag_id == custom_dag_id
+
 
 class TestWaitDagRun:
     # The way we init async engine does not work well with FastAPI app init.

Reply via email to