This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 54a2a662d4e fix(timetable): fix failing to manually trigger a Dag with 
CronPartitionedTimetable (#62441)
54a2a662d4e is described below

commit 54a2a662d4e2551055bde578bdb2784ea39f8e84
Author: Wei Lee <[email protected]>
AuthorDate: Wed Feb 25 17:41:16 2026 +0900

    fix(timetable): fix failing to manually trigger a Dag with 
CronPartitionedTimetable (#62441)
---
 airflow-core/src/airflow/timetables/trigger.py       | 11 +++++------
 .../tests/unit/timetables/test_trigger_timetable.py  | 20 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/timetables/trigger.py 
b/airflow-core/src/airflow/timetables/trigger.py
index 854415e5722..67785a899d6 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -516,12 +516,11 @@ class CronPartitionTimetable(CronTriggerTimetable):
         data_interval: DataInterval | None,
         **extra,
     ) -> str:
-        components = [
-            run_after.isoformat(),
-            extra.get("partition_key"),
-            get_random_string(),
-        ]
-        return run_type.generate_run_id(suffix="__".join(components))
+        suffix = run_after.isoformat()
+        if partition_key := extra.get("partition_key"):
+            suffix = f"{suffix}__{partition_key}"
+        suffix = f"{suffix}__{get_random_string()}"
+        return run_type.generate_run_id(suffix=suffix)
 
     def next_run_info_from_dag_model(self, *, dag_model: DagModel) -> 
DagRunInfo:
         run_after = 
timezone.coerce_datetime(dag_model.next_dagrun_create_after)
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py 
b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index 8cd895c7dc1..19546459fdd 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -31,10 +31,12 @@ from airflow.models import DagModel
 from airflow.sdk import CronPartitionTimetable
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, 
Timetable
 from airflow.timetables.trigger import (
+    CronPartitionTimetable as CoreCronPartitionTimetable,
     CronTriggerTimetable,
     DeltaTriggerTimetable,
     MultipleCronTriggerTimetable,
 )
+from airflow.utils.types import DagRunType
 
 START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc)
 
@@ -809,3 +811,21 @@ def test_next_run_info_from_dag_model(schedule, 
partition_key, expected, dag_mak
     dm = dag_maker.dag_model
     info = 
dag_maker.serialized_dag.timetable.next_run_info_from_dag_model(dag_model=dm)
     assert info == expected
+
+
+def test_generate_run_id_without_partition_key() -> None:
+    """
+    Tests the generate_run_id method of CronPartitionTimetable.
+
+    generate_run_id shouldn't break even when the run is manually triggered 
(partition_key might be missing).
+    """
+    cron_partitioned_timetabe = CoreCronPartitionTimetable(
+        "0 * * * *",
+        timezone=pendulum.UTC,
+    )
+    run_id = cron_partitioned_timetabe.generate_run_id(
+        run_type=DagRunType.MANUAL,
+        run_after=pendulum.DateTime(2025, 6, 7, 8, 9, tzinfo=pendulum.UTC),
+        data_interval=None,
+    )
+    assert run_id.startswith("manual__2025-06-07T08:09:00+00:00__")

Reply via email to