This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 230293c2c91 Implement partition-driven scheduling on a dag (i.e. not
asset-driven) (#59115)
230293c2c91 is described below
commit 230293c2c91e31f8a7e0c29286c36b0d90c9b4a3
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Jan 29 23:38:18 2026 -0800
Implement partition-driven scheduling on a dag (i.e. not asset-driven)
(#59115)
---
airflow-core/src/airflow/api/common/mark_tasks.py | 7 +-
.../api_fastapi/core_api/services/ui/calendar.py | 7 +
.../api_fastapi/execution_api/routes/dag_runs.py | 2 +
.../src/airflow/cli/commands/dag_command.py | 9 +-
.../src/airflow/dag_processing/collection.py | 60 +++++-
.../src/airflow/jobs/scheduler_job_runner.py | 118 +++++++-----
airflow-core/src/airflow/models/backfill.py | 11 +-
airflow-core/src/airflow/models/dag.py | 43 +++--
.../src/airflow/serialization/definitions/dag.py | 46 +++--
airflow-core/src/airflow/timetables/base.py | 75 +++++++-
airflow-core/src/airflow/timetables/trigger.py | 209 +++++++++++++++++++-
.../tests/unit/cli/commands/test_dag_command.py | 138 ++++++++------
airflow-core/tests/unit/jobs/test_scheduler_job.py | 63 ++++++-
airflow-core/tests/unit/models/test_dag.py | 96 +++++++---
airflow-core/tests/unit/models/test_dagrun.py | 2 +-
.../tests/unit/timetables/test_events_timetable.py | 4 +-
.../unit/timetables/test_trigger_timetable.py | 210 +++++++++++++++++++++
.../unit/timetables/test_workday_timetable.py | 7 +-
devel-common/src/tests_common/pytest_plugin.py | 47 ++++-
task-sdk/src/airflow/sdk/bases/timetable.py | 2 +
.../airflow/sdk/definitions/timetables/trigger.py | 12 +-
21 files changed, 959 insertions(+), 209 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py
b/airflow-core/src/airflow/api/common/mark_tasks.py
index 99b8e47e74f..09fa9cd5c5e 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -70,6 +70,9 @@ def set_state(
:param commit: Commit tasks to be altered to the database
:param session: database session
:return: list of tasks that have been created and updated
+
+ TODO: "past" and "future" params currently depend on logical date, which
is not always populated.
+ We might want to just deprecate these options, or alter them to do
*something* in that case.
"""
if not tasks:
return []
@@ -183,7 +186,9 @@ def get_run_ids(dag: SerializedDAG, run_id: str, future:
bool, past: bool, sessi
run_ids = [run_id]
else:
dates = [
- info.logical_date for info in
dag.iter_dagrun_infos_between(start_date, end_date, align=False)
+ info.logical_date
+ for info in dag.iter_dagrun_infos_between(start_date, end_date,
align=False)
+ if info.logical_date # todo: AIP-76 this will not find anything
where logical date is null
]
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id,
logical_date=dates, session=session)]
return run_ids
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py
index 0f00af412c6..fbcd0b0ab32 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/calendar.py
@@ -229,11 +229,18 @@ class CalendarService:
if curr_info is None: # No more DAG runs to schedule
break
+ if not curr_info.logical_date:
+ # todo: AIP-76 this is likely a partitioned dag. needs
implementation
+ break
if curr_info.logical_date <= prev_logical_date: # Timetable not
progressing, stopping
break
if curr_info.logical_date.year != year: # Crossed year boundary
break
+ if not curr_info.data_interval:
+ # todo: AIP-76 this is likely a partitioned dag. needs
implementation
+ break
+
if not self._is_date_in_range(curr_info.logical_date,
logical_date):
last_data_interval = curr_info.data_interval
prev_logical_date = curr_info.logical_date
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 56fd79b825e..7763850b5ee 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -113,6 +113,8 @@ def trigger_dag_run(
)
try:
+ # todo: AIP-76 add partition key here
+ # https://github.com/apache/airflow/issues/61075
trigger_dag(
dag_id=dag_id,
run_id=run_id,
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 2c3897941d4..8bab86a4966 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -44,6 +44,7 @@ from airflow.models import DagModel, DagRun, TaskInstance
from airflow.models.dag import get_next_data_interval
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
+from airflow.timetables.base import TimeRestriction
from airflow.utils import cli as cli_utils
from airflow.utils.cli import get_bagged_dag, suppress_logs_and_warning,
validate_dag_bundle_arg
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
@@ -312,6 +313,9 @@ def dag_next_execution(args) -> None:
>>> airflow dags next-execution tutorial
2018-08-31 10:38:00
+
+ # todo: AIP-76 determine what next execution should do for
partition-driven dags
+ # https://github.com/apache/airflow/issues/61076
"""
from airflow.models.serialized_dag import SerializedDagModel
@@ -342,7 +346,10 @@ def dag_next_execution(args) -> None:
print_execution_interval(next_interval)
for _ in range(1, args.num_executions):
- next_info = dag.next_dagrun_info(next_interval, restricted=False)
+ next_info = dag.timetable.next_dagrun_info(
+ last_automated_data_interval=next_interval,
+ restriction=TimeRestriction(earliest=None, latest=None,
catchup=True),
+ )
next_interval = None if next_info is None else next_info.data_interval
print_execution_interval(next_interval)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index e8720b4df34..f5ad10196b4 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -49,7 +49,7 @@ from airflow.models.asset import (
TaskInletAssetReference,
TaskOutletAssetReference,
)
-from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag,
get_run_data_interval
+from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarningType
from airflow.models.errors import ParseImportError
@@ -63,6 +63,7 @@ from airflow.serialization.definitions.assets import (
from airflow.serialization.definitions.dag import SerializedDAG
from airflow.serialization.enums import Encoding
from airflow.serialization.serialized_objects import BaseSerialization,
LazyDeserializedDAG
+from airflow.timetables.trigger import CronPartitionTimetable
from airflow.triggers.base import BaseEventTrigger
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
@@ -129,6 +130,39 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
)
+def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
+ """Build a select statement to retrieve the last partitioned run for the
given Dag."""
+ # todo: AIP-76 we should add a partition date field
+ latest_run_id = (
+ select(DagRun.id)
+ .where(
+ DagRun.dag_id == dag_id,
+ DagRun.run_type.in_(
+ (
+ DagRunType.BACKFILL_JOB,
+ DagRunType.SCHEDULED,
+ )
+ ),
+ DagRun.partition_key.is_not(None),
+ )
+ .order_by(DagRun.id.desc()) # todo: AIP-76 add partition date and
sort by it here
+ .limit(1)
+ .scalar_subquery()
+ )
+ return (
+ select(DagRun)
+ .where(DagRun.id == latest_run_id)
+ .options(
+ load_only(
+ DagRun.dag_id,
+ DagRun.logical_date,
+ DagRun.data_interval_start,
+ DagRun.data_interval_end,
+ )
+ )
+ )
+
+
class _RunInfo(NamedTuple):
latest_run: DagRun | None
num_active_runs: int
@@ -144,7 +178,23 @@ class _RunInfo(NamedTuple):
if not dag.timetable.can_be_scheduled:
return cls(None, 0)
- latest_run = session.scalar(_get_latest_runs_stmt(dag_id=dag.dag_id))
+ # todo: AIP-76 what's a more general way to detect?
+ # https://github.com/apache/airflow/issues/61086
+ if isinstance(dag.timetable, CronPartitionTimetable):
+ log.info("getting latest run for partitioned dag",
dag_id=dag.dag_id)
+ latest_run =
session.scalar(_get_latest_runs_stmt_partitioned(dag_id=dag.dag_id))
+ else:
+ log.info("getting latest run for non-partitioned dag",
dag_id=dag.dag_id)
+ latest_run =
session.scalar(_get_latest_runs_stmt(dag_id=dag.dag_id))
+ if latest_run:
+ log.info(
+ "got latest run",
+ dag_id=dag.dag_id,
+ logical_date=str(latest_run.logical_date),
+ partition_key=latest_run.partition_key,
+ )
+ else:
+ log.info("no latest run found", dag_id=dag.dag_id)
active_run_counts = DagRun.active_runs_of_dags(
dag_ids=[dag.dag_id],
exclude_backfill=True,
@@ -551,12 +601,8 @@ class DagModelOperation(NamedTuple):
dm.bundle_version = self.bundle_version
last_automated_run: DagRun | None = run_info.latest_run
- if last_automated_run is None:
- last_automated_data_interval = None
- else:
- last_automated_data_interval =
get_run_data_interval(dag.timetable, last_automated_run)
dm.exceeds_max_non_backfill = run_info.num_active_runs >=
dm.max_active_runs
- dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)
+ dm.calculate_dagrun_date_fields(dag,
last_automated_run=last_automated_run)
if not dag.timetable.asset_condition:
dm.schedule_asset_references = []
dm.schedule_asset_alias_references = []
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index e13d7c2c698..0dc18ca696a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -85,7 +85,7 @@ from airflow.models.asset import (
)
from airflow.models.backfill import Backfill
from airflow.models.callback import Callback
-from airflow.models.dag import DagModel, get_next_data_interval
+from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DBDagBag
from airflow.models.dagbundle import DagBundleModel
@@ -1897,24 +1897,31 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we
don't attempt to create
# duplicate DagRuns
- existing_dagruns = (
- session.execute(
- select(DagRun.dag_id, DagRun.logical_date).where(
+ existing_dagrun_objects = (
+ session.scalars(
+ select(DagRun)
+ .where(
tuple_(DagRun.dag_id, DagRun.logical_date).in_(
(dm.dag_id, dm.next_dagrun) for dm in dag_models
- ),
+ )
)
+ .options(load_only(DagRun.dag_id, DagRun.logical_date))
)
.unique()
.all()
)
+ existing_dagruns = {(x.dag_id, x.logical_date): x for x in
existing_dagrun_objects}
+
+ # todo: AIP-76 we may want to update the existing-run check to also cover
partitioned dag runs,
+ # but there is currently no restriction against creating new runs
+ # with the same partition key,
+ # so it's unclear whether we should (or need to).
# backfill runs are not created by scheduler and their concurrency is
separate
# so we exclude them here
- dag_ids = (dm.dag_id for dm in dag_models)
active_runs_of_dags = Counter(
DagRun.active_runs_of_dags(
- dag_ids=dag_ids,
+ dag_ids=(dm.dag_id for dm in dag_models),
exclude_backfill=True,
session=session,
)
@@ -1944,58 +1951,69 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
serdag = self._get_current_dag(dag_id=dag_model.dag_id,
session=session)
if not serdag:
- self.log.error("DAG '%s' not found in serialized_dag table",
dag_model.dag_id)
+ self.log.error("Dag not found in serialized_dag table",
dag_id=dag_model.dag_id)
continue
- data_interval = get_next_data_interval(serdag.timetable, dag_model)
# Explicitly check if the DagRun already exists. This is an edge
case
# where a Dag Run is created but `DagModel.next_dagrun` and
`DagModel.next_dagrun_create_after`
# are not updated.
# We opted to check DagRun existence instead
# of catching an Integrity error and rolling back the session i.e
- # we need to set DagModel.next_dagrun_info if the DagRun already
exists or if we
- # create a new one. This is so that in the next scheduling loop we
try to create new runs
- # instead of falling in a loop of IntegrityError.
- if (serdag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
- try:
- serdag.create_dagrun(
- run_id=serdag.timetable.generate_run_id(
- run_type=DagRunType.SCHEDULED,
-
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
- data_interval=data_interval,
- ),
- logical_date=dag_model.next_dagrun,
- data_interval=data_interval,
- run_after=dag_model.next_dagrun_create_after,
- run_type=DagRunType.SCHEDULED,
- triggered_by=DagRunTriggeredByType.TIMETABLE,
- state=DagRunState.QUEUED,
- creating_job_id=self.job.id,
- session=session,
- )
- active_runs_of_dags[serdag.dag_id] += 1
-
- # Exceptions like ValueError, ParamValidationError, etc. are
raised by
- # DagModel.create_dagrun() when dag is misconfigured. The
scheduler should not
- # crash due to misconfigured dags. We should log any exception
encountered
- # and continue to the next serdag.
- except Exception:
- self.log.exception("Failed creating DagRun for %s",
serdag.dag_id)
- # todo: if you get a database error here, continuing does
not work because
- # session needs rollback. you need either to make smaller
transactions and
- # commit after every dag run or use savepoints.
- # https://github.com/apache/airflow/issues/59120
- continue
+ if dr := existing_dagruns.get((dag_model.dag_id,
dag_model.next_dagrun)):
+ self.log.warning(
+ "run already exists; skipping dagrun creation",
+ dag_id=dag_model.dag_id,
+ logical_date=dag_model.next_dagrun,
+ )
+ dag_model.calculate_dagrun_date_fields(dag=serdag,
last_automated_run=dr)
+ continue
- self._set_exceeds_max_active_runs(
- dag_model=dag_model,
- session=session,
- active_non_backfill_runs=active_runs_of_dags[serdag.dag_id],
- )
- dag_model.calculate_dagrun_date_fields(dag=serdag,
last_automated_dag_run=data_interval)
+ try:
+ next_info =
serdag.timetable.next_run_info_from_dag_model(dag_model=dag_model)
+ data_interval = next_info.data_interval
+ logical_date = next_info.logical_date
+ partition_key = next_info.partition_key
+ run_after = next_info.run_after
+ # todo: AIP-76 partition date is not passed to dag run
+ # See https://github.com/apache/airflow/issues/61167.
+ created_run = serdag.create_dagrun(
+ run_id=serdag.timetable.generate_run_id(
+ run_type=DagRunType.SCHEDULED,
+ run_after=run_after,
+ data_interval=data_interval,
+ partition_key=partition_key,
+ ),
+ logical_date=logical_date,
+ data_interval=data_interval,
+ run_after=run_after,
+ run_type=DagRunType.SCHEDULED,
+ triggered_by=DagRunTriggeredByType.TIMETABLE,
+ state=DagRunState.QUEUED,
+ creating_job_id=self.job.id,
+ session=session,
+ partition_key=partition_key,
+ )
+ active_runs_of_dags[dag_model.dag_id] += 1
+ dag_model.calculate_dagrun_date_fields(dag=serdag,
last_automated_run=created_run)
+ self._set_exceeds_max_active_runs(
+ dag_model=dag_model,
+ session=session,
+
active_non_backfill_runs=active_runs_of_dags[dag_model.dag_id],
+ )
- # TODO[HA]: Should we do a session.flush() so we don't have to keep
lots of state/object in
- # memory for larger dags? or expunge_all()
+ # Exceptions like ValueError, ParamValidationError, etc. are
raised by
+ # DagModel.create_dagrun() when dag is misconfigured. The
scheduler should not
+ # crash due to misconfigured dags. We should log any exception
encountered
+ # and continue to the next serdag.
+ except Exception:
+ self.log.exception("Failed creating DagRun",
dag_id=dag_model.dag_id)
+ # todo: if you get a database error here, continuing does not
work because
+ # session needs rollback. you need either to make smaller
transactions and
+ # commit after every dag run or use savepoints.
+ # https://github.com/apache/airflow/issues/59120
+
+ # TODO[HA]: Should we do a session.flush() so we don't have to
keep lots of state/object in
+ # memory for larger dags? or expunge_all()
def _create_dag_runs_asset_triggered(
self,
diff --git a/airflow-core/src/airflow/models/backfill.py
b/airflow-core/src/airflow/models/backfill.py
index 33df6901d40..4a79acde307 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -286,7 +286,6 @@ def _do_dry_run(
raise DagNotFound(f"Could not find dag {dag_id}")
dag = serdag.dag
_validate_backfill_params(dag, reverse, from_date, to_date,
reprocess_behavior)
-
no_schedule = session.scalar(
select(func.count()).where(DagModel.timetable_summary == "None",
DagModel.dag_id == dag_id)
)
@@ -301,6 +300,8 @@ def _do_dry_run(
)
logical_dates: list[datetime] = []
for info in dagrun_info_list:
+ if TYPE_CHECKING:
+ assert info.logical_date
dr = session.scalar(
statement=_get_latest_dag_run_row_query(dag_id=dag_id, info=info,
session=session),
)
@@ -428,7 +429,12 @@ def _get_info_list(
) -> list[DagRunInfo]:
infos = dag.iter_dagrun_infos_between(from_date, to_date)
now = timezone.utcnow()
- dagrun_info_list = [x for x in infos if x.data_interval.end < now]
+ dagrun_info_list = [
+ x
+ for x in infos
+ # todo: AIP-76 update for partitioned dags
+ if x.data_interval and x.data_interval.end < now
+ ]
if reverse:
dagrun_info_list = list(reversed(dagrun_info_list))
return dagrun_info_list
@@ -497,6 +503,7 @@ def _create_backfill(
serdag =
session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
if not serdag:
raise DagNotFound(f"Could not find dag {dag_id}")
+
no_schedule = session.scalar(
select(func.count()).where(DagModel.timetable_summary == "None",
DagModel.dag_id == dag_id)
)
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index 2da71540db2..55a4b156f1a 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations
-import logging
from collections import defaultdict
from collections.abc import Callable, Collection
from datetime import datetime, timedelta
@@ -25,6 +24,8 @@ from typing import TYPE_CHECKING, Any, cast
import pendulum
import sqlalchemy_jsonfield
+import structlog
+from dateutil.relativedelta import relativedelta
from sqlalchemy import (
Boolean,
Float,
@@ -91,7 +92,7 @@ if TYPE_CHECKING:
| Collection["SerializedAsset" | "SerializedAssetAlias"]
)
-log = logging.getLogger(__name__)
+log = structlog.getLogger(__name__)
TAG_MAX_LEN = 100
@@ -128,7 +129,7 @@ def infer_automated_data_interval(timetable: Timetable,
logical_date: datetime)
return DataInterval(start, end)
-def get_run_data_interval(timetable: Timetable, run: DagRun) -> DataInterval:
+def get_run_data_interval(timetable: Timetable, run: DagRun | None) ->
DataInterval | None:
"""
Get the data interval of this run.
@@ -141,6 +142,12 @@ def get_run_data_interval(timetable: Timetable, run:
DagRun) -> DataInterval:
:meta private:
"""
+ if not run:
+ return run
+
+ if run.partition_key is not None:
+ return None
+
if (
data_interval := _get_model_data_interval(run, "data_interval_start",
"data_interval_end")
) is not None:
@@ -712,33 +719,43 @@ class DagModel(Base):
def calculate_dagrun_date_fields(
self,
dag: SerializedDAG | LazyDeserializedDAG,
- last_automated_dag_run: None | DataInterval,
+ *,
+ last_automated_run: DagRun | None,
) -> None:
"""
Calculate ``next_dagrun`` and `next_dagrun_create_after``.
:param dag: The DAG object
- :param last_automated_dag_run: DataInterval of most recent run of this
dag, or none
+ :param last_automated_run: DagRun of most recent run of this dag, or
None
if not yet scheduled.
+ TODO: AIP-76 This is not always latest run! See
https://github.com/apache/airflow/issues/59618.
"""
- if isinstance(last_automated_dag_run, datetime):
+ # TODO: AIP-76 perhaps we need to add validation for manual runs
to ensure consistency between
+ # partition_key / partition_date and run_after
+
+ if isinstance(last_automated_run, datetime):
raise ValueError(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields`
is not supported. "
"Provide a data interval instead."
)
- next_dagrun_info =
dag.next_dagrun_info(last_automated_dagrun=last_automated_dag_run)
+
+ last_run_info = None
+ if last_automated_run:
+ last_run_info =
dag.timetable.run_info_from_dag_run(dag_run=last_automated_run)
+ next_dagrun_info =
dag.next_dagrun_info(last_automated_run_info=last_run_info)
if next_dagrun_info is None:
+ # there is no next dag run after the last dag run; set to None
self.next_dagrun_data_interval = self.next_dagrun =
self.next_dagrun_create_after = None
else:
self.next_dagrun_data_interval = next_dagrun_info.data_interval
- self.next_dagrun = next_dagrun_info.logical_date
+ self.next_dagrun = next_dagrun_info.logical_date or
next_dagrun_info.partition_date
self.next_dagrun_create_after = next_dagrun_info.run_after
-
log.info(
- "Setting next_dagrun for %s to %s, run_after=%s",
- dag.dag_id,
- self.next_dagrun,
- self.next_dagrun_create_after,
+ "setting next dagrun info",
+ next_dagrun=str(self.next_dagrun),
+ next_dagrun_create_after=str(self.next_dagrun_create_after),
+
next_dagrun_data_interval_start=str(self.next_dagrun_data_interval_start),
+
next_dagrun_data_interval_end=str(self.next_dagrun_data_interval_end),
)
@provide_session
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index 8540d3093e7..ea3009c5645 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -46,6 +46,7 @@ from airflow.serialization.definitions.deadline import
DeadlineAlertFields, Seri
from airflow.serialization.definitions.param import SerializedParamsDict
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
+from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
@@ -384,21 +385,20 @@ class SerializedDAG:
def next_dagrun_info(
self,
- last_automated_dagrun: None | DataInterval,
*,
+ last_automated_run_info: DagRunInfo | None,
restricted: bool = True,
) -> DagRunInfo | None:
"""
- Get information about the next DagRun of this dag after
``date_last_automated_dagrun``.
+ Get the DagRunInfo object for the next run of this dag.
- This calculates what time interval the next DagRun should operate on
- (its logical date) and when it can be scheduled, according to the
+ This calculates the interval or partition and when it can be
scheduled, according to the
dag's timetable, start_date, end_date, etc. This doesn't check max
active run or any other "max_active_tasks" type limits, but only
performs calculations based on the various date and interval fields of
this dag and its tasks.
- :param last_automated_dagrun: The ``max(logical_date)`` of
+ :param last_automated_run_info: The latest run info of
existing "automated" DagRuns for this dag (scheduled or backfill,
but not manual).
:param restricted: If set to *False* (default is *True*), ignore
@@ -412,18 +412,23 @@ class SerializedDAG:
else:
restriction = TimeRestriction(earliest=None, latest=None,
catchup=True)
try:
- info = self.timetable.next_dagrun_info(
- last_automated_data_interval=last_automated_dagrun,
+ info = self.timetable.next_dagrun_info_v2(
+ last_dagrun_info=last_automated_run_info,
restriction=restriction,
)
+ log.info(
+ "get next_dagrun_info_v2",
+ last_automated_run_info=last_automated_run_info,
+ next_info=info,
+ )
+ return info
except Exception:
log.exception(
- "Failed to fetch run info after data interval %s for DAG %r",
- last_automated_dagrun,
- self.dag_id,
+ "Failed to fetch run info",
+ last_run_info=last_automated_run_info,
+ dag_id=self.dag_id,
)
- info = None
- return info
+ return None
def iter_dagrun_infos_between(
self,
@@ -447,7 +452,12 @@ class SerializedDAG:
``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
if ``align=True``.
+
+ # see issue https://github.com/apache/airflow/issues/60455
"""
+ if isinstance(self.timetable, CronPartitionTimetable):
+ # todo: AIP-76 need to update this so that it handles partitions
+ raise ValueError("Partition-driven timetables not supported yet")
if earliest is None:
earliest = self._time_restriction.earliest
if earliest is None:
@@ -477,6 +487,10 @@ class SerializedDAG:
yield DagRunInfo.interval(earliest, latest)
return
+ if TYPE_CHECKING:
+ # todo: AIP-76 after updating this function for partitions, this
may not be true
+ assert info.data_interval is not None
+
# If align=False and earliest does not fall on the timetable's logical
# schedule, "invent" a data interval for it.
if not align and info.logical_date != earliest:
@@ -549,6 +563,14 @@ class SerializedDAG:
"""
from airflow.models.dagrun import RUN_ID_REGEX
+ log.info(
+ "creating dag run",
+ run_after=run_after,
+ run_id=run_id,
+ logical_date=logical_date,
+ partition_key=partition_key,
+ )
+
logical_date = coerce_datetime(logical_date)
# For manual runs where logical_date is None, ensure no data_interval
is set.
if logical_date is None and data_interval is not None:
diff --git a/airflow-core/src/airflow/timetables/base.py
b/airflow-core/src/airflow/timetables/base.py
index 9105b9d4d80..5bb916a7a16 100644
--- a/airflow-core/src/airflow/timetables/base.py
+++ b/airflow-core/src/airflow/timetables/base.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any, NamedTuple, Protocol, runtime_checkable
from airflow._shared.module_loading import qualname
+from airflow._shared.timezones import timezone
from airflow.serialization.definitions.assets import SerializedAssetBase
if TYPE_CHECKING:
@@ -26,6 +27,8 @@ if TYPE_CHECKING:
from pendulum import DateTime
+ from airflow.models.dag import DagModel
+ from airflow.models.dagrun import DagRun
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.definitions.assets import (
SerializedAsset,
@@ -118,13 +121,21 @@ class DagRunInfo(NamedTuple):
This **MUST** be "aware", i.e. contain timezone information.
"""
- data_interval: DataInterval
+ data_interval: DataInterval | None
"""The data interval this DagRun to operate over."""
+ partition_date: DateTime | None
+ partition_key: str | None
+
@classmethod
def exact(cls, at: DateTime) -> DagRunInfo:
"""Represent a run on an exact time."""
- return cls(run_after=at, data_interval=DataInterval.exact(at))
+ return cls(
+ run_after=at,
+ data_interval=DataInterval.exact(at),
+ partition_key=None,
+ partition_date=None,
+ )
@classmethod
def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
@@ -135,17 +146,22 @@ class DagRunInfo(NamedTuple):
one ends, and each run is scheduled right after the interval ends. This
applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
"""
- return cls(run_after=end, data_interval=DataInterval(start, end))
+ return cls(
+ run_after=end,
+ data_interval=DataInterval(start, end),
+ partition_key=None,
+ partition_date=None,
+ )
@property
- def logical_date(self: DagRunInfo) -> DateTime:
+ def logical_date(self: DagRunInfo) -> DateTime | None:
"""
Infer the logical date to represent a DagRun.
This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
essentially the same, just a different name.
"""
- return self.data_interval.start
+ return self.data_interval.start if self.data_interval else None
@runtime_checkable
@@ -315,3 +331,52 @@ class Timetable(Protocol):
:param data_interval: The data interval of the DAG run.
"""
return run_type.generate_run_id(suffix=run_after.isoformat())
+
+ def next_dagrun_info_v2(
+ self, *, last_dagrun_info: DagRunInfo | None, restriction:
TimeRestriction
+ ) -> DagRunInfo | None:
+ """
+ Provide information to schedule the next DagRun.
+
+ The default implementation delegates to ``next_dagrun_info``, passing the data interval of ``last_dagrun_info`` (or None).
+
+ :param last_dagrun_info: The DagRunInfo object of the
+ Dag's last scheduled or backfilled run.
+ :param restriction: Restriction to apply when scheduling the Dag run.
+ See documentation of :class:`TimeRestriction` for details.
+
+ :return: Information on when the next DagRun can be scheduled. None
+ means a DagRun should not be created. This does not mean no more
runs
+ will be scheduled ever again for this Dag; the timetable can return
+ a DagRunInfo object when asked at another time.
+ """
+ return self.next_dagrun_info(
+ last_automated_data_interval=last_dagrun_info and
last_dagrun_info.data_interval,
+ restriction=restriction,
+ )
+
+ def next_run_info_from_dag_model(self, *, dag_model: DagModel) ->
DagRunInfo:
+ from airflow.models.dag import get_next_data_interval
+
+ run_after =
timezone.coerce_datetime(dag_model.next_dagrun_create_after)
+ if TYPE_CHECKING:
+ assert run_after is not None
+ data_interval = get_next_data_interval(self, dag_model)
+ return DagRunInfo(
+ run_after=run_after,
+ data_interval=data_interval,
+ partition_date=None,
+ partition_key=None,
+ )
+
+ def run_info_from_dag_run(self, *, dag_run: DagRun) -> DagRunInfo:
+ from airflow.models.dag import get_run_data_interval
+
+ run_after = timezone.coerce_datetime(dag_run.run_after)
+ interval = get_run_data_interval(self, dag_run)
+ return DagRunInfo(
+ run_after=run_after,
+ data_interval=interval,
+ partition_date=None,
+ partition_key=None,
+ )
diff --git a/airflow-core/src/airflow/timetables/trigger.py
b/airflow-core/src/airflow/timetables/trigger.py
index f0e04da042e..af4f855f0cd 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -21,19 +21,29 @@ import functools
import math
import operator
import time
+from types import NoneType
from typing import TYPE_CHECKING, Any
+import structlog
+
+from airflow._shared.timezones import timezone
from airflow._shared.timezones.timezone import coerce_datetime,
parse_timezone, utcnow
from airflow.timetables._cron import CronMixin
from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils.strings import get_random_string
if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
from pendulum import DateTime
from pendulum.tz.timezone import FixedTimezone, Timezone
+ from airflow.models.dag import DagModel
+ from airflow.models.dagrun import DagRun
from airflow.timetables.base import TimeRestriction
+ from airflow.utils.types import DagRunType
+
+log = structlog.get_logger()
class _TriggerTimetable(Timetable):
@@ -285,8 +295,9 @@ class MultipleCronTriggerTimetable(Timetable):
return ", ".join(t.summary for t in self._timetables)
def infer_manual_data_interval(self, *, run_after: DateTime) ->
DataInterval:
+ intervals = (t.infer_manual_data_interval(run_after=run_after) for t
in self._timetables)
return min(
- (t.infer_manual_data_interval(run_after=run_after) for t in
self._timetables),
+ (x for x in intervals if x),
key=operator.attrgetter("start"),
)
@@ -321,7 +332,7 @@ class MultipleCronTriggerTimetable(Timetable):
Unix timestamp. If the input is *None* (no next run), *inf* is returned
so it's selected last.
"""
- if info is None:
+ if info is None or info.logical_date is None:
return math.inf
return info.logical_date.timestamp()
@@ -338,8 +349,200 @@ class MultipleCronTriggerTimetable(Timetable):
order values by ``-logical_date`` if they are earlier than or at
current
time, but ``+logical_date`` if later.
"""
- if info is None:
+ if info is None or info.logical_date is None:
return math.inf
if (ts := info.logical_date.timestamp()) <= now:
return -ts
return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+ """
+ Timetable that triggers Dag runs according to a cron expression.
+
+ Creates runs for partition keys.
+
+ The cron expression determines the sequence of run dates. And
+ the partition dates are derived from those according to the ``run_offset``.
+ The partition key is then formatted using the partition date.
+
+ A ``run_offset`` of 1 means the partition_date will be one cron interval
+ after the run date; negative means the partition date will be one cron
+ interval prior to the run date.
+
+ :param cron: cron string that defines when to run
+ :param timezone: Which timezone to use to interpret the cron string
+ :param run_offset: Integer offset that determines which partition date to
run for.
+ The partition key will be derived from the partition date.
+ :param key_format: How to translate the partition date into a string
partition key.
+
+ *run_immediately* controls, if no *start_time* is given to the Dag, when
+ the first run of the Dag should be scheduled. It has no effect if there
+ already exist runs for this Dag.
+
+ * If *True*, always run immediately the most recent possible Dag run.
+ * If *False*, wait to run until the next scheduled time in the future.
+ * If passed a ``timedelta``, will run the most recent possible Dag run
+ if that run's ``data_interval_end`` is within timedelta of now.
+ * If *None*, the timedelta is calculated as 10% of the time between the
+ most recent past scheduled time and the next scheduled time. E.g. if
+ running every hour, this would run the previous time if less than 6
+ minutes had past since the previous run time, otherwise it would wait
+ until the next hour.
+
+ # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+ # todo: AIP-76 we could allow a tuple of integer + time-based
+
+ """
+
+ def __init__(
+ self,
+ cron: str,
+ *,
+ timezone: str | Timezone | FixedTimezone,
+ run_offset: int | datetime.timedelta | relativedelta | None = None,
+ run_immediately: bool | datetime.timedelta = False,
+ key_format: str = "%Y-%m-%dT%H:%M:%S", # todo: AIP-76 we can't infer
partition date from this, so we need to store it separately
+ ) -> None:
+ super().__init__(cron, timezone=timezone,
run_immediately=run_immediately)
+ if not isinstance(run_offset, (int, NoneType)):
+ # todo: AIP-76 implement timedelta / relative delta?
+ raise ValueError("Run offset other than integer not supported
yet.")
+ self._run_offset = run_offset or 0
+ self._key_format = key_format
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> Timetable:
+ from airflow.serialization.decoders import decode_run_immediately
+
+ offset = data["run_offset"]
+ if not isinstance(offset, (int, NoneType)):
+ offset = None
+ log.warning(
+ "Unexpected offset type on deserialization. Only int supported
in this version.",
+ run_offset=offset,
+ )
+
+ return cls(
+ cron=data["expression"],
+ timezone=parse_timezone(data["timezone"]),
+ run_offset=offset,
+ run_immediately=decode_run_immediately(data.get("run_immediately",
False)),
+ key_format=data["key_format"],
+ )
+
+ def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.encoders import encode_run_immediately,
encode_timezone
+
+ return {
+ "expression": self._expression,
+ "timezone": encode_timezone(self._timezone),
+ "run_immediately": encode_run_immediately(self._run_immediately),
+ "run_offset": self._run_offset,
+ "key_format": self._key_format,
+ }
+
+ def _get_partition_date(self, *, run_date) -> DateTime:
+ if self._run_offset == 0:
+ return run_date
+ # we will need to apply offset to determine run date
+ partition_date = timezone.coerce_datetime(run_date)
+ log.info(
+ "applying offset to partition date",
partition_date=partition_date, run_offset=self._run_offset
+ )
+ iter_func = self._get_next if self._run_offset > 0 else self._get_prev
+ for _ in range(abs(self._run_offset)):
+ partition_date = iter_func(partition_date)
+ log.info("new partition date", partition_date=partition_date)
+ return partition_date
+
+ def next_dagrun_info_v2(
+ self,
+ *,
+ last_dagrun_info: DagRunInfo | None,
+ restriction: TimeRestriction,
+ ) -> DagRunInfo | None:
+ # todo: AIP-76 add test for this logic
+ # todo: AIP-76 we will have to ensure that the start / end times apply
to the partition date ideally,
+ # rather than just the run after
+
+ if restriction.catchup:
+ if last_dagrun_info is not None:
+ next_start_time = self._get_next(last_dagrun_info.run_after)
+ elif restriction.earliest is None:
+ next_start_time = self._calc_first_run()
+ else:
+ next_start_time = self._align_to_next(restriction.earliest)
+ else:
+ prev_candidate = self._align_to_prev(coerce_datetime(utcnow()))
+ start_time_candidates = [prev_candidate]
+ if last_dagrun_info is not None:
+ next_candidate = self._get_next(last_dagrun_info.run_after)
+ start_time_candidates.append(next_candidate)
+ elif restriction.earliest is None:
+ # Run immediately has no effect if there is restriction on
earliest
+ first_run = self._calc_first_run()
+ start_time_candidates.append(first_run)
+ if restriction.earliest is not None:
+ earliest = self._align_to_next(restriction.earliest)
+ start_time_candidates.append(earliest)
+ next_start_time = max(start_time_candidates)
+ if restriction.latest is not None and restriction.latest <
next_start_time:
+ return None
+
+ partition_date, partition_key =
self._get_partition_info(run_date=next_start_time)
+ return DagRunInfo(
+ run_after=next_start_time,
+ partition_date=partition_date,
+ partition_key=partition_key,
+ data_interval=None,
+ )
+
+ def _get_partition_info(self, run_date: DateTime) -> tuple[DateTime, str]:
+ # todo: AIP-76 it does not make sense that we would infer partition
info from run date
+ # in general, because they might not be 1-1
+ partition_date = self._get_partition_date(run_date=run_date)
+ partition_key = self._format_key(partition_date)
+ return partition_date, partition_key
+
+ def _format_key(self, partition_date: DateTime) -> str:
+ return partition_date.strftime(self._key_format)
+
+ def generate_run_id(
+ self,
+ *,
+ run_type: DagRunType,
+ run_after: DateTime,
+ data_interval: DataInterval | None,
+ **extra,
+ ) -> str:
+ components = [
+ run_after.isoformat(),
+ extra.get("partition_key"),
+ get_random_string(),
+ ]
+ return run_type.generate_run_id(suffix="__".join(components))
+
+ def next_run_info_from_dag_model(self, *, dag_model: DagModel) ->
DagRunInfo:
+ run_after =
timezone.coerce_datetime(dag_model.next_dagrun_create_after)
+ if TYPE_CHECKING:
+ assert run_after is not None
+ partition_date, partition_key =
self._get_partition_info(run_date=run_after)
+ return DagRunInfo(
+ run_after=run_after,
+ data_interval=None,
+ partition_date=partition_date,
+ partition_key=partition_key,
+ )
+
+ def run_info_from_dag_run(self, *, dag_run: DagRun) -> DagRunInfo:
+ run_after = timezone.coerce_datetime(dag_run.run_after)
+ # todo: AIP-76 store this on DagRun so we don't need to recalculate?
+ # todo: AIP-76 this needs to be public
+ partition_date = self._get_partition_date(run_date=dag_run.run_after)
+ return DagRunInfo(
+ run_after=run_after,
+ data_interval=None,
+ partition_date=partition_date,
+ partition_key=dag_run.partition_key,
+ )
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 98edd6364fb..01f87c2ca22 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -67,6 +67,10 @@ else:
pytestmark = pytest.mark.db_test
+jan_1 = DEFAULT_DATE
+jan_6 = DEFAULT_DATE + timedelta(days=5)
+dec_27 = DEFAULT_DATE + timedelta(days=-5)
+
class TestCliDags:
parser: argparse.ArgumentParser
@@ -145,74 +149,84 @@ class TestCliDags:
assert "OUT" in out
assert "ERR" in out
- def test_next_execution(self, tmp_path, stdout_capture):
- dag_test_list = [
- ("future_schedule_daily", "timedelta(days=5)", "'0 0 * * *'",
"True"),
- ("future_schedule_every_4_hours", "timedelta(days=5)",
"timedelta(hours=4)", "True"),
- ("future_schedule_once", "timedelta(days=5)", "'@once'", "True"),
- ("future_schedule_none", "timedelta(days=5)", "None", "True"),
- ("past_schedule_once", "timedelta(days=-5)", "'@once'", "True"),
- ("past_schedule_daily", "timedelta(days=-5)", "'0 0 * * *'",
"True"),
- ("past_schedule_daily_catchup_false", "timedelta(days=-5)", "'0 0
* * *'", "False"),
- ]
-
- for f in dag_test_list:
- file_content = os.linesep.join(
- [
- "from airflow import DAG",
- "from airflow.providers.standard.operators.empty import
EmptyOperator",
- "from datetime import timedelta; from pendulum import
today",
- f"dag = DAG('{f[0]}', start_date=today() + {f[1]},
schedule={f[2]}, catchup={f[3]})",
- "task = EmptyOperator(task_id='empty_task',dag=dag)",
- ]
- )
- dag_file = tmp_path / f"{f[0]}.py"
- dag_file.write_text(file_content)
-
- with time_machine.travel(DEFAULT_DATE):
- clear_db_dags()
- parse_and_sync_to_db(tmp_path, include_examples=False)
-
- default_run = DEFAULT_DATE
- future_run = default_run + timedelta(days=5)
- past_run = default_run + timedelta(days=-5)
-
- expected_output = {
- "future_schedule_daily": (
- future_run.isoformat(),
- future_run.isoformat() + os.linesep + (future_run +
timedelta(days=1)).isoformat(),
+ @pytest.mark.parametrize(
+ ("dag_id", "delta", "schedule", "catchup", "first", "second"),
+ [
+ (
+ "future_schedule_daily",
+ "timedelta(days=5)",
+ "'0 0 * * *'",
+ "True",
+ jan_6.isoformat(),
+ jan_6.isoformat() + os.linesep + (jan_6 +
timedelta(days=1)).isoformat(),
),
- "future_schedule_every_4_hours": (
- future_run.isoformat(),
- future_run.isoformat() + os.linesep + (future_run +
timedelta(hours=4)).isoformat(),
+ (
+ "future_schedule_every_4_hours",
+ "timedelta(days=5)",
+ "timedelta(hours=4)",
+ "True",
+ jan_6.isoformat(),
+ jan_6.isoformat() + os.linesep + (jan_6 +
timedelta(hours=4)).isoformat(),
),
- "future_schedule_once": (future_run.isoformat(),
future_run.isoformat() + os.linesep + "None"),
- "future_schedule_none": ("None", "None"),
- "past_schedule_once": (past_run.isoformat(), "None"),
- "past_schedule_daily": (
- past_run.isoformat(),
- past_run.isoformat() + os.linesep + (past_run +
timedelta(days=1)).isoformat(),
+ (
+ "future_schedule_once",
+ "timedelta(days=5)",
+ "'@once'",
+ "True",
+ jan_6.isoformat(),
+ jan_6.isoformat() + os.linesep + "None",
),
- "past_schedule_daily_catchup_false": (
- (default_run - timedelta(days=1)).isoformat(),
- (default_run - timedelta(days=1)).isoformat() + os.linesep +
default_run.isoformat(),
+ ("future_schedule_none", "timedelta(days=5)", "None", "True",
"None", "None"),
+ ("past_schedule_once", "timedelta(days=-5)", "'@once'", "True",
dec_27.isoformat(), "None"),
+ (
+ "past_schedule_daily",
+ "timedelta(days=-5)",
+ "'0 0 * * *'",
+ "True",
+ dec_27.isoformat(),
+ dec_27.isoformat() + os.linesep + (dec_27 +
timedelta(days=1)).isoformat(),
),
- }
+ (
+ "past_schedule_daily_catchup_false",
+ "timedelta(days=-5)",
+ "'0 0 * * *'",
+ "False",
+ (jan_1 - timedelta(days=1)).isoformat(),
+ (jan_1 - timedelta(days=1)).isoformat() + os.linesep +
jan_1.isoformat(),
+ ),
+ ],
+ )
+ def test_next_execution(self, dag_id, delta, schedule, catchup, first,
second, tmp_path, stdout_capture):
+ file_content = os.linesep.join(
+ [
+ "from airflow import DAG",
+ "from airflow.providers.standard.operators.empty import
EmptyOperator",
+ "from datetime import timedelta; from pendulum import today",
+ f"dag = DAG('{dag_id}', start_date=today(tz='UTC') + {delta},
schedule={schedule}, catchup={catchup})",
+ "task = EmptyOperator(task_id='empty_task',dag=dag)",
+ ]
+ )
+ dag_file = tmp_path / f"{dag_id}.py"
+ dag_file.write_text(file_content)
- for dag_id in expected_output:
- # Test num-executions = 1 (default)
- args = self.parser.parse_args(["dags", "next-execution", dag_id])
- with stdout_capture as temp_stdout:
- dag_command.dag_next_execution(args)
- out = temp_stdout.getvalue()
- assert expected_output[dag_id][0] in out
+ print(file_content)
+ with time_machine.travel(DEFAULT_DATE):
+ clear_db_dags()
+ parse_and_sync_to_db(tmp_path, include_examples=False)
- # Test num-executions = 2
- args = self.parser.parse_args(["dags", "next-execution", dag_id,
"--num-executions", "2"])
- with stdout_capture as temp_stdout:
- dag_command.dag_next_execution(args)
- out = temp_stdout.getvalue()
- assert expected_output[dag_id][1] in out
+ # Test num-executions = 1 (default)
+ args = self.parser.parse_args(["dags", "next-execution", dag_id])
+ with stdout_capture as temp_stdout:
+ dag_command.dag_next_execution(args)
+ out = temp_stdout.getvalue()
+ assert first in out
+
+ # Test num-executions = 2
+ args = self.parser.parse_args(["dags", "next-execution", dag_id,
"--num-executions", "2"])
+ with stdout_capture as temp_stdout:
+ dag_command.dag_next_execution(args)
+ out = temp_stdout.getvalue()
+ assert second in out
# Rebuild Test DB for other tests
clear_db_dags()
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c6b3d026082..069d72b5eca 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -88,7 +88,7 @@ from airflow.sdk.definitions.callback import AsyncCallback,
SyncCallback
from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
from airflow.serialization.definitions.dag import SerializedDAG
from airflow.serialization.serialized_objects import LazyDeserializedDAG
-from airflow.timetables.base import DataInterval
+from airflow.timetables.base import DagRunInfo, DataInterval
from airflow.utils.session import create_session, provide_session
from airflow.utils.span_status import SpanStatus
from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -4053,7 +4053,7 @@ class TestSchedulerJob:
# To increase the chances the TIs from the "full" pool will get
# retrieved first, we schedule all TIs from the first dag first.
def _create_dagruns(dag: SerializedDAG):
- next_info = dag.next_dagrun_info(None)
+ next_info = dag.next_dagrun_info(last_automated_run_info=None)
assert next_info is not None
for i in range(30):
yield dag.create_dagrun(
@@ -4066,7 +4066,7 @@ class TestSchedulerJob:
triggered_by=DagRunTriggeredByType.TEST,
session=session,
)
- next_info = dag.next_dagrun_info(next_info.data_interval)
+ next_info =
dag.next_dagrun_info(last_automated_run_info=next_info)
if next_info is None:
break
@@ -4346,7 +4346,7 @@ class TestSchedulerJob:
bash_command="exit 1",
retries=1,
)
- dag_maker.dag_model.calculate_dagrun_date_fields(dag, None)
+ dag_maker.dag_model.calculate_dagrun_date_fields(dag,
last_automated_run=None)
@provide_session
def do_schedule(session):
@@ -4555,6 +4555,7 @@ class TestSchedulerJob:
session.rollback()
def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self,
dag_maker):
+ # todo: this fails
dag_id = "test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs"
with dag_maker(dag_id=dag_id, schedule="@daily"):
EmptyOperator(task_id="task1")
@@ -4645,6 +4646,46 @@ class TestSchedulerJob:
self.job_runner._create_dag_runs(dag_models=[dag_maker.dag_model],
session=session)
assert mock_calc.called
+ @pytest.mark.parametrize(
+ "expected",
+ [
+ DagRunInfo(
+ run_after=pendulum.now(),
+ data_interval=None,
+ partition_date=None,
+ partition_key="helloooooo",
+ ),
+ DagRunInfo(
+ run_after=pendulum.now(),
+ data_interval=DataInterval(pendulum.today(), pendulum.today()),
+ partition_date=None,
+ partition_key=None,
+ ),
+ ],
+ )
+ @patch("airflow.timetables.base.Timetable.next_run_info_from_dag_model")
+ @patch("airflow.serialization.definitions.dag.SerializedDAG.create_dagrun")
+ def test_should_use_info_from_timetable(self, mock_create, mock_next,
expected, session, dag_maker):
+ """We should always update next_dagrun after scheduler creates a new
dag run."""
+ mock_next.return_value = expected
+ with dag_maker(schedule="0 0 * * *"):
+ EmptyOperator(task_id="dummy")
+
+ scheduler_job = Job(executor=self.null_exec)
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+ self.job_runner._create_dag_runs(dag_models=[dag_maker.dag_model],
session=session)
+ kwargs = mock_create.call_args.kwargs
+ # todo: AIP-76 let's add partition_date to dag run
+ # and we should probably have something on DagRun that can return the
DagRunInfo for
+ # that dag run. See https://github.com/apache/airflow/issues/61167
+ actual = DagRunInfo(
+ run_after=kwargs["run_after"],
+ data_interval=kwargs["data_interval"],
+ partition_key=kwargs["partition_key"],
+ partition_date=None,
+ )
+ assert actual == expected
+
@pytest.mark.parametrize(
("run_type", "expected"),
[
@@ -5055,9 +5096,7 @@ class TestSchedulerJob:
scheduler_messages = [
record.message for record in caplog.records if record.levelno
>= logging.ERROR
]
- assert scheduler_messages == [
- "DAG 'test_scheduler_create_dag_runs_does_not_raise_error' not
found in serialized_dag table",
- ]
+ assert scheduler_messages == ["Dag not found in serialized_dag
table"]
def _clear_serdags(self, dag_id, session):
SDM = SerializedDagModel
@@ -6253,7 +6292,13 @@ class TestSchedulerJob:
run_id="dr1_run_2",
state=State.QUEUED,
logical_date=dag.next_dagrun_info(
- last_automated_dagrun=data_interval, restricted=False
+ last_automated_run_info=DagRunInfo(
+ run_after=dr1_running.run_after,
+ data_interval=data_interval,
+ partition_date=None,
+ partition_key=None,
+ ),
+ restricted=False,
).data_interval.start,
)
# second dag and dagruns
@@ -7324,7 +7369,7 @@ class TestSchedulerJob:
job_runner = SchedulerJobRunner(job=scheduler_job)
# In the dagmodel list, the first dag should fail, but the second one
should succeed
job_runner._create_dag_runs([dm1], session)
- assert "Failed creating DagRun for testdag1" in caplog.text
+ assert "Failed creating DagRun" in caplog.text
def test_activate_referenced_assets_with_no_existing_warning(self,
session, testing_dag_bundle):
dag_warnings = session.scalars(select(DagWarning)).all()
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 1853b4402d1..e5366fc6127 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -82,6 +82,7 @@ from airflow.timetables.simple import (
NullTimetable,
OnceTimetable,
)
+from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -1483,11 +1484,11 @@ my_postgres_conn:
dag = DAG("test_scheduler_dagrun_once",
start_date=timezone.datetime(2015, 1, 1), schedule="@once")
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
assert next_info.logical_date == timezone.datetime(2015, 1, 1)
- next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
assert next_info is None
def test_next_dagrun_info_catchup(self):
@@ -1528,7 +1529,8 @@ my_postgres_conn:
start_date=six_hours_ago_to_the_hour,
catchup=False,
)
- next_date, _ = dag1.next_dagrun_info(None)
+ info = dag1.next_dagrun_info(last_automated_run_info=None)
+ next_date = info.run_after
# The DR should be scheduled in the last half an hour, not 6 hours ago
assert next_date > half_an_hour_ago
assert next_date < timezone.utcnow()
@@ -1540,7 +1542,8 @@ my_postgres_conn:
catchup=False,
)
- next_date, _ = dag2.next_dagrun_info(None)
+ info = dag2.next_dagrun_info(last_automated_run_info=None)
+ next_date = info.run_after
# The DR should be scheduled in the last 2 hours, not 6 hours ago
assert next_date > two_hours_ago
# The DR should be scheduled BEFORE now
@@ -1553,7 +1556,8 @@ my_postgres_conn:
catchup=False,
)
- next_date, _ = dag3.next_dagrun_info(None)
+ info = dag3.next_dagrun_info(last_automated_run_info=None)
+ next_date = info.run_after
# The DR should be scheduled in the last 2 hours, not 6 hours ago
assert next_date == six_hours_ago_to_the_hour
@@ -1572,12 +1576,12 @@ my_postgres_conn:
)
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
assert next_info.logical_date == timezone.datetime(2020, 1, 4)
# The date to create is in the future, this is handled by
"DagModel.dags_needing_dagruns"
- next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
assert next_info
assert next_info.logical_date == timezone.datetime(2020, 1, 5)
@@ -1595,20 +1599,20 @@ my_postgres_conn:
)
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
assert next_info.logical_date == timezone.datetime(2020, 5, 1)
- next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
assert next_info
assert next_info.logical_date == timezone.datetime(2020, 5, 2)
- next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
assert next_info
assert next_info.logical_date == timezone.datetime(2020, 5, 3)
# The date to create is in the future, this is handled by
"DagModel.dags_needing_dagruns"
- next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
assert next_info
assert next_info.logical_date == timezone.datetime(2020, 5, 4)
@@ -1616,7 +1620,7 @@ my_postgres_conn:
"""Test the DAG does not crash the scheduler if the timetable raises
an exception."""
class FailingTimetable(Timetable):
- def next_dagrun_info(self, last_automated_data_interval,
restriction):
+ def next_dagrun_info(self, last_automated_run_info, restriction):
raise RuntimeError("this fails")
def _find_registered_custom_timetable(s):
@@ -1646,19 +1650,16 @@ my_postgres_conn:
assert len(records) == 1
record = records[0]
assert record.exc_info is not None, "Should contain exception"
- assert record.message == (
- f"Failed to fetch run info after data interval {data_interval}
"
- f"for DAG 'test_next_dagrun_info_timetable_exception'"
- )
+ assert record.message == "Failed to fetch run info"
with caplog.at_level(level=logging.ERROR):
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info is None, "failed next_dagrun_info should return None"
_check_logs(caplog.records, data_interval=None)
caplog.clear()
data_interval = DataInterval(timezone.datetime(2020, 5, 1),
timezone.datetime(2020, 5, 2))
with caplog.at_level(level=logging.ERROR):
- next_info = scheduler_dag.next_dagrun_info(data_interval)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=data_interval)
assert next_info is None, "failed next_dagrun_info should return None"
_check_logs(caplog.records, data_interval)
@@ -1683,7 +1684,7 @@ my_postgres_conn:
EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
assert next_info.logical_date == timezone.datetime(2016, 1, 2, 5, 4)
@@ -1696,7 +1697,7 @@ my_postgres_conn:
EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
assert next_info.logical_date == timezone.datetime(2016, 1, 1, 10, 10)
@@ -1711,7 +1712,7 @@ my_postgres_conn:
EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
# With catchup=False, next_dagrun should be based on the current date
# Verify it's not using the old start_date
@@ -1732,7 +1733,7 @@ my_postgres_conn:
EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
# With catchup=False, next_dagrun should be based on the current date
# Verify it's not using the old start_date
@@ -1748,11 +1749,11 @@ my_postgres_conn:
)
scheduler_dag = create_scheduler_dag(dag)
- next_info = scheduler_dag.next_dagrun_info(None)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=None)
assert next_info
assert next_info.logical_date == timezone.datetime(2024, 2, 29)
- next_info = scheduler_dag.next_dagrun_info(next_info.data_interval)
+ next_info =
scheduler_dag.next_dagrun_info(last_automated_run_info=next_info)
assert next_info.logical_date == timezone.datetime(2028, 2, 29)
assert next_info.data_interval.start == timezone.datetime(2028, 2, 29)
assert next_info.data_interval.end == timezone.datetime(2032, 2, 29)
@@ -2748,7 +2749,7 @@ def test_iter_dagrun_infos_between_error(caplog):
end = pendulum.instance(DEFAULT_DATE)
class FailingAfterOneTimetable(Timetable):
- def next_dagrun_info(self, last_automated_data_interval, restriction):
+ def next_dagrun_info(self, last_automated_data_interval: DataInterval
| None, restriction):
if last_automated_data_interval is None:
return DagRunInfo.interval(start, end)
raise RuntimeError("this fails")
@@ -3554,3 +3555,48 @@ def test_get_run_data_interval_pre_aip_39():
ds_end = current_ts.replace(hour=0, minute=0, second=0, microsecond=0)
timetable = coerce_to_core_timetable(dag.timetable)
assert get_run_data_interval(timetable, dr) ==
DataInterval(start=ds_start, end=ds_end)
+
+
[email protected](
+ ("schedule", "next_run", "next_interval", "next_run_after"),
+ [
+ (
+ CronPartitionTimetable(
+ "0 0 * * *",
+ timezone=pendulum.UTC,
+ ),
+ TEST_DATE + timedelta(days=1),
+ None,
+ TEST_DATE + timedelta(days=1),
+ ),
+ (
+ "0 0 * * *",
+ TEST_DATE,
+ DataInterval(start=TEST_DATE, end=TEST_DATE + timedelta(days=1)),
+ TEST_DATE + timedelta(days=1),
+ ),
+ (
+ None,
+ None,
+ None,
+ None,
+ ),
+ (
+ "@once",
+ None,
+ None,
+ None,
+ ),
+ ],
+)
+def test_calculate_dagrun_date_fields(schedule, next_run, next_interval,
next_run_after, dag_maker, session):
+ with dag_maker(schedule=schedule, catchup=True, start_date=TEST_DATE):
+ BashOperator(task_id="hi", bash_command="yo")
+
+ run = dag_maker.create_dagrun()
+ serdag = dag_maker.serialized_dag
+ dag_model = dag_maker.dag_model
+ dag_model.calculate_dagrun_date_fields(dag=serdag, last_automated_run=run)
+ assert dag_model.next_dagrun_data_interval == next_interval
+ assert dag_model.next_dagrun == next_run
+ assert dag_model.next_dagrun_create_after == next_run_after
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index 5bf2103c5e5..7449a69b5f0 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -1127,7 +1127,7 @@ class TestDagRun:
expected_stat_tags = {"dag_id": f"{dag.dag_id}", "run_type":
DagRunType.SCHEDULED}
scheduler_dag = sync_dag_to_db(dag, session=session)
try:
- info = scheduler_dag.next_dagrun_info(None)
+ info = scheduler_dag.next_dagrun_info(last_automated_run_info=None)
orm_dag_kwargs = {
"dag_id": dag.dag_id,
"bundle_name": "testing",
diff --git a/airflow-core/tests/unit/timetables/test_events_timetable.py
b/airflow-core/tests/unit/timetables/test_events_timetable.py
index f4df62e39e1..80166b57389 100644
--- a/airflow-core/tests/unit/timetables/test_events_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_events_timetable.py
@@ -69,7 +69,9 @@ def restricted_timetable():
list(zip(EVENT_DATES, EVENT_DATES)),
)
def test_dag_run_info_interval(start: pendulum.DateTime, end:
pendulum.DateTime):
- expected_info = DagRunInfo(run_after=end,
data_interval=DataInterval(start, end))
+ expected_info = DagRunInfo(
+ run_after=end, data_interval=DataInterval(start, end),
partition_date=None, partition_key=None
+ )
assert DagRunInfo.interval(start, end) == expected_info
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index eecd2bdfb0a..a5774bb22c5 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -23,11 +23,14 @@ import dateutil.relativedelta
import pendulum
import pytest
import time_machine
+from sqlalchemy import select
from airflow._shared.timezones.timezone import utc
from airflow.exceptions import AirflowTimetableInvalid
+from airflow.models import DagModel
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
from airflow.timetables.trigger import (
+ CronPartitionTimetable,
CronTriggerTimetable,
DeltaTriggerTimetable,
MultipleCronTriggerTimetable,
@@ -599,3 +602,210 @@ def test_multi_serialization():
assert tt._timetables[0]._timezone == tt._timetables[1]._timezone == utc
assert tt._timetables[0]._interval == tt._timetables[1]._interval ==
datetime.timedelta(minutes=10)
assert tt._timetables[0]._run_immediately ==
tt._timetables[1]._run_immediately is False
+
+
[email protected]_test
[email protected]_serialized_dag
+def test_latest_run_no_history(dag_maker, session):
+ start_date = pendulum.datetime(2026, 1, 1)
+ with dag_maker(
+ "test",
+ start_date=start_date,
+ catchup=True,
+ schedule=CronPartitionTimetable(
+ "0 * * * *",
+ timezone=pendulum.UTC,
+ ),
+ ):
+ pass
+ dag_maker.sync_dagbag_to_db()
+ session.commit()
+ dm = session.scalar(select(DagModel))
+ assert dm.next_dagrun == start_date
+
+
[email protected]_test
[email protected]_serialized_dag
+def test_latest_run_with_run(dag_maker, session):
+ """
+ This ensures that the dag processor will figure out the next run correctly
+ """
+ start_date = pendulum.datetime(2026, 1, 1)
+ with dag_maker(
+ "test",
+ start_date=start_date,
+ catchup=True,
+ schedule=CronPartitionTimetable(
+ "0 0 * * *",
+ timezone=pendulum.UTC,
+ ),
+ ):
+ pass
+ dag_maker.sync_dagbag_to_db()
+ dag_maker.create_dagrun(
+ run_id="abc1234",
+ logical_date=None,
+ data_interval=None,
+ run_type="scheduled",
+ run_after=start_date + datetime.timedelta(days=3),
+ partition_key="anything",
+ session=session,
+ )
+ session.commit()
+ dag_maker.sync_dag_to_db()
+ session.commit()
+ dm = session.scalar(select(DagModel))
+ assert dm.next_dagrun == start_date + datetime.timedelta(days=4)
+
+
[email protected]_test
[email protected](
+ ("schedule", "partition_key", "expected"),
+ [
+ (
+ CronPartitionTimetable(
+ "0 0 * * *",
+ timezone=pendulum.UTC,
+ ),
+ "key-1",
+ DagRunInfo(
+ run_after=START_DATE + datetime.timedelta(days=3),
+ data_interval=None,
+ partition_date=START_DATE + datetime.timedelta(days=3),
+ partition_key="key-1",
+ ),
+ ),
+ (
+ "0 0 * * *",
+ None,
+ DagRunInfo(
+ run_after=START_DATE + datetime.timedelta(days=3),
+ data_interval=DataInterval(
+ start=START_DATE + datetime.timedelta(days=2),
+ end=START_DATE + datetime.timedelta(days=3),
+ ),
+ partition_date=None,
+ partition_key=None,
+ ),
+ ),
+ ],
+)
+def test_run_info_from_dag_run(schedule, partition_key, expected, dag_maker,
session):
+ with dag_maker(
+ "test",
+ start_date=START_DATE,
+ catchup=True,
+ schedule=schedule,
+ ):
+ pass
+ dag_maker.sync_dagbag_to_db()
+ dr = dag_maker.create_dagrun(
+ run_id="abc1234",
+ logical_date=None,
+ data_interval=None,
+ run_type="scheduled",
+ run_after=START_DATE + datetime.timedelta(days=3),
+ partition_key=partition_key,
+ session=session,
+ )
+ info = dag_maker.serialized_dag.timetable.run_info_from_dag_run(dag_run=dr)
+ assert info == expected
+
+
[email protected]_test
[email protected]_serialized_dag
[email protected](
+ ("schedule", "expected"),
+ [
+ (
+ CronPartitionTimetable(
+ "0 0 * * *",
+ timezone=pendulum.UTC,
+ ),
+ DagRunInfo(
+ run_after=START_DATE,
+ data_interval=None,
+ partition_date=START_DATE,
+ partition_key="2021-09-04T00:00:00",
+ ),
+ ),
+ (
+ "0 0 * * *",
+ DagRunInfo(
+ run_after=START_DATE + datetime.timedelta(days=1),
+ data_interval=DataInterval(
+ start=START_DATE,
+ end=START_DATE + datetime.timedelta(days=1),
+ ),
+ partition_date=None,
+ partition_key=None,
+ ),
+ ),
+ ],
+)
+def test_next_dagrun_info_v2(schedule, expected, dag_maker, session):
+ """
+ This ensures that the dag processor will figure out the next run correctly
+ """
+ with dag_maker(
+ "test",
+ start_date=START_DATE,
+ catchup=True,
+ schedule=schedule,
+ ):
+ pass
+ dag_maker.sync_dagbag_to_db()
+ serdag = dag_maker.serialized_dag
+ timetable = serdag.timetable
+ info = timetable.next_dagrun_info_v2(
+ last_dagrun_info=None,
+ restriction=serdag._time_restriction,
+ )
+ assert info == expected
+
+
[email protected]_test
[email protected]_serialized_dag
[email protected](
+ ("schedule", "partition_key", "expected"),
+ [
+ (
+ CronPartitionTimetable(
+ "0 0 * * *",
+ timezone=pendulum.UTC,
+ ),
+ "key-1",
+ DagRunInfo(
+ run_after=START_DATE,
+ data_interval=None,
+ partition_date=START_DATE,
+ partition_key="2021-09-04T00:00:00",
+ ),
+ ),
+ (
+ "0 0 * * *",
+ None,
+ DagRunInfo(
+ run_after=START_DATE + datetime.timedelta(days=1),
+ data_interval=DataInterval(
+ start=START_DATE,
+ end=START_DATE + datetime.timedelta(days=1),
+ ),
+ partition_date=None,
+ partition_key=None,
+ ),
+ ),
+ ],
+)
+def test_next_run_info_from_dag_model(schedule, partition_key, expected,
dag_maker, session):
+ with dag_maker(
+ "test",
+ start_date=START_DATE,
+ catchup=True,
+ schedule=schedule,
+ ):
+ pass
+ dag_maker.sync_dagbag_to_db()
+ dm = dag_maker.dag_model
+ info =
dag_maker.serialized_dag.timetable.next_run_info_from_dag_model(dag_model=dm)
+ assert info == expected
diff --git a/airflow-core/tests/unit/timetables/test_workday_timetable.py
b/airflow-core/tests/unit/timetables/test_workday_timetable.py
index aadf8e67a2f..abf730d10af 100644
--- a/airflow-core/tests/unit/timetables/test_workday_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_workday_timetable.py
@@ -57,7 +57,12 @@ def timetable():
list(zip(WEEK_1_WEEKDAYS[:-1], WEEK_1_WEEKDAYS[1:])),
)
def test_dag_run_info_interval(start: pendulum.DateTime, end:
pendulum.DateTime):
- expected_info = DagRunInfo(run_after=end,
data_interval=DataInterval(start, end))
+ expected_info = DagRunInfo(
+ run_after=end,
+ data_interval=DataInterval(start, end),
+ partition_date=None,
+ partition_key=None,
+ )
assert DagRunInfo.interval(start, end) == expected_info
diff --git a/devel-common/src/tests_common/pytest_plugin.py
b/devel-common/src/tests_common/pytest_plugin.py
index 9622906efff..dc5e8f38f6a 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -53,7 +53,7 @@ if TYPE_CHECKING:
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.sdk.types import DagRunProtocol, Operator
from airflow.serialization.definitions.dag import SerializedDAG
- from airflow.timetables.base import DataInterval
+ from airflow.timetables.base import DagRunInfo, DataInterval
from airflow.typing_compat import Self
from airflow.utils.state import DagRunState, TaskInstanceState
@@ -1065,7 +1065,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
- timezone = _import_timezone()
+ airflow_timezone = _import_timezone()
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType
@@ -1100,8 +1100,12 @@ def dag_maker(request) -> Generator[DagMaker, None,
None]:
if run_type == DagRunType.MANUAL:
logical_date = self.start_date
else:
- logical_date = dag.next_dagrun_info(None).logical_date
- logical_date = timezone.coerce_datetime(logical_date)
+ if AIRFLOW_V_3_2_PLUS:
+ next_run_kwargs = dict(last_automated_run_info=None)
+ else:
+ next_run_kwargs = dict(last_automated_dagrun=None)
+ logical_date =
dag.next_dagrun_info(**next_run_kwargs).logical_date
+ logical_date = airflow_timezone.coerce_datetime(logical_date)
data_interval = None
try:
@@ -1125,13 +1129,15 @@ def dag_maker(request) -> Generator[DagMaker, None,
None]:
if AIRFLOW_V_3_0_PLUS:
kwargs["run_id"] = dag.timetable.generate_run_id(
run_type=run_type,
- run_after=logical_date or
timezone.coerce_datetime(timezone.utcnow()),
+ run_after=logical_date
+ or
airflow_timezone.coerce_datetime(airflow_timezone.utcnow()),
data_interval=data_interval,
)
else:
kwargs["run_id"] = dag.timetable.generate_run_id(
run_type=run_type,
- logical_date=logical_date or
timezone.coerce_datetime(timezone.utcnow()),
+ logical_date=logical_date
+ or
airflow_timezone.coerce_datetime(airflow_timezone.utcnow()),
data_interval=data_interval,
)
kwargs["run_type"] = run_type
@@ -1139,7 +1145,9 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
if AIRFLOW_V_3_0_PLUS:
kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST)
kwargs["logical_date"] = logical_date
- kwargs.setdefault("run_after", data_interval[-1] if
data_interval else timezone.utcnow())
+ kwargs.setdefault(
+ "run_after", data_interval[-1] if data_interval else
airflow_timezone.utcnow()
+ )
else:
kwargs.pop("triggered_by", None)
kwargs["execution_date"] = logical_date
@@ -1158,9 +1166,13 @@ def dag_maker(request) -> Generator[DagMaker, None,
None]:
if AIRFLOW_V_3_1_PLUS:
from airflow.models.dag import get_run_data_interval
- next_info =
sdag.next_dagrun_info(get_run_data_interval(sdag.timetable, dagrun))
+ interval = get_run_data_interval(sdag.timetable, dagrun)
+ last_run_info = self._get_run_info(dagrun, sdag, interval)
+ next_info =
sdag.next_dagrun_info(last_automated_run_info=last_run_info)
else:
- next_info =
sdag.next_dagrun_info(sdag.get_run_data_interval(dagrun))
+ interval = sdag.get_run_data_interval(dagrun)
+ last_run_info = self._get_run_info(dagrun, sdag, interval)
+ next_info =
sdag.next_dagrun_info(last_automated_run_info=last_run_info)
if next_info is None:
raise ValueError(f"cannot create run after {dagrun}")
return self.create_dagrun(
@@ -1169,6 +1181,20 @@ def dag_maker(request) -> Generator[DagMaker, None,
None]:
**kwargs,
)
+ def _get_run_info(self, dagrun: DagRun, serdag: SerializedDAG,
interval: DataInterval) -> DagRunInfo:
+ from airflow.timetables.base import DagRunInfo
+
+ if AIRFLOW_V_3_2_PLUS:
+ return serdag.timetable.run_info_from_dag_run(dag_run=dagrun)
+
+ airflow_timezone = _import_timezone()
+ return DagRunInfo(
+ run_after=airflow_timezone.coerce_datetime(dagrun.run_after),
+ data_interval=interval,
+ partition_date=None,
+ partition_key=None,
+ )
+
def create_ti(self, task_id, dag_run=None, dag_run_kwargs=None,
map_index=-1):
"""
Create a specific task instance with proper task refresh.
@@ -2519,7 +2545,8 @@ def create_runtime_ti(mocked_parse):
)
if drinfo:
data_interval = drinfo.data_interval
- data_interval_start, data_interval_end = data_interval.start,
data_interval.end
+ data_interval_start = data_interval and data_interval.start
+ data_interval_end = data_interval and data_interval.end
dag_id = task.dag.dag_id
task_retries = task.retries or 0
diff --git a/task-sdk/src/airflow/sdk/bases/timetable.py
b/task-sdk/src/airflow/sdk/bases/timetable.py
index 5f5da55f1de..4488cc9ba6d 100644
--- a/task-sdk/src/airflow/sdk/bases/timetable.py
+++ b/task-sdk/src/airflow/sdk/bases/timetable.py
@@ -47,6 +47,8 @@ class BaseTimetable:
asset_condition: BaseAsset | None = None
+ # TODO: AIP-76 just add partition-driven field here to differentiate the
behavior
+
def validate(self) -> None:
"""
Validate the timetable is correctly specified.
diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
index 66bc3cc245e..afde7e2cfd1 100644
--- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
+++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
@@ -51,7 +51,7 @@ class DeltaTriggerTimetable(DeltaMixin, BaseTimetable):
@attrs.define
class CronTriggerTimetable(CronMixin, BaseTimetable):
"""
- Timetable that triggers DAG runs according to a cron expression.
+ Timetable that triggers Dag runs according to a cron expression.
This is different from ``CronDataIntervalTimetable``, where the cron
expression specifies the *data interval* of a DAG run. With this timetable,
@@ -66,13 +66,13 @@ class CronTriggerTimetable(CronMixin, BaseTimetable):
:param timezone: Which timezone to use to interpret the cron string
:param interval: timedelta that defines the data interval start. Default 0.
- *run_immediately* controls, if no *start_time* is given to the DAG, when
- the first run of the DAG should be scheduled. It has no effect if there
- already exist runs for this DAG.
+ *run_immediately* controls, if no *start_time* is given to the Dag, when
+ the first run of the Dag should be scheduled. It has no effect if there
+ already exist runs for this Dag.
- * If *True*, always run immediately the most recent possible DAG run.
+ * If *True*, always run immediately the most recent possible Dag run.
* If *False*, wait to run until the next scheduled time in the future.
- * If passed a ``timedelta``, will run the most recent possible DAG run
+ * If passed a ``timedelta``, will run the most recent possible Dag run
if that run's ``data_interval_end`` is within timedelta of now.
* If *None*, the timedelta is calculated as 10% of the time between the
most recent past scheduled time and the next scheduled time. E.g. if