This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 230293c2c91 Implement partition-driven scheduling on a dag (i.e. not 
asset-driven) (#59115)
230293c2c91 is described below

commit 230293c2c91e31f8a7e0c29286c36b0d90c9b4a3
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Jan 29 23:38:18 2026 -0800

    Implement partition-driven scheduling on a dag (i.e. not asset-driven) 
(#59115)
---
 airflow-core/src/airflow/api/common/mark_tasks.py  |   7 +-
 .../api_fastapi/core_api/services/ui/calendar.py   |   7 +
 .../api_fastapi/execution_api/routes/dag_runs.py   |   2 +
 .../src/airflow/cli/commands/dag_command.py        |   9 +-
 .../src/airflow/dag_processing/collection.py       |  60 +++++-
 .../src/airflow/jobs/scheduler_job_runner.py       | 118 +++++++-----
 airflow-core/src/airflow/models/backfill.py        |  11 +-
 airflow-core/src/airflow/models/dag.py             |  43 +++--
 .../src/airflow/serialization/definitions/dag.py   |  46 +++--
 airflow-core/src/airflow/timetables/base.py        |  75 +++++++-
 airflow-core/src/airflow/timetables/trigger.py     | 209 +++++++++++++++++++-
 .../tests/unit/cli/commands/test_dag_command.py    | 138 ++++++++------
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  63 ++++++-
 airflow-core/tests/unit/models/test_dag.py         |  96 +++++++---
 airflow-core/tests/unit/models/test_dagrun.py      |   2 +-
 .../tests/unit/timetables/test_events_timetable.py |   4 +-
 .../unit/timetables/test_trigger_timetable.py      | 210 +++++++++++++++++++++
 .../unit/timetables/test_workday_timetable.py      |   7 +-
 devel-common/src/tests_common/pytest_plugin.py     |  47 ++++-
 task-sdk/src/airflow/sdk/bases/timetable.py        |   2 +
 .../airflow/sdk/definitions/timetables/trigger.py  |  12 +-
 21 files changed, 959 insertions(+), 209 deletions(-)

diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py 
b/airflow-core/src/airflow/api/common/mark_tasks.py
index 99b8e47e74f..09fa9cd5c5e 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -70,6 +70,9 @@ def set_state(
     :param commit: Commit tasks to be altered to the database
     :param session: database session
     :return: list of tasks that have been created and updated
+
+    TODO: "past" and "future" params currently depend on logical date, which 
is not always populated.
+      We might want to just deprecate these options, or alter them to do 
*something* in that case.
     """
     if not tasks:
         return []
@@ -183,7 +186,9 @@ def get_run_ids(dag: SerializedDAG, run_id: str, future: 
bool, past: bool, sessi
         run_ids = [run_id]
     else:
         dates = [
-            info.logical_date for info in 
dag.iter_dagrun_infos_between(start_date, end_date, align=False)
+            info.logical_date
+            for info in dag.iter_dagrun_infos_between(start_date, end_date, 
align=False)
+            if info.logical_date  # todo: AIP-76 this will not find anything 
where logical date is null
         ]
         run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, 
logical_date=dates, session=session)]
     return run_ids
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py
index 0f00af412c6..fbcd0b0ab32 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py
@@ -229,11 +229,18 @@ class CalendarService:
 
             if curr_info is None:  # No more DAG runs to schedule
                 break
+            if not curr_info.logical_date:
+                # todo: AIP-76 this is likely a partitioned dag. needs 
implementation
+                break
             if curr_info.logical_date <= prev_logical_date:  # Timetable not 
progressing, stopping
                 break
             if curr_info.logical_date.year != year:  # Crossed year boundary
                 break
 
+            if not curr_info.data_interval:
+                # todo: AIP-76 this is likely a partitioned dag. needs 
implementation
+                break
+
             if not self._is_date_in_range(curr_info.logical_date, 
logical_date):
                 last_data_interval = curr_info.data_interval
                 prev_logical_date = curr_info.logical_date
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 56fd79b825e..7763850b5ee 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -113,6 +113,8 @@ def trigger_dag_run(
         )
 
     try:
+        # todo: AIP-76 add partition key here
+        #  https://github.com/apache/airflow/issues/61075
         trigger_dag(
             dag_id=dag_id,
             run_id=run_id,
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py 
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 2c3897941d4..8bab86a4966 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -44,6 +44,7 @@ from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.models.dag import get_next_data_interval
 from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.timetables.base import TimeRestriction
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import get_bagged_dag, suppress_logs_and_warning, 
validate_dag_bundle_arg
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
@@ -312,6 +313,9 @@ def dag_next_execution(args) -> None:
 
     >>> airflow dags next-execution tutorial
     2018-08-31 10:38:00
+
+    # todo: AIP-76 determine what next execution should do for 
partition-driven dags
+    #  https://github.com/apache/airflow/issues/61076
     """
     from airflow.models.serialized_dag import SerializedDagModel
 
@@ -342,7 +346,10 @@ def dag_next_execution(args) -> None:
     print_execution_interval(next_interval)
 
     for _ in range(1, args.num_executions):
-        next_info = dag.next_dagrun_info(next_interval, restricted=False)
+        next_info = dag.timetable.next_dagrun_info(
+            last_automated_data_interval=next_interval,
+            restriction=TimeRestriction(earliest=None, latest=None, 
catchup=True),
+        )
         next_interval = None if next_info is None else next_info.data_interval
         print_execution_interval(next_interval)
 
diff --git a/airflow-core/src/airflow/dag_processing/collection.py 
b/airflow-core/src/airflow/dag_processing/collection.py
index e8720b4df34..f5ad10196b4 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -49,7 +49,7 @@ from airflow.models.asset import (
     TaskInletAssetReference,
     TaskOutletAssetReference,
 )
-from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag, 
get_run_data_interval
+from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag
 from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarningType
 from airflow.models.errors import ParseImportError
@@ -63,6 +63,7 @@ from airflow.serialization.definitions.assets import (
 from airflow.serialization.definitions.dag import SerializedDAG
 from airflow.serialization.enums import Encoding
 from airflow.serialization.serialized_objects import BaseSerialization, 
LazyDeserializedDAG
+from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.triggers.base import BaseEventTrigger
 from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
 from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
@@ -129,6 +130,39 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
     )
 
 
+def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
+    """Build a select statement to retrieve the last partitioned run for the 
given Dag."""
+    # todo: AIP-76 we should add a partition date field
+    latest_run_id = (
+        select(DagRun.id)
+        .where(
+            DagRun.dag_id == dag_id,
+            DagRun.run_type.in_(
+                (
+                    DagRunType.BACKFILL_JOB,
+                    DagRunType.SCHEDULED,
+                )
+            ),
+            DagRun.partition_key.is_not(None),
+        )
+        .order_by(DagRun.id.desc())  # todo: AIP-76 add partition date and 
sort by it here
+        .limit(1)
+        .scalar_subquery()
+    )
+    return (
+        select(DagRun)
+        .where(DagRun.id == latest_run_id)
+        .options(
+            load_only(
+                DagRun.dag_id,
+                DagRun.logical_date,
+                DagRun.data_interval_start,
+                DagRun.data_interval_end,
+            )
+        )
+    )
+
+
 class _RunInfo(NamedTuple):
     latest_run: DagRun | None
     num_active_runs: int
@@ -144,7 +178,23 @@ class _RunInfo(NamedTuple):
         if not dag.timetable.can_be_scheduled:
             return cls(None, 0)
 
-        latest_run = session.scalar(_get_latest_runs_stmt(dag_id=dag.dag_id))
+        # todo: AIP-76 what's a more general way to detect?
+        #  https://github.com/apache/airflow/issues/61086
+        if isinstance(dag.timetable, CronPartitionTimetable):
+            log.info("getting latest run for partitioned dag", 
dag_id=dag.dag_id)
+            latest_run = 
session.scalar(_get_latest_runs_stmt_partitioned(dag_id=dag.dag_id))
+        else:
+            log.info("getting latest run for non-partitioned dag", 
dag_id=dag.dag_id)
+            latest_run = 
session.scalar(_get_latest_runs_stmt(dag_id=dag.dag_id))
+        if latest_run:
+            log.info(
+                "got latest run",
+                dag_id=dag.dag_id,
+                logical_date=str(latest_run.logical_date),
+                partition_key=latest_run.partition_key,
+            )
+        else:
+            log.info("no latest run found", dag_id=dag.dag_id)
         active_run_counts = DagRun.active_runs_of_dags(
             dag_ids=[dag.dag_id],
             exclude_backfill=True,
@@ -551,12 +601,8 @@ class DagModelOperation(NamedTuple):
             dm.bundle_version = self.bundle_version
 
             last_automated_run: DagRun | None = run_info.latest_run
-            if last_automated_run is None:
-                last_automated_data_interval = None
-            else:
-                last_automated_data_interval = 
get_run_data_interval(dag.timetable, last_automated_run)
             dm.exceeds_max_non_backfill = run_info.num_active_runs >= 
dm.max_active_runs
-            dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)
+            dm.calculate_dagrun_date_fields(dag, 
last_automated_run=last_automated_run)
             if not dag.timetable.asset_condition:
                 dm.schedule_asset_references = []
                 dm.schedule_asset_alias_references = []
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index e13d7c2c698..0dc18ca696a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -85,7 +85,7 @@ from airflow.models.asset import (
 )
 from airflow.models.backfill import Backfill
 from airflow.models.callback import Callback
-from airflow.models.dag import DagModel, get_next_data_interval
+from airflow.models.dag import DagModel
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagbag import DBDagBag
 from airflow.models.dagbundle import DagBundleModel
@@ -1897,24 +1897,31 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we 
don't attempt to create
         # duplicate DagRuns
-        existing_dagruns = (
-            session.execute(
-                select(DagRun.dag_id, DagRun.logical_date).where(
+        existing_dagrun_objects = (
+            session.scalars(
+                select(DagRun)
+                .where(
                     tuple_(DagRun.dag_id, DagRun.logical_date).in_(
                         (dm.dag_id, dm.next_dagrun) for dm in dag_models
-                    ),
+                    )
                 )
+                .options(load_only(DagRun.dag_id, DagRun.logical_date))
             )
             .unique()
             .all()
         )
+        existing_dagruns = {(x.dag_id, x.logical_date): x for x in 
existing_dagrun_objects}
+
+        # todo: AIP-76 we may want to extend the existing-run check to also cover 
partitioned dag runs,
+        #  but the thing is, there is not actually any restriction that
+        #  prevents creating new runs with the same partition key,
+        #  so it's unclear whether we should / need to.
 
         # backfill runs are not created by scheduler and their concurrency is 
separate
         # so we exclude them here
-        dag_ids = (dm.dag_id for dm in dag_models)
         active_runs_of_dags = Counter(
             DagRun.active_runs_of_dags(
-                dag_ids=dag_ids,
+                dag_ids=(dm.dag_id for dm in dag_models),
                 exclude_backfill=True,
                 session=session,
             )
@@ -1944,58 +1951,69 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
             serdag = self._get_current_dag(dag_id=dag_model.dag_id, 
session=session)
             if not serdag:
-                self.log.error("DAG '%s' not found in serialized_dag table", 
dag_model.dag_id)
+                self.log.error("Dag not found in serialized_dag table", 
dag_id=dag_model.dag_id)
                 continue
 
-            data_interval = get_next_data_interval(serdag.timetable, dag_model)
             # Explicitly check if the DagRun already exists. This is an edge 
case
             # where a Dag Run is created but `DagModel.next_dagrun` and 
`DagModel.next_dagrun_create_after`
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to set DagModel.next_dagrun_info if the DagRun already 
exists or if we
-            # create a new one. This is so that in the next scheduling loop we 
try to create new runs
-            # instead of falling in a loop of IntegrityError.
-            if (serdag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
-                try:
-                    serdag.create_dagrun(
-                        run_id=serdag.timetable.generate_run_id(
-                            run_type=DagRunType.SCHEDULED,
-                            
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
-                            data_interval=data_interval,
-                        ),
-                        logical_date=dag_model.next_dagrun,
-                        data_interval=data_interval,
-                        run_after=dag_model.next_dagrun_create_after,
-                        run_type=DagRunType.SCHEDULED,
-                        triggered_by=DagRunTriggeredByType.TIMETABLE,
-                        state=DagRunState.QUEUED,
-                        creating_job_id=self.job.id,
-                        session=session,
-                    )
-                    active_runs_of_dags[serdag.dag_id] += 1
-
-                # Exceptions like ValueError, ParamValidationError, etc. are 
raised by
-                # DagModel.create_dagrun() when dag is misconfigured. The 
scheduler should not
-                # crash due to misconfigured dags. We should log any exception 
encountered
-                # and continue to the next serdag.
-                except Exception:
-                    self.log.exception("Failed creating DagRun for %s", 
serdag.dag_id)
-                    # todo: if you get a database error here, continuing does 
not work because
-                    #  session needs rollback. you need either to make smaller 
transactions and
-                    #  commit after every dag run or use savepoints.
-                    #  https://github.com/apache/airflow/issues/59120
-                    continue
+            if dr := existing_dagruns.get((dag_model.dag_id, 
dag_model.next_dagrun)):
+                self.log.warning(
+                    "run already exists; skipping dagrun creation",
+                    dag_id=dag_model.dag_id,
+                    logical_date=dag_model.next_dagrun,
+                )
+                dag_model.calculate_dagrun_date_fields(dag=serdag, 
last_automated_run=dr)
+                continue
 
-            self._set_exceeds_max_active_runs(
-                dag_model=dag_model,
-                session=session,
-                active_non_backfill_runs=active_runs_of_dags[serdag.dag_id],
-            )
-            dag_model.calculate_dagrun_date_fields(dag=serdag, 
last_automated_dag_run=data_interval)
+            try:
+                next_info = 
serdag.timetable.next_run_info_from_dag_model(dag_model=dag_model)
+                data_interval = next_info.data_interval
+                logical_date = next_info.logical_date
+                partition_key = next_info.partition_key
+                run_after = next_info.run_after
+                # todo: AIP-76 partition date is not passed to dag run
+                #  See https://github.com/apache/airflow/issues/61167.
+                created_run = serdag.create_dagrun(
+                    run_id=serdag.timetable.generate_run_id(
+                        run_type=DagRunType.SCHEDULED,
+                        run_after=run_after,
+                        data_interval=data_interval,
+                        partition_key=partition_key,
+                    ),
+                    logical_date=logical_date,
+                    data_interval=data_interval,
+                    run_after=run_after,
+                    run_type=DagRunType.SCHEDULED,
+                    triggered_by=DagRunTriggeredByType.TIMETABLE,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                    partition_key=partition_key,
+                )
+                active_runs_of_dags[dag_model.dag_id] += 1
+                dag_model.calculate_dagrun_date_fields(dag=serdag, 
last_automated_run=created_run)
+                self._set_exceeds_max_active_runs(
+                    dag_model=dag_model,
+                    session=session,
+                    
active_non_backfill_runs=active_runs_of_dags[dag_model.dag_id],
+                )
 
-        # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
-        #  memory for larger dags? or expunge_all()
+            # Exceptions like ValueError, ParamValidationError, etc. are 
raised by
+            # DagModel.create_dagrun() when dag is misconfigured. The 
scheduler should not
+            # crash due to misconfigured dags. We should log any exception 
encountered
+            # and continue to the next serdag.
+            except Exception:
+                self.log.exception("Failed creating DagRun", 
dag_id=dag_model.dag_id)
+                # todo: if you get a database error here, continuing does not 
work because
+                #  session needs rollback. you need either to make smaller 
transactions and
+                #  commit after every dag run or use savepoints.
+                #  https://github.com/apache/airflow/issues/59120
+
+            # TODO[HA]: Should we do a session.flush() so we don't have to 
keep lots of state/object in
+            #  memory for larger dags? or expunge_all()
 
     def _create_dag_runs_asset_triggered(
         self,
diff --git a/airflow-core/src/airflow/models/backfill.py 
b/airflow-core/src/airflow/models/backfill.py
index 33df6901d40..4a79acde307 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -286,7 +286,6 @@ def _do_dry_run(
         raise DagNotFound(f"Could not find dag {dag_id}")
     dag = serdag.dag
     _validate_backfill_params(dag, reverse, from_date, to_date, 
reprocess_behavior)
-
     no_schedule = session.scalar(
         select(func.count()).where(DagModel.timetable_summary == "None", 
DagModel.dag_id == dag_id)
     )
@@ -301,6 +300,8 @@ def _do_dry_run(
     )
     logical_dates: list[datetime] = []
     for info in dagrun_info_list:
+        if TYPE_CHECKING:
+            assert info.logical_date
         dr = session.scalar(
             statement=_get_latest_dag_run_row_query(dag_id=dag_id, info=info, 
session=session),
         )
@@ -428,7 +429,12 @@ def _get_info_list(
 ) -> list[DagRunInfo]:
     infos = dag.iter_dagrun_infos_between(from_date, to_date)
     now = timezone.utcnow()
-    dagrun_info_list = [x for x in infos if x.data_interval.end < now]
+    dagrun_info_list = [
+        x
+        for x in infos
+        # todo: AIP-76 update for partitioned dags
+        if x.data_interval and x.data_interval.end < now
+    ]
     if reverse:
         dagrun_info_list = list(reversed(dagrun_info_list))
     return dagrun_info_list
@@ -497,6 +503,7 @@ def _create_backfill(
         serdag = 
session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
         if not serdag:
             raise DagNotFound(f"Could not find dag {dag_id}")
+
         no_schedule = session.scalar(
             select(func.count()).where(DagModel.timetable_summary == "None", 
DagModel.dag_id == dag_id)
         )
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index 2da71540db2..55a4b156f1a 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import logging
 from collections import defaultdict
 from collections.abc import Callable, Collection
 from datetime import datetime, timedelta
@@ -25,6 +24,8 @@ from typing import TYPE_CHECKING, Any, cast
 
 import pendulum
 import sqlalchemy_jsonfield
+import structlog
+from dateutil.relativedelta import relativedelta
 from sqlalchemy import (
     Boolean,
     Float,
@@ -91,7 +92,7 @@ if TYPE_CHECKING:
         | Collection["SerializedAsset" | "SerializedAssetAlias"]
     )
 
-log = logging.getLogger(__name__)
+log = structlog.getLogger(__name__)
 
 TAG_MAX_LEN = 100
 
@@ -128,7 +129,7 @@ def infer_automated_data_interval(timetable: Timetable, 
logical_date: datetime)
     return DataInterval(start, end)
 
 
-def get_run_data_interval(timetable: Timetable, run: DagRun) -> DataInterval:
+def get_run_data_interval(timetable: Timetable, run: DagRun | None) -> 
DataInterval | None:
     """
     Get the data interval of this run.
 
@@ -141,6 +142,12 @@ def get_run_data_interval(timetable: Timetable, run: 
DagRun) -> DataInterval:
 
     :meta private:
     """
+    if not run:
+        return run
+
+    if run.partition_key is not None:
+        return None
+
     if (
         data_interval := _get_model_data_interval(run, "data_interval_start", 
"data_interval_end")
     ) is not None:
@@ -712,33 +719,43 @@ class DagModel(Base):
     def calculate_dagrun_date_fields(
         self,
         dag: SerializedDAG | LazyDeserializedDAG,
-        last_automated_dag_run: None | DataInterval,
+        *,
+        last_automated_run: DagRun | None,
     ) -> None:
         """
         Calculate ``next_dagrun`` and `next_dagrun_create_after``.
 
         :param dag: The DAG object
-        :param last_automated_dag_run: DataInterval of most recent run of this 
dag, or none
+        :param last_automated_run: DagRun of most recent run of this dag, or 
none
             if not yet scheduled.
+            TODO: AIP-76 This is not always latest run! See 
https://github.com/apache/airflow/issues/59618.
         """
-        if isinstance(last_automated_dag_run, datetime):
+        # TODO: AIP-76 perhaps we need to add validation for manual runs to 
ensure consistency between
+        #   partition_key / partition_date and run_after
+
+        if isinstance(last_automated_run, datetime):
             raise ValueError(
                 "Passing a datetime to `DagModel.calculate_dagrun_date_fields` 
is not supported. "
                 "Provide a data interval instead."
             )
-        next_dagrun_info = 
dag.next_dagrun_info(last_automated_dagrun=last_automated_dag_run)
+
+        last_run_info = None
+        if last_automated_run:
+            last_run_info = 
dag.timetable.run_info_from_dag_run(dag_run=last_automated_run)
+        next_dagrun_info = 
dag.next_dagrun_info(last_automated_run_info=last_run_info)
         if next_dagrun_info is None:
+            # there is no next dag run after the last dag run; set to None
             self.next_dagrun_data_interval = self.next_dagrun = 
self.next_dagrun_create_after = None
         else:
             self.next_dagrun_data_interval = next_dagrun_info.data_interval
-            self.next_dagrun = next_dagrun_info.logical_date
+            self.next_dagrun = next_dagrun_info.logical_date or 
next_dagrun_info.partition_date
             self.next_dagrun_create_after = next_dagrun_info.run_after
-
         log.info(
-            "Setting next_dagrun for %s to %s, run_after=%s",
-            dag.dag_id,
-            self.next_dagrun,
-            self.next_dagrun_create_after,
+            "setting next dagrun info",
+            next_dagrun=str(self.next_dagrun),
+            next_dagrun_create_after=str(self.next_dagrun_create_after),
+            
next_dagrun_data_interval_start=str(self.next_dagrun_data_interval_start),
+            
next_dagrun_data_interval_end=str(self.next_dagrun_data_interval_end),
         )
 
     @provide_session
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py 
b/airflow-core/src/airflow/serialization/definitions/dag.py
index 8540d3093e7..ea3009c5645 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -46,6 +46,7 @@ from airflow.serialization.definitions.deadline import 
DeadlineAlertFields, Seri
 from airflow.serialization.definitions.param import SerializedParamsDict
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
+from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.types import DagRunType
@@ -384,21 +385,20 @@ class SerializedDAG:
 
     def next_dagrun_info(
         self,
-        last_automated_dagrun: None | DataInterval,
         *,
+        last_automated_run_info: DagRunInfo | None,
         restricted: bool = True,
     ) -> DagRunInfo | None:
         """
-        Get information about the next DagRun of this dag after 
``date_last_automated_dagrun``.
+        Get the DagRunInfo object for the next run of this dag.
 
-        This calculates what time interval the next DagRun should operate on
-        (its logical date) and when it can be scheduled, according to the
+        This calculates the interval or partition and when it can be 
scheduled, according to the
         dag's timetable, start_date, end_date, etc. This doesn't check max
         active run or any other "max_active_tasks" type limits, but only
         performs calculations based on the various date and interval fields of
         this dag and its tasks.
 
-        :param last_automated_dagrun: The ``max(logical_date)`` of
+        :param last_automated_run_info: The latest run info of
             existing "automated" DagRuns for this dag (scheduled or backfill,
             but not manual).
         :param restricted: If set to *False* (default is *True*), ignore
@@ -412,18 +412,23 @@ class SerializedDAG:
         else:
             restriction = TimeRestriction(earliest=None, latest=None, 
catchup=True)
         try:
-            info = self.timetable.next_dagrun_info(
-                last_automated_data_interval=last_automated_dagrun,
+            info = self.timetable.next_dagrun_info_v2(
+                last_dagrun_info=last_automated_run_info,
                 restriction=restriction,
             )
+            log.info(
+                "get next_dagrun_info_v2",
+                last_automated_run_info=last_automated_run_info,
+                next_info=info,
+            )
+            return info
         except Exception:
             log.exception(
-                "Failed to fetch run info after data interval %s for DAG %r",
-                last_automated_dagrun,
-                self.dag_id,
+                "Failed to fetch run info",
+                last_run_info=last_automated_run_info,
+                dag_id=self.dag_id,
             )
-            info = None
-        return info
+        return None
 
     def iter_dagrun_infos_between(
         self,
@@ -447,7 +452,12 @@ class SerializedDAG:
         ``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
         ``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
         if ``align=True``.
+
+        #  see issue https://github.com/apache/airflow/issues/60455
         """
+        if isinstance(self.timetable, CronPartitionTimetable):
+            # todo: AIP-76 need to update this so that it handles partitions
+            raise ValueError("Partition-driven timetables not supported yet")
         if earliest is None:
             earliest = self._time_restriction.earliest
         if earliest is None:
@@ -477,6 +487,10 @@ class SerializedDAG:
                 yield DagRunInfo.interval(earliest, latest)
             return
 
+        if TYPE_CHECKING:
+            # todo: AIP-76 after updating this function for partitions, this 
may not be true
+            assert info.data_interval is not None
+
         # If align=False and earliest does not fall on the timetable's logical
         # schedule, "invent" a data interval for it.
         if not align and info.logical_date != earliest:
@@ -549,6 +563,14 @@ class SerializedDAG:
         """
         from airflow.models.dagrun import RUN_ID_REGEX
 
+        log.info(
+            "creating dag run",
+            run_after=run_after,
+            run_id=run_id,
+            logical_date=logical_date,
+            partition_key=partition_key,
+        )
+
         logical_date = coerce_datetime(logical_date)
         # For manual runs where logical_date is None, ensure no data_interval 
is set.
         if logical_date is None and data_interval is not None:
diff --git a/airflow-core/src/airflow/timetables/base.py 
b/airflow-core/src/airflow/timetables/base.py
index 9105b9d4d80..5bb916a7a16 100644
--- a/airflow-core/src/airflow/timetables/base.py
+++ b/airflow-core/src/airflow/timetables/base.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING, Any, NamedTuple, Protocol, runtime_checkable
 
 from airflow._shared.module_loading import qualname
+from airflow._shared.timezones import timezone
 from airflow.serialization.definitions.assets import SerializedAssetBase
 
 if TYPE_CHECKING:
@@ -26,6 +27,8 @@ if TYPE_CHECKING:
 
     from pendulum import DateTime
 
+    from airflow.models.dag import DagModel
+    from airflow.models.dagrun import DagRun
     from airflow.serialization.dag_dependency import DagDependency
     from airflow.serialization.definitions.assets import (
         SerializedAsset,
@@ -118,13 +121,21 @@ class DagRunInfo(NamedTuple):
     This **MUST** be "aware", i.e. contain timezone information.
     """
 
-    data_interval: DataInterval
+    data_interval: DataInterval | None
     """The data interval this DagRun to operate over."""
 
+    partition_date: DateTime | None
+    partition_key: str | None
+
     @classmethod
     def exact(cls, at: DateTime) -> DagRunInfo:
         """Represent a run on an exact time."""
-        return cls(run_after=at, data_interval=DataInterval.exact(at))
+        return cls(
+            run_after=at,
+            data_interval=DataInterval.exact(at),
+            partition_key=None,
+            partition_date=None,
+        )
 
     @classmethod
     def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
@@ -135,17 +146,22 @@ class DagRunInfo(NamedTuple):
         one ends, and each run is scheduled right after the interval ends. This
         applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
         """
-        return cls(run_after=end, data_interval=DataInterval(start, end))
+        return cls(
+            run_after=end,
+            data_interval=DataInterval(start, end),
+            partition_key=None,
+            partition_date=None,
+        )
 
     @property
-    def logical_date(self: DagRunInfo) -> DateTime:
+    def logical_date(self: DagRunInfo) -> DateTime | None:
         """
         Infer the logical date to represent a DagRun.
 
         This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
         essentially the same, just a different name.
         """
-        return self.data_interval.start
+        return self.data_interval.start if self.data_interval else None
 
 
 @runtime_checkable
@@ -315,3 +331,52 @@ class Timetable(Protocol):
         :param data_interval: The data interval of the DAG run.
         """
         return run_type.generate_run_id(suffix=run_after.isoformat())
+
+    def next_dagrun_info_v2(
+        self, *, last_dagrun_info: DagRunInfo | None, restriction: 
TimeRestriction
+    ) -> DagRunInfo | None:
+        """
+        Provide information to schedule the next DagRun.
+
+        The default implementation delegates to ``next_dagrun_info``.
+
+        :param last_dagrun_info: The DagRunInfo object of the
+            Dag's last scheduled or backfilled run.
+        :param restriction: Restriction to apply when scheduling the Dag run.
+            See documentation of :class:`TimeRestriction` for details.
+
+        :return: Information on when the next DagRun can be scheduled. None
+            means a DagRun should not be created. This does not mean no more 
runs
+            will be scheduled ever again for this Dag; the timetable can return
+            a DagRunInfo object when asked at another time.
+        """
+        return self.next_dagrun_info(
+            last_automated_data_interval=last_dagrun_info and 
last_dagrun_info.data_interval,
+            restriction=restriction,
+        )
+
+    def next_run_info_from_dag_model(self, *, dag_model: DagModel) -> 
DagRunInfo:
+        from airflow.models.dag import get_next_data_interval
+
+        run_after = 
timezone.coerce_datetime(dag_model.next_dagrun_create_after)
+        if TYPE_CHECKING:
+            assert run_after is not None
+        data_interval = get_next_data_interval(self, dag_model)
+        return DagRunInfo(
+            run_after=run_after,
+            data_interval=data_interval,
+            partition_date=None,
+            partition_key=None,
+        )
+
+    def run_info_from_dag_run(self, *, dag_run: DagRun) -> DagRunInfo:
+        from airflow.models.dag import get_run_data_interval
+
+        run_after = timezone.coerce_datetime(dag_run.run_after)
+        interval = get_run_data_interval(self, dag_run)
+        return DagRunInfo(
+            run_after=run_after,
+            data_interval=interval,
+            partition_date=None,
+            partition_key=None,
+        )
diff --git a/airflow-core/src/airflow/timetables/trigger.py 
b/airflow-core/src/airflow/timetables/trigger.py
index f0e04da042e..af4f855f0cd 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -21,19 +21,29 @@ import functools
 import math
 import operator
 import time
+from types import NoneType
 from typing import TYPE_CHECKING, Any
 
+import structlog
+
+from airflow._shared.timezones import timezone
 from airflow._shared.timezones.timezone import coerce_datetime, 
parse_timezone, utcnow
 from airflow.timetables._cron import CronMixin
 from airflow.timetables._delta import DeltaMixin
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils.strings import get_random_string
 
 if TYPE_CHECKING:
     from dateutil.relativedelta import relativedelta
     from pendulum import DateTime
     from pendulum.tz.timezone import FixedTimezone, Timezone
 
+    from airflow.models.dag import DagModel
+    from airflow.models.dagrun import DagRun
     from airflow.timetables.base import TimeRestriction
+    from airflow.utils.types import DagRunType
+
+log = structlog.get_logger()
 
 
 class _TriggerTimetable(Timetable):
@@ -285,8 +295,9 @@ class MultipleCronTriggerTimetable(Timetable):
         return ", ".join(t.summary for t in self._timetables)
 
     def infer_manual_data_interval(self, *, run_after: DateTime) -> 
DataInterval:
+        intervals = (t.infer_manual_data_interval(run_after=run_after) for t 
in self._timetables)
         return min(
-            (t.infer_manual_data_interval(run_after=run_after) for t in 
self._timetables),
+            (x for x in intervals if x),
             key=operator.attrgetter("start"),
         )
 
@@ -321,7 +332,7 @@ class MultipleCronTriggerTimetable(Timetable):
         Unix timestamp. If the input is *None* (no next run), *inf* is returned
         so it's selected last.
         """
-        if info is None:
+        if info is None or info.logical_date is None:
             return math.inf
         return info.logical_date.timestamp()
 
@@ -338,8 +349,200 @@ class MultipleCronTriggerTimetable(Timetable):
         order values by ``-logical_date`` if they are earlier than or at 
current
         time, but ``+logical_date`` if later.
         """
-        if info is None:
+        if info is None or info.logical_date is None:
             return math.inf
         if (ts := info.logical_date.timestamp()) <= now:
             return -ts
         return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers Dag runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; a ``run_offset`` of -1 means the partition date will be
+    one cron interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to 
run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string 
partition key.
+
+    *run_immediately* controls, if no *start_time* is given to the Dag, when
+    the first run of the Dag should be scheduled. It has no effect if there
+    already exist runs for this Dag.
+
+    * If *True*, always run immediately the most recent possible Dag run.
+    * If *False*, wait to run until the next scheduled time in the future.
+    * If passed a ``timedelta``, will run the most recent possible Dag run
+      if that run's ``data_interval_end`` is within timedelta of now.
+    * If *None*, the timedelta is calculated as 10% of the time between the
+      most recent past scheduled time and the next scheduled time. E.g. if
+      running every hour, this would run the previous time if less than 6
+      minutes had passed since the previous run time, otherwise it would wait
+      until the next hour.
+
+    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+    # todo: AIP-76 we could allow a tuple of integer + time-based
+
+    """
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone | FixedTimezone,
+        run_offset: int | datetime.timedelta | relativedelta | None = None,
+        run_immediately: bool | datetime.timedelta = False,
+        key_format: str = "%Y-%m-%dT%H:%M:%S",  # todo: AIP-76 we can't infer 
partition date from this, so we need to store it separately
+    ) -> None:
+        super().__init__(cron, timezone=timezone, 
run_immediately=run_immediately)
+        if not isinstance(run_offset, (int, NoneType)):
+            # todo: AIP-76 implement timedelta / relative delta?
+            raise ValueError("Run offset other than integer not supported 
yet.")
+        self._run_offset = run_offset or 0
+        self._key_format = key_format
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.decoders import decode_run_immediately
+
+        offset = data["run_offset"]
+        if not isinstance(offset, (int, NoneType)):
+            offset = None
+            log.warning(
+                "Unexpected offset type on deserialization. Only int supported 
in this version.",
+                run_offset=offset,
+            )
+
+        return cls(
+            cron=data["expression"],
+            timezone=parse_timezone(data["timezone"]),
+            run_offset=offset,
+            run_immediately=decode_run_immediately(data.get("run_immediately", 
False)),
+            key_format=data["key_format"],
+        )
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.encoders import encode_run_immediately, 
encode_timezone
+
+        return {
+            "expression": self._expression,
+            "timezone": encode_timezone(self._timezone),
+            "run_immediately": encode_run_immediately(self._run_immediately),
+            "run_offset": self._run_offset,
+            "key_format": self._key_format,
+        }
+
+    def _get_partition_date(self, *, run_date) -> DateTime:
+        if self._run_offset == 0:
+            return run_date
+        # apply the offset to the run date to determine the partition date
+        partition_date = timezone.coerce_datetime(run_date)
+        log.info(
+            "applying offset to partition date", 
partition_date=partition_date, run_offset=self._run_offset
+        )
+        iter_func = self._get_next if self._run_offset > 0 else self._get_prev
+        for _ in range(abs(self._run_offset)):
+            partition_date = iter_func(partition_date)
+        log.info("new partition date", partition_date=partition_date)
+        return partition_date
+
+    def next_dagrun_info_v2(
+        self,
+        *,
+        last_dagrun_info: DagRunInfo | None,
+        restriction: TimeRestriction,
+    ) -> DagRunInfo | None:
+        # todo: AIP-76 add test for this logic
+        # todo: AIP-76 we will have to ensure that the start / end times apply 
to the partition date ideally,
+        #  rather than just the run after
+
+        if restriction.catchup:
+            if last_dagrun_info is not None:
+                next_start_time = self._get_next(last_dagrun_info.run_after)
+            elif restriction.earliest is None:
+                next_start_time = self._calc_first_run()
+            else:
+                next_start_time = self._align_to_next(restriction.earliest)
+        else:
+            prev_candidate = self._align_to_prev(coerce_datetime(utcnow()))
+            start_time_candidates = [prev_candidate]
+            if last_dagrun_info is not None:
+                next_candidate = self._get_next(last_dagrun_info.run_after)
+                start_time_candidates.append(next_candidate)
+            elif restriction.earliest is None:
+                # Run immediately has no effect if there is restriction on 
earliest
+                first_run = self._calc_first_run()
+                start_time_candidates.append(first_run)
+            if restriction.earliest is not None:
+                earliest = self._align_to_next(restriction.earliest)
+                start_time_candidates.append(earliest)
+            next_start_time = max(start_time_candidates)
+        if restriction.latest is not None and restriction.latest < 
next_start_time:
+            return None
+
+        partition_date, partition_key = 
self._get_partition_info(run_date=next_start_time)
+        return DagRunInfo(
+            run_after=next_start_time,
+            partition_date=partition_date,
+            partition_key=partition_key,
+            data_interval=None,
+        )
+
+    def _get_partition_info(self, run_date: DateTime) -> tuple[DateTime, str]:
+        # todo: AIP-76 it does not make sense that we would infer partition 
info from run date
+        #  in general, because they might not be 1-1
+        partition_date = self._get_partition_date(run_date=run_date)
+        partition_key = self._format_key(partition_date)
+        return partition_date, partition_key
+
+    def _format_key(self, partition_date: DateTime) -> str:
+        return partition_date.strftime(self._key_format)
+
+    def generate_run_id(
+        self,
+        *,
+        run_type: DagRunType,
+        run_after: DateTime,
+        data_interval: DataInterval | None,
+        **extra,
+    ) -> str:
+        components = [
+            run_after.isoformat(),
+            extra.get("partition_key"),
+            get_random_string(),
+        ]
+        return run_type.generate_run_id(suffix="__".join(components))
+
+    def next_run_info_from_dag_model(self, *, dag_model: DagModel) -> 
DagRunInfo:
+        run_after = 
timezone.coerce_datetime(dag_model.next_dagrun_create_after)
+        if TYPE_CHECKING:
+            assert run_after is not None
+        partition_date, partition_key = 
self._get_partition_info(run_date=run_after)
+        return DagRunInfo(
+            run_after=run_after,
+            data_interval=None,
+            partition_date=partition_date,
+            partition_key=partition_key,
+        )
+
+    def run_info_from_dag_run(self, *, dag_run: DagRun) -> DagRunInfo:
+        run_after = timezone.coerce_datetime(dag_run.run_after)
+        # todo: AIP-76 store this on DagRun so we don't need to recalculate?
+        # todo: AIP-76 this needs to be public
+        partition_date = self._get_partition_date(run_date=dag_run.run_after)
+        return DagRunInfo(
+            run_after=run_after,
+            data_interval=None,
+            partition_date=partition_date,
+            partition_key=dag_run.partition_key,
+        )
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py 
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 98edd6364fb..01f87c2ca22 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -67,6 +67,10 @@ else:
 
 pytestmark = pytest.mark.db_test
 
+jan_1 = DEFAULT_DATE
+jan_6 = DEFAULT_DATE + timedelta(days=5)
+dec_27 = DEFAULT_DATE + timedelta(days=-5)
+
 
 class TestCliDags:
     parser: argparse.ArgumentParser
@@ -145,74 +149,84 @@ class TestCliDags:
         assert "OUT" in out
         assert "ERR" in out
 
-    def test_next_execution(self, tmp_path, stdout_capture):
-        dag_test_list = [
-            ("future_schedule_daily", "timedelta(days=5)", "'0 0 * * *'", 
"True"),
-            ("future_schedule_every_4_hours", "timedelta(days=5)", 
"timedelta(hours=4)", "True"),
-            ("future_schedule_once", "timedelta(days=5)", "'@once'", "True"),
-            ("future_schedule_none", "timedelta(days=5)", "None", "True"),
-            ("past_schedule_once", "timedelta(days=-5)", "'@once'", "True"),
-            ("past_schedule_daily", "timedelta(days=-5)", "'0 0 * * *'", 
"True"),
-            ("past_schedule_daily_catchup_false", "timedelta(days=-5)", "'0 0 
* * *'", "False"),
-        ]
-
-        for f in dag_test_list:
-            file_content = os.linesep.join(
-                [
-                    "from airflow import DAG",
-                    "from airflow.providers.standard.operators.empty import 
EmptyOperator",
-                    "from datetime import timedelta; from pendulum import 
today",
-                    f"dag = DAG('{f[0]}', start_date=today() + {f[1]}, 
schedule={f[2]}, catchup={f[3]})",
-                    "task = EmptyOperator(task_id='empty_task',dag=dag)",
-                ]
-            )
-            dag_file = tmp_path / f"{f[0]}.py"
-            dag_file.write_text(file_content)
-
-        with time_machine.travel(DEFAULT_DATE):
-            clear_db_dags()
-            parse_and_sync_to_db(tmp_path, include_examples=False)
-
-        default_run = DEFAULT_DATE
-        future_run = default_run + timedelta(days=5)
-        past_run = default_run + timedelta(days=-5)
-
-        expected_output = {
-            "future_schedule_daily": (
-                future_run.isoformat(),
-                future_run.isoformat() + os.linesep + (future_run + 
timedelta(days=1)).isoformat(),
+    @pytest.mark.parametrize(
+        ("dag_id", "delta", "schedule", "catchup", "first", "second"),
+        [
+            (
+                "future_schedule_daily",
+                "timedelta(days=5)",
+                "'0 0 * * *'",
+                "True",
+                jan_6.isoformat(),
+                jan_6.isoformat() + os.linesep + (jan_6 + 
timedelta(days=1)).isoformat(),
             ),
-            "future_schedule_every_4_hours": (
-                future_run.isoformat(),
-                future_run.isoformat() + os.linesep + (future_run + 
timedelta(hours=4)).isoformat(),
+            (
+                "future_schedule_every_4_hours",
+                "timedelta(days=5)",
+                "timedelta(hours=4)",
+                "True",
+                jan_6.isoformat(),
+                jan_6.isoformat() + os.linesep + (jan_6 + 
timedelta(hours=4)).isoformat(),
             ),
-            "future_schedule_once": (future_run.isoformat(), 
future_run.isoformat() + os.linesep + "None"),
-            "future_schedule_none": ("None", "None"),
-            "past_schedule_once": (past_run.isoformat(), "None"),
-            "past_schedule_daily": (
-                past_run.isoformat(),
-                past_run.isoformat() + os.linesep + (past_run + 
timedelta(days=1)).isoformat(),
+            (
+                "future_schedule_once",
+                "timedelta(days=5)",
+                "'@once'",
+                "True",
+                jan_6.isoformat(),
+                jan_6.isoformat() + os.linesep + "None",
             ),
-            "past_schedule_daily_catchup_false": (
-                (default_run - timedelta(days=1)).isoformat(),
-                (default_run - timedelta(days=1)).isoformat() + os.linesep + 
default_run.isoformat(),
+            ("future_schedule_none", "timedelta(days=5)", "None", "True", 
"None", "None"),
+            ("past_schedule_once", "timedelta(days=-5)", "'@once'", "True", 
dec_27.isoformat(), "None"),
+            (
+                "past_schedule_daily",
+                "timedelta(days=-5)",
+                "'0 0 * * *'",
+                "True",
+                dec_27.isoformat(),
+                dec_27.isoformat() + os.linesep + (dec_27 + 
timedelta(days=1)).isoformat(),
             ),
-        }
+            (
+                "past_schedule_daily_catchup_false",
+                "timedelta(days=-5)",
+                "'0 0 * * *'",
+                "False",
+                (jan_1 - timedelta(days=1)).isoformat(),
+                (jan_1 - timedelta(days=1)).isoformat() + os.linesep + 
jan_1.isoformat(),
+            ),
+        ],
+    )
+    def test_next_execution(self, dag_id, delta, schedule, catchup, first, 
second, tmp_path, stdout_capture):
+        file_content = os.linesep.join(
+            [
+                "from airflow import DAG",
+                "from airflow.providers.standard.operators.empty import 
EmptyOperator",
+                "from datetime import timedelta; from pendulum import today",
+                f"dag = DAG('{dag_id}', start_date=today(tz='UTC') + {delta}, 
schedule={schedule}, catchup={catchup})",
+                "task = EmptyOperator(task_id='empty_task',dag=dag)",
+            ]
+        )
+        dag_file = tmp_path / f"{dag_id}.py"
+        dag_file.write_text(file_content)
 
-        for dag_id in expected_output:
-            # Test num-executions = 1 (default)
-            args = self.parser.parse_args(["dags", "next-execution", dag_id])
-            with stdout_capture as temp_stdout:
-                dag_command.dag_next_execution(args)
-                out = temp_stdout.getvalue()
-            assert expected_output[dag_id][0] in out
+        print(file_content)
+        with time_machine.travel(DEFAULT_DATE):
+            clear_db_dags()
+            parse_and_sync_to_db(tmp_path, include_examples=False)
 
-            # Test num-executions = 2
-            args = self.parser.parse_args(["dags", "next-execution", dag_id, 
"--num-executions", "2"])
-            with stdout_capture as temp_stdout:
-                dag_command.dag_next_execution(args)
-                out = temp_stdout.getvalue()
-            assert expected_output[dag_id][1] in out
+        # Test num-executions = 1 (default)
+        args = self.parser.parse_args(["dags", "next-execution", dag_id])
+        with stdout_capture as temp_stdout:
+            dag_command.dag_next_execution(args)
+            out = temp_stdout.getvalue()
+        assert first in out
+
+        # Test num-executions = 2
+        args = self.parser.parse_args(["dags", "next-execution", dag_id, 
"--num-executions", "2"])
+        with stdout_capture as temp_stdout:
+            dag_command.dag_next_execution(args)
+            out = temp_stdout.getvalue()
+        assert second in out
 
         # Rebuild Test DB for other tests
         clear_db_dags()
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c6b3d026082..069d72b5eca 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -88,7 +88,7 @@ from airflow.sdk.definitions.callback import AsyncCallback, 
SyncCallback
 from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
 from airflow.serialization.definitions.dag import SerializedDAG
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
-from airflow.timetables.base import DataInterval
+from airflow.timetables.base import DagRunInfo, DataInterval
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.span_status import SpanStatus
 from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -4053,7 +4053,7 @@ class TestSchedulerJob:
         # To increase the chances the TIs from the "full" pool will get
         # retrieved first, we schedule all TIs from the first dag first.
         def _create_dagruns(dag: SerializedDAG):
-            next_info = dag.next_dagrun_info(None)
+            next_info = dag.next_dagrun_info(last_automated_run_info=None)
             assert next_info is not None
             for i in range(30):
                 yield dag.create_dagrun(
@@ -4066,7 +4066,7 @@ class TestSchedulerJob:
                     triggered_by=DagRunTriggeredByType.TEST,
                     session=session,
                 )
-                next_info = dag.next_dagrun_info(next_info.data_interval)
+                next_info = 
dag.next_dagrun_info(last_automated_run_info=next_info)
                 if next_info is None:
                     break
 
@@ -4346,7 +4346,7 @@ class TestSchedulerJob:
                 bash_command="exit 1",
                 retries=1,
             )
-        dag_maker.dag_model.calculate_dagrun_date_fields(dag, None)
+        dag_maker.dag_model.calculate_dagrun_date_fields(dag, 
last_automated_run=None)
 
         @provide_session
         def do_schedule(session):
@@ -4555,6 +4555,7 @@ class TestSchedulerJob:
         session.rollback()
 
     def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self, 
dag_maker):
+        # todo: this fails
         dag_id = "test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs"
         with dag_maker(dag_id=dag_id, schedule="@daily"):
             EmptyOperator(task_id="task1")
@@ -4645,6 +4646,46 @@ class TestSchedulerJob:
             self.job_runner._create_dag_runs(dag_models=[dag_maker.dag_model], 
session=session)
         assert mock_calc.called
 
+    @pytest.mark.parametrize(
+        "expected",
+        [
+            DagRunInfo(
+                run_after=pendulum.now(),
+                data_interval=None,
+                partition_date=None,
+                partition_key="helloooooo",
+            ),
+            DagRunInfo(
+                run_after=pendulum.now(),
+                data_interval=DataInterval(pendulum.today(), pendulum.today()),
+                partition_date=None,
+                partition_key=None,
+            ),
+        ],
+    )
+    @patch("airflow.timetables.base.Timetable.next_run_info_from_dag_model")
+    @patch("airflow.serialization.definitions.dag.SerializedDAG.create_dagrun")
+    def test_should_use_info_from_timetable(self, mock_create, mock_next, 
expected, session, dag_maker):
+        """The scheduler should create the dag run from the run info the timetable returns."""
+        mock_next.return_value = expected
+        with dag_maker(schedule="0 0 * * *"):
+            EmptyOperator(task_id="dummy")
+
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        self.job_runner._create_dag_runs(dag_models=[dag_maker.dag_model], 
session=session)
+        kwargs = mock_create.call_args.kwargs
+        # todo: AIP-76 let's add partition_date to dag run
+        #  and we should probably have something on DagRun that can return the 
DagRunInfo for
+        #  that dag run. See https://github.com/apache/airflow/issues/61167
+        actual = DagRunInfo(
+            run_after=kwargs["run_after"],
+            data_interval=kwargs["data_interval"],
+            partition_key=kwargs["partition_key"],
+            partition_date=None,
+        )
+        assert actual == expected
+
     @pytest.mark.parametrize(
         ("run_type", "expected"),
         [
@@ -5055,9 +5096,7 @@ class TestSchedulerJob:
             scheduler_messages = [
                 record.message for record in caplog.records if record.levelno 
>= logging.ERROR
             ]
-            assert scheduler_messages == [
-                "DAG 'test_scheduler_create_dag_runs_does_not_raise_error' not 
found in serialized_dag table",
-            ]
+            assert scheduler_messages == ["Dag not found in serialized_dag 
table"]
 
     def _clear_serdags(self, dag_id, session):
         SDM = SerializedDagModel
@@ -6253,7 +6292,13 @@ class TestSchedulerJob:
             run_id="dr1_run_2",
             state=State.QUEUED,
             logical_date=dag.next_dagrun_info(
-                last_automated_dagrun=data_interval, restricted=False
+                last_automated_run_info=DagRunInfo(
+                    run_after=dr1_running.run_after,
+                    data_interval=data_interval,
+                    partition_date=None,
+                    partition_key=None,
+                ),
+                restricted=False,
             ).data_interval.start,
         )
         # second dag and dagruns
@@ -7324,7 +7369,7 @@ class TestSchedulerJob:
         job_runner = SchedulerJobRunner(job=scheduler_job)
         # In the dagmodel list, the first dag should fail, but the second one 
should succeed
         job_runner._create_dag_runs([dm1], session)
-        assert "Failed creating DagRun for testdag1" in caplog.text
+        assert "Failed creating DagRun" in caplog.text
 
     def test_activate_referenced_assets_with_no_existing_warning(self, 
session, testing_dag_bundle):
         dag_warnings = session.scalars(select(DagWarning)).all()
diff --git a/airflow-core/tests/unit/models/test_dag.py 
b/airflow-core/tests/unit/models/test_dag.py
index 1853b4402d1..e5366fc6127 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -82,6 +82,7 @@ from airflow.timetables.simple import (
     NullTimetable,
     OnceTimetable,
 )
+from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -1483,11 +1484,11 @@ my_postgres_conn:
         dag = DAG("test_scheduler_dagrun_once", 
start_date=timezone.datetime(2015, 1, 1), schedule="@once")
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2015, 1, 1)
 
-        next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
         assert next_info is None
 
     def test_next_dagrun_info_catchup(self):
@@ -1528,7 +1529,8 @@ my_postgres_conn:
             start_date=six_hours_ago_to_the_hour,
             catchup=False,
         )
-        next_date, _ = dag1.next_dagrun_info(None)
+        info = dag1.next_dagrun_info(last_automated_run_info=None)
+        next_date = info.run_after
         # The DR should be scheduled in the last half an hour, not 6 hours ago
         assert next_date > half_an_hour_ago
         assert next_date < timezone.utcnow()
@@ -1540,7 +1542,8 @@ my_postgres_conn:
             catchup=False,
         )
 
-        next_date, _ = dag2.next_dagrun_info(None)
+        info = dag2.next_dagrun_info(last_automated_run_info=None)
+        next_date = info.run_after
         # The DR should be scheduled in the last 2 hours, not 6 hours ago
         assert next_date > two_hours_ago
         # The DR should be scheduled BEFORE now
@@ -1553,7 +1556,8 @@ my_postgres_conn:
             catchup=False,
         )
 
-        next_date, _ = dag3.next_dagrun_info(None)
+        info = dag3.next_dagrun_info(last_automated_run_info=None)
+        next_date = info.run_after
         # The DR should be scheduled in the last 2 hours, not 6 hours ago
         assert next_date == six_hours_ago_to_the_hour
 
@@ -1572,12 +1576,12 @@ my_postgres_conn:
         )
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2020, 1, 4)
 
         # The date to create is in the future, this is handled by 
"DagModel.dags_needing_dagruns"
-        next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2020, 1, 5)
 
@@ -1595,20 +1599,20 @@ my_postgres_conn:
         )
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2020, 5, 1)
 
-        next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2020, 5, 2)
 
-        next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2020, 5, 3)
 
         # The date to create is in the future, this is handled by 
"DagModel.dags_needing_dagruns"
-        next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2020, 5, 4)
 
@@ -1616,7 +1620,7 @@ my_postgres_conn:
         """Test the DAG does not crash the scheduler if the timetable raises 
an exception."""
 
         class FailingTimetable(Timetable):
-            def next_dagrun_info(self, last_automated_data_interval, 
restriction):
+            def next_dagrun_info(self, last_automated_run_info, restriction):
                 raise RuntimeError("this fails")
 
         def _find_registered_custom_timetable(s):
@@ -1646,19 +1650,16 @@ my_postgres_conn:
             assert len(records) == 1
             record = records[0]
             assert record.exc_info is not None, "Should contain exception"
-            assert record.message == (
-                f"Failed to fetch run info after data interval {data_interval} 
"
-                f"for DAG 'test_next_dagrun_info_timetable_exception'"
-            )
+            assert record.message == "Failed to fetch run info"
 
         with caplog.at_level(level=logging.ERROR):
-            next_info = scheduler_dag.next_dagrun_info(None)
+            next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info is None, "failed next_dagrun_info should return None"
         _check_logs(caplog.records, data_interval=None)
         caplog.clear()
         data_interval = DataInterval(timezone.datetime(2020, 5, 1), 
timezone.datetime(2020, 5, 2))
         with caplog.at_level(level=logging.ERROR):
-            next_info = scheduler_dag.next_dagrun_info(data_interval)
+            next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=data_interval)
         assert next_info is None, "failed next_dagrun_info should return None"
         _check_logs(caplog.records, data_interval)
 
@@ -1683,7 +1684,7 @@ my_postgres_conn:
         EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2016, 1, 2, 5, 4)
 
@@ -1696,7 +1697,7 @@ my_postgres_conn:
         EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2016, 1, 1, 10, 10)
 
@@ -1711,7 +1712,7 @@ my_postgres_conn:
         EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         # With catchup=False, next_dagrun should be based on the current date
         # Verify it's not using the old start_date
@@ -1732,7 +1733,7 @@ my_postgres_conn:
         EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         # With catchup=False, next_dagrun should be based on the current date
         # Verify it's not using the old start_date
@@ -1748,11 +1749,11 @@ my_postgres_conn:
         )
         scheduler_dag = create_scheduler_dag(dag)
 
-        next_info = scheduler_dag.next_dagrun_info(None)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
         assert next_info
         assert next_info.logical_date == timezone.datetime(2024, 2, 29)
 
-        next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+        next_info = 
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
         assert next_info.logical_date == timezone.datetime(2028, 2, 29)
         assert next_info.data_interval.start == timezone.datetime(2028, 2, 29)
         assert next_info.data_interval.end == timezone.datetime(2032, 2, 29)
@@ -2748,7 +2749,7 @@ def test_iter_dagrun_infos_between_error(caplog):
     end = pendulum.instance(DEFAULT_DATE)
 
     class FailingAfterOneTimetable(Timetable):
-        def next_dagrun_info(self, last_automated_data_interval, restriction):
+        def next_dagrun_info(self, last_automated_data_interval: DataInterval 
| None, restriction):
             if last_automated_data_interval is None:
                 return DagRunInfo.interval(start, end)
             raise RuntimeError("this fails")
@@ -3554,3 +3555,48 @@ def test_get_run_data_interval_pre_aip_39():
     ds_end = current_ts.replace(hour=0, minute=0, second=0, microsecond=0)
     timetable = coerce_to_core_timetable(dag.timetable)
     assert get_run_data_interval(timetable, dr) == 
DataInterval(start=ds_start, end=ds_end)
+
+
+@pytest.mark.parametrize(
+    ("schedule", "next_run", "next_interval", "next_run_after"),
+    [
+        (
+            CronPartitionTimetable(
+                "0 0 * * *",
+                timezone=pendulum.UTC,
+            ),
+            TEST_DATE + timedelta(days=1),
+            None,
+            TEST_DATE + timedelta(days=1),
+        ),
+        (
+            "0 0 * * *",
+            TEST_DATE,
+            DataInterval(start=TEST_DATE, end=TEST_DATE + timedelta(days=1)),
+            TEST_DATE + timedelta(days=1),
+        ),
+        (
+            None,
+            None,
+            None,
+            None,
+        ),
+        (
+            "@once",
+            None,
+            None,
+            None,
+        ),
+    ],
+)
+def test_calculate_dagrun_date_fields(schedule, next_run, next_interval, 
next_run_after, dag_maker, session):
+    with dag_maker(schedule=schedule, catchup=True, start_date=TEST_DATE):
+        BashOperator(task_id="hi", bash_command="yo")
+
+    run = dag_maker.create_dagrun()
+    serdag = dag_maker.serialized_dag
+    dag_model = dag_maker.dag_model
+    dag_model.calculate_dagrun_date_fields(dag=serdag, last_automated_run=run)
+    assert dag_model.next_dagrun_data_interval == next_interval
+    assert dag_model.next_dagrun == next_run
+    assert dag_model.next_dagrun_create_after == next_run_after
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index 5bf2103c5e5..7449a69b5f0 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -1127,7 +1127,7 @@ class TestDagRun:
         expected_stat_tags = {"dag_id": f"{dag.dag_id}", "run_type": 
DagRunType.SCHEDULED}
         scheduler_dag = sync_dag_to_db(dag, session=session)
         try:
-            info = scheduler_dag.next_dagrun_info(None)
+            info = scheduler_dag.next_dagrun_info(last_automated_run_info=None)
             orm_dag_kwargs = {
                 "dag_id": dag.dag_id,
                 "bundle_name": "testing",
diff --git a/airflow-core/tests/unit/timetables/test_events_timetable.py 
b/airflow-core/tests/unit/timetables/test_events_timetable.py
index f4df62e39e1..80166b57389 100644
--- a/airflow-core/tests/unit/timetables/test_events_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_events_timetable.py
@@ -69,7 +69,9 @@ def restricted_timetable():
     list(zip(EVENT_DATES, EVENT_DATES)),
 )
 def test_dag_run_info_interval(start: pendulum.DateTime, end: 
pendulum.DateTime):
-    expected_info = DagRunInfo(run_after=end, 
data_interval=DataInterval(start, end))
+    expected_info = DagRunInfo(
+        run_after=end, data_interval=DataInterval(start, end), 
partition_date=None, partition_key=None
+    )
     assert DagRunInfo.interval(start, end) == expected_info
 
 
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py 
b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index eecd2bdfb0a..a5774bb22c5 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -23,11 +23,14 @@ import dateutil.relativedelta
 import pendulum
 import pytest
 import time_machine
+from sqlalchemy import select
 
 from airflow._shared.timezones.timezone import utc
 from airflow.exceptions import AirflowTimetableInvalid
+from airflow.models import DagModel
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, 
Timetable
 from airflow.timetables.trigger import (
+    CronPartitionTimetable,
     CronTriggerTimetable,
     DeltaTriggerTimetable,
     MultipleCronTriggerTimetable,
@@ -599,3 +602,210 @@ def test_multi_serialization():
     assert tt._timetables[0]._timezone == tt._timetables[1]._timezone == utc
     assert tt._timetables[0]._interval == tt._timetables[1]._interval == 
datetime.timedelta(minutes=10)
     assert tt._timetables[0]._run_immediately == 
tt._timetables[1]._run_immediately is False
+
+
+@pytest.mark.db_test
+@pytest.mark.need_serialized_dag
+def test_latest_run_no_history(dag_maker, session):
+    start_date = pendulum.datetime(2026, 1, 1)
+    with dag_maker(
+        "test",
+        start_date=start_date,
+        catchup=True,
+        schedule=CronPartitionTimetable(
+            "0 * * * *",
+            timezone=pendulum.UTC,
+        ),
+    ):
+        pass
+    dag_maker.sync_dagbag_to_db()
+    session.commit()
+    dm = session.scalar(select(DagModel))
+    assert dm.next_dagrun == start_date
+
+
+@pytest.mark.db_test
+@pytest.mark.need_serialized_dag
+def test_latest_run_with_run(dag_maker, session):
+    """
+    This ensures that the dag processor will figure out the next run correctly
+    """
+    start_date = pendulum.datetime(2026, 1, 1)
+    with dag_maker(
+        "test",
+        start_date=start_date,
+        catchup=True,
+        schedule=CronPartitionTimetable(
+            "0 0 * * *",
+            timezone=pendulum.UTC,
+        ),
+    ):
+        pass
+    dag_maker.sync_dagbag_to_db()
+    dag_maker.create_dagrun(
+        run_id="abc1234",
+        logical_date=None,
+        data_interval=None,
+        run_type="scheduled",
+        run_after=start_date + datetime.timedelta(days=3),
+        partition_key="anything",
+        session=session,
+    )
+    session.commit()
+    dag_maker.sync_dag_to_db()
+    session.commit()
+    dm = session.scalar(select(DagModel))
+    assert dm.next_dagrun == start_date + datetime.timedelta(days=4)
+
+
+@pytest.mark.db_test
+@pytest.mark.parametrize(
+    ("schedule", "partition_key", "expected"),
+    [
+        (
+            CronPartitionTimetable(
+                "0 0 * * *",
+                timezone=pendulum.UTC,
+            ),
+            "key-1",
+            DagRunInfo(
+                run_after=START_DATE + datetime.timedelta(days=3),
+                data_interval=None,
+                partition_date=START_DATE + datetime.timedelta(days=3),
+                partition_key="key-1",
+            ),
+        ),
+        (
+            "0 0 * * *",
+            None,
+            DagRunInfo(
+                run_after=START_DATE + datetime.timedelta(days=3),
+                data_interval=DataInterval(
+                    start=START_DATE + datetime.timedelta(days=2),
+                    end=START_DATE + datetime.timedelta(days=3),
+                ),
+                partition_date=None,
+                partition_key=None,
+            ),
+        ),
+    ],
+)
+def test_run_info_from_dag_run(schedule, partition_key, expected, dag_maker, 
session):
+    with dag_maker(
+        "test",
+        start_date=START_DATE,
+        catchup=True,
+        schedule=schedule,
+    ):
+        pass
+    dag_maker.sync_dagbag_to_db()
+    dr = dag_maker.create_dagrun(
+        run_id="abc1234",
+        logical_date=None,
+        data_interval=None,
+        run_type="scheduled",
+        run_after=START_DATE + datetime.timedelta(days=3),
+        partition_key=partition_key,
+        session=session,
+    )
+    info = dag_maker.serialized_dag.timetable.run_info_from_dag_run(dag_run=dr)
+    assert info == expected
+
+
+@pytest.mark.db_test
+@pytest.mark.need_serialized_dag
+@pytest.mark.parametrize(
+    ("schedule", "expected"),
+    [
+        (
+            CronPartitionTimetable(
+                "0 0 * * *",
+                timezone=pendulum.UTC,
+            ),
+            DagRunInfo(
+                run_after=START_DATE,
+                data_interval=None,
+                partition_date=START_DATE,
+                partition_key="2021-09-04T00:00:00",
+            ),
+        ),
+        (
+            "0 0 * * *",
+            DagRunInfo(
+                run_after=START_DATE + datetime.timedelta(days=1),
+                data_interval=DataInterval(
+                    start=START_DATE,
+                    end=START_DATE + datetime.timedelta(days=1),
+                ),
+                partition_date=None,
+                partition_key=None,
+            ),
+        ),
+    ],
+)
+def test_next_dagrun_info_v2(schedule, expected, dag_maker, session):
+    """
+    This ensures that the dag processor will figure out the next run correctly
+    """
+    with dag_maker(
+        "test",
+        start_date=START_DATE,
+        catchup=True,
+        schedule=schedule,
+    ):
+        pass
+    dag_maker.sync_dagbag_to_db()
+    serdag = dag_maker.serialized_dag
+    timetable = serdag.timetable
+    info = timetable.next_dagrun_info_v2(
+        last_dagrun_info=None,
+        restriction=serdag._time_restriction,
+    )
+    assert info == expected
+
+
+@pytest.mark.db_test
+@pytest.mark.need_serialized_dag
+@pytest.mark.parametrize(
+    ("schedule", "partition_key", "expected"),
+    [
+        (
+            CronPartitionTimetable(
+                "0 0 * * *",
+                timezone=pendulum.UTC,
+            ),
+            "key-1",
+            DagRunInfo(
+                run_after=START_DATE,
+                data_interval=None,
+                partition_date=START_DATE,
+                partition_key="2021-09-04T00:00:00",
+            ),
+        ),
+        (
+            "0 0 * * *",
+            None,
+            DagRunInfo(
+                run_after=START_DATE + datetime.timedelta(days=1),
+                data_interval=DataInterval(
+                    start=START_DATE,
+                    end=START_DATE + datetime.timedelta(days=1),
+                ),
+                partition_date=None,
+                partition_key=None,
+            ),
+        ),
+    ],
+)
+def test_next_run_info_from_dag_model(schedule, partition_key, expected, 
dag_maker, session):
+    with dag_maker(
+        "test",
+        start_date=START_DATE,
+        catchup=True,
+        schedule=schedule,
+    ):
+        pass
+    dag_maker.sync_dagbag_to_db()
+    dm = dag_maker.dag_model
+    info = 
dag_maker.serialized_dag.timetable.next_run_info_from_dag_model(dag_model=dm)
+    assert info == expected
diff --git a/airflow-core/tests/unit/timetables/test_workday_timetable.py 
b/airflow-core/tests/unit/timetables/test_workday_timetable.py
index aadf8e67a2f..abf730d10af 100644
--- a/airflow-core/tests/unit/timetables/test_workday_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_workday_timetable.py
@@ -57,7 +57,12 @@ def timetable():
     list(zip(WEEK_1_WEEKDAYS[:-1], WEEK_1_WEEKDAYS[1:])),
 )
 def test_dag_run_info_interval(start: pendulum.DateTime, end: 
pendulum.DateTime):
-    expected_info = DagRunInfo(run_after=end, 
data_interval=DataInterval(start, end))
+    expected_info = DagRunInfo(
+        run_after=end,
+        data_interval=DataInterval(start, end),
+        partition_date=None,
+        partition_key=None,
+    )
     assert DagRunInfo.interval(start, end) == expected_info
 
 
diff --git a/devel-common/src/tests_common/pytest_plugin.py 
b/devel-common/src/tests_common/pytest_plugin.py
index 9622906efff..dc5e8f38f6a 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -53,7 +53,7 @@ if TYPE_CHECKING:
     from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
     from airflow.sdk.types import DagRunProtocol, Operator
     from airflow.serialization.definitions.dag import SerializedDAG
-    from airflow.timetables.base import DataInterval
+    from airflow.timetables.base import DagRunInfo, DataInterval
     from airflow.typing_compat import Self
     from airflow.utils.state import DagRunState, TaskInstanceState
 
@@ -1065,7 +1065,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
             from airflow.utils.state import DagRunState
             from airflow.utils.types import DagRunType
 
-            timezone = _import_timezone()
+            airflow_timezone = _import_timezone()
 
             if AIRFLOW_V_3_0_PLUS:
                 from airflow.utils.types import DagRunTriggeredByType
@@ -1100,8 +1100,12 @@ def dag_maker(request) -> Generator[DagMaker, None, 
None]:
                 if run_type == DagRunType.MANUAL:
                     logical_date = self.start_date
                 else:
-                    logical_date = dag.next_dagrun_info(None).logical_date
-            logical_date = timezone.coerce_datetime(logical_date)
+                    if AIRFLOW_V_3_2_PLUS:
+                        next_run_kwargs = dict(last_automated_run_info=None)
+                    else:
+                        next_run_kwargs = dict(last_automated_dagrun=None)
+                    logical_date = 
dag.next_dagrun_info(**next_run_kwargs).logical_date
+            logical_date = airflow_timezone.coerce_datetime(logical_date)
 
             data_interval = None
             try:
@@ -1125,13 +1129,15 @@ def dag_maker(request) -> Generator[DagMaker, None, 
None]:
                     if AIRFLOW_V_3_0_PLUS:
                         kwargs["run_id"] = dag.timetable.generate_run_id(
                             run_type=run_type,
-                            run_after=logical_date or 
timezone.coerce_datetime(timezone.utcnow()),
+                            run_after=logical_date
+                            or 
airflow_timezone.coerce_datetime(airflow_timezone.utcnow()),
                             data_interval=data_interval,
                         )
                     else:
                         kwargs["run_id"] = dag.timetable.generate_run_id(
                             run_type=run_type,
-                            logical_date=logical_date or 
timezone.coerce_datetime(timezone.utcnow()),
+                            logical_date=logical_date
+                            or 
airflow_timezone.coerce_datetime(airflow_timezone.utcnow()),
                             data_interval=data_interval,
                         )
             kwargs["run_type"] = run_type
@@ -1139,7 +1145,9 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
             if AIRFLOW_V_3_0_PLUS:
                 kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST)
                 kwargs["logical_date"] = logical_date
-                kwargs.setdefault("run_after", data_interval[-1] if 
data_interval else timezone.utcnow())
+                kwargs.setdefault(
+                    "run_after", data_interval[-1] if data_interval else 
airflow_timezone.utcnow()
+                )
             else:
                 kwargs.pop("triggered_by", None)
                 kwargs["execution_date"] = logical_date
@@ -1158,9 +1166,13 @@ def dag_maker(request) -> Generator[DagMaker, None, 
None]:
             if AIRFLOW_V_3_1_PLUS:
                 from airflow.models.dag import get_run_data_interval
 
-                next_info = 
sdag.next_dagrun_info(get_run_data_interval(sdag.timetable, dagrun))
+                interval = get_run_data_interval(sdag.timetable, dagrun)
+                last_run_info = self._get_run_info(dagrun, sdag, interval)
+                next_info = 
sdag.next_dagrun_info(last_automated_run_info=last_run_info)
             else:
-                next_info = 
sdag.next_dagrun_info(sdag.get_run_data_interval(dagrun))
+                interval = sdag.get_run_data_interval(dagrun)
+                last_run_info = self._get_run_info(dagrun, sdag, interval)
+                next_info = 
sdag.next_dagrun_info(last_automated_run_info=last_run_info)
             if next_info is None:
                 raise ValueError(f"cannot create run after {dagrun}")
             return self.create_dagrun(
@@ -1169,6 +1181,20 @@ def dag_maker(request) -> Generator[DagMaker, None, 
None]:
                 **kwargs,
             )
 
+        def _get_run_info(self, dagrun: DagRun, serdag: SerializedDAG, 
interval: DataInterval) -> DagRunInfo:
+            from airflow.timetables.base import DagRunInfo
+
+            if AIRFLOW_V_3_2_PLUS:
+                return serdag.timetable.run_info_from_dag_run(dag_run=dagrun)
+
+            airflow_timezone = _import_timezone()
+            return DagRunInfo(
+                run_after=airflow_timezone.coerce_datetime(dagrun.run_after),
+                data_interval=interval,
+                partition_date=None,
+                partition_key=None,
+            )
+
         def create_ti(self, task_id, dag_run=None, dag_run_kwargs=None, 
map_index=-1):
             """
             Create a specific task instance with proper task refresh.
@@ -2519,7 +2545,8 @@ def create_runtime_ti(mocked_parse):
             )
             if drinfo:
                 data_interval = drinfo.data_interval
-                data_interval_start, data_interval_end = data_interval.start, 
data_interval.end
+                data_interval_start = data_interval and data_interval.start
+                data_interval_end = data_interval and data_interval.end
 
         dag_id = task.dag.dag_id
         task_retries = task.retries or 0
diff --git a/task-sdk/src/airflow/sdk/bases/timetable.py 
b/task-sdk/src/airflow/sdk/bases/timetable.py
index 5f5da55f1de..4488cc9ba6d 100644
--- a/task-sdk/src/airflow/sdk/bases/timetable.py
+++ b/task-sdk/src/airflow/sdk/bases/timetable.py
@@ -47,6 +47,8 @@ class BaseTimetable:
 
     asset_condition: BaseAsset | None = None
 
+    # TODO: AIP-76 just add partition-driven field here to differentiate the 
behavior
+
     def validate(self) -> None:
         """
         Validate the timetable is correctly specified.
diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py 
b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
index 66bc3cc245e..afde7e2cfd1 100644
--- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
+++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
@@ -51,7 +51,7 @@ class DeltaTriggerTimetable(DeltaMixin, BaseTimetable):
 @attrs.define
 class CronTriggerTimetable(CronMixin, BaseTimetable):
     """
-    Timetable that triggers DAG runs according to a cron expression.
+    Timetable that triggers Dag runs according to a cron expression.
 
     This is different from ``CronDataIntervalTimetable``, where the cron
     expression specifies the *data interval* of a DAG run. With this timetable,
@@ -66,13 +66,13 @@ class CronTriggerTimetable(CronMixin, BaseTimetable):
     :param timezone: Which timezone to use to interpret the cron string
     :param interval: timedelta that defines the data interval start. Default 0.
 
-    *run_immediately* controls, if no *start_time* is given to the DAG, when
-    the first run of the DAG should be scheduled. It has no effect if there
-    already exist runs for this DAG.
+    *run_immediately* controls, if no *start_time* is given to the Dag, when
+    the first run of the Dag should be scheduled. It has no effect if there
+    already exist runs for this Dag.
 
-    * If *True*, always run immediately the most recent possible DAG run.
+    * If *True*, always run immediately the most recent possible Dag run.
     * If *False*, wait to run until the next scheduled time in the future.
-    * If passed a ``timedelta``, will run the most recent possible DAG run
+    * If passed a ``timedelta``, will run the most recent possible Dag run
       if that run's ``data_interval_end`` is within timedelta of now.
     * If *None*, the timedelta is calculated as 10% of the time between the
       most recent past scheduled time and the next scheduled time. E.g. if


Reply via email to