This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c28b21178b9 Fix custom timetable generate_run_id not called for manual triggers (#56373)
c28b21178b9 is described below
commit c28b21178b9c8c299a8ec5c74eff39d7e1da99b9
Author: Nils Werner <[email protected]>
AuthorDate: Thu Oct 16 01:51:23 2025 -0400
Fix custom timetable generate_run_id not called for manual triggers (#56373)
---
airflow-core/src/airflow/api/common/trigger_dag.py | 4 +-
.../api_fastapi/core_api/datamodels/dag_run.py | 17 ++---
.../core_api/routes/public/test_dag_run.py | 74 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 15 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py b/airflow-core/src/airflow/api/common/trigger_dag.py
index fa793d221ec..614140b58c4 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -92,10 +92,10 @@ def _trigger_dag(
else:
data_interval = None
- run_id = run_id or DagRun.generate_run_id(
+ run_id = run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
- logical_date=coerced_logical_date,
run_after=timezone.coerce_datetime(run_after),
+ data_interval=data_interval,
)
# This intentionally does not use 'session' in the current scope because it
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 2c1818ae067..24a64987019 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -26,7 +26,6 @@ from pydantic import AliasPath, AwareDatetime, Field, NonNegativeInt, model_vali
from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
-from airflow.models import DagRun
from airflow.timetables.base import DataInterval
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -129,10 +128,10 @@ class TriggerDAGRunPostBody(StrictBaseModel):
)
run_after = data_interval.end
- run_id = self.dag_run_id or DagRun.generate_run_id(
- run_type=DagRunType.SCHEDULED,
- logical_date=coerced_logical_date,
- run_after=run_after,
+ run_id = self.dag_run_id or dag.timetable.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ run_after=timezone.coerce_datetime(run_after),
+ data_interval=data_interval,
)
return {
"run_id": run_id,
@@ -143,14 +142,6 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"note": self.note,
}
- @model_validator(mode="after")
- def validate_dag_run_id(self):
- if not self.dag_run_id:
- self.dag_run_id = DagRun.generate_run_id(
- run_type=DagRunType.MANUAL, logical_date=self.logical_date, run_after=self.run_after
- )
- return self
-
class DAGRunsBatchBody(StrictBaseModel):
"""List DAG Runs body for batch endpoint."""
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 4f36edd1550..eee3faa73f3 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -33,6 +33,7 @@ from airflow.models.asset import AssetEvent, AssetModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.param import Param
+from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -50,9 +51,44 @@ from tests_common.test_utils.format_datetime import from_datetime_to_zulu, from_
if TYPE_CHECKING:
from airflow.models.dag_version import DagVersion
+ from airflow.timetables.base import DataInterval
pytestmark = pytest.mark.db_test
+
+class CustomTimetable(CronDataIntervalTimetable):
+ """Custom timetable that generates custom run IDs."""
+
+ def generate_run_id(
+ self,
+ *,
+ run_type: DagRunType,
+ run_after,
+ data_interval: DataInterval | None,
+ **kwargs,
+ ) -> str:
+ if data_interval:
+ return f"custom_{data_interval.start.strftime('%Y%m%d%H%M%S')}"
+ return f"custom_manual_{run_after.strftime('%Y%m%d%H%M%S')}"
+
+
+@pytest.fixture
+def custom_timetable_plugin(monkeypatch):
+ """Fixture to register CustomTimetable for serialization."""
+ from airflow import plugins_manager
+ from airflow.utils.module_loading import qualname
+
+ timetable_class_name = qualname(CustomTimetable)
+ existing_timetables = getattr(plugins_manager, "timetable_classes", None) or {}
+
+ monkeypatch.setattr(plugins_manager, "initialize_timetables_plugins", lambda: None)
+ monkeypatch.setattr(
+ plugins_manager,
+ "timetable_classes",
+ {**existing_timetables, timetable_class_name: CustomTimetable},
+ )
+
+
DAG1_ID = "test_dag1"
DAG1_DISPLAY_NAME = "test_dag1"
DAG2_ID = "test_dag2"
@@ -1827,6 +1863,44 @@ class TestTriggerDagRun:
"note": None,
}
+ @time_machine.travel("2025-10-02 12:00:00", tick=False)
+ @pytest.mark.usefixtures("custom_timetable_plugin")
+ def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, test_client, session):
+ """Test that custom timetable's generate_run_id is used for manual triggers (issue #55908)."""
+ custom_dag_id = "test_custom_timetable_dag"
+ with dag_maker(
+ dag_id=custom_dag_id,
+ schedule=CustomTimetable("0 0 * * *", timezone="UTC"),
+ session=session,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="test_task")
+
+ session.commit()
+
+ logical_date = datetime(2025, 10, 1, 0, 0, 0, tzinfo=timezone.utc)
+ response = test_client.post(
+ f"/dags/{custom_dag_id}/dagRuns",
+ json={"logical_date": logical_date.isoformat()},
+ )
+ assert response.status_code == 200
+ run_id_with_logical_date = response.json()["dag_run_id"]
+ assert run_id_with_logical_date.startswith("custom_")
+
+ run = session.query(DagRun).filter(DagRun.run_id == run_id_with_logical_date).one()
+ assert run.dag_id == custom_dag_id
+
+ response = test_client.post(
+ f"/dags/{custom_dag_id}/dagRuns",
+ json={"logical_date": None},
+ )
+ assert response.status_code == 200
+ run_id_without_logical_date = response.json()["dag_run_id"]
+ assert run_id_without_logical_date.startswith("custom_manual_")
+
+ run = session.query(DagRun).filter(DagRun.run_id == run_id_without_logical_date).one()
+ assert run.dag_id == custom_dag_id
+
class TestWaitDagRun:
# The way we init async engine does not work well with FastAPI app init.