This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 54a2a662d4e fix(timetable): fix failing to manually trigger a Dag with
CronPartitionedTimetable (#62441)
54a2a662d4e is described below
commit 54a2a662d4e2551055bde578bdb2784ea39f8e84
Author: Wei Lee <[email protected]>
AuthorDate: Wed Feb 25 17:41:16 2026 +0900
fix(timetable): fix failing to manually trigger a Dag with
CronPartitionedTimetable (#62441)
---
airflow-core/src/airflow/timetables/trigger.py | 11 +++++------
.../tests/unit/timetables/test_trigger_timetable.py | 20 ++++++++++++++++++++
2 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/timetables/trigger.py
b/airflow-core/src/airflow/timetables/trigger.py
index 854415e5722..67785a899d6 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -516,12 +516,11 @@ class CronPartitionTimetable(CronTriggerTimetable):
data_interval: DataInterval | None,
**extra,
) -> str:
- components = [
- run_after.isoformat(),
- extra.get("partition_key"),
- get_random_string(),
- ]
- return run_type.generate_run_id(suffix="__".join(components))
+ suffix = run_after.isoformat()
+ if partition_key := extra.get("partition_key"):
+ suffix = f"{suffix}__{partition_key}"
+ suffix = f"{suffix}__{get_random_string()}"
+ return run_type.generate_run_id(suffix=suffix)
def next_run_info_from_dag_model(self, *, dag_model: DagModel) ->
DagRunInfo:
run_after =
timezone.coerce_datetime(dag_model.next_dagrun_create_after)
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index 8cd895c7dc1..19546459fdd 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -31,10 +31,12 @@ from airflow.models import DagModel
from airflow.sdk import CronPartitionTimetable
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
from airflow.timetables.trigger import (
+ CronPartitionTimetable as CoreCronPartitionTimetable,
CronTriggerTimetable,
DeltaTriggerTimetable,
MultipleCronTriggerTimetable,
)
+from airflow.utils.types import DagRunType
START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc)
@@ -809,3 +811,21 @@ def test_next_run_info_from_dag_model(schedule,
partition_key, expected, dag_mak
dm = dag_maker.dag_model
info =
dag_maker.serialized_dag.timetable.next_run_info_from_dag_model(dag_model=dm)
assert info == expected
+
+
+def test_generate_run_id_without_partition_key() -> None:
+ """
+ Tests the generate_run_id method of CronPartitionTimetable.
+
+    generate_run_id shouldn't break even when the run is manually triggered
(partition_key might be missing).
+ """
+ cron_partitioned_timetabe = CoreCronPartitionTimetable(
+ "0 * * * *",
+ timezone=pendulum.UTC,
+ )
+ run_id = cron_partitioned_timetabe.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ run_after=pendulum.DateTime(2025, 6, 7, 8, 9, tzinfo=pendulum.UTC),
+ data_interval=None,
+ )
+ assert run_id.startswith("manual__2025-06-07T08:09:00+00:00__")