This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8dd900b6301 Set logical_date and data_interval to None for
asset-triggered dags and forbid them to be accessed in context/template (#46460)
8dd900b6301 is described below
commit 8dd900b63014b5e80f42660e090577ad8efc8a83
Author: Wei Lee <[email protected]>
AuthorDate: Thu Feb 13 15:42:00 2025 +0800
Set logical_date and data_interval to None for asset-triggered dags and
forbid them to be accessed in context/template (#46460)
* style(dag): improve type annotation
* refactor(dag): rename by_dag as adrq_by_dag
* feat(scheduler_job_runner): set logical_date, data_interval as none when
creating dag runs for asset triggered dag
* test(pytest_plugin): set run_after to now if data_interval is None
* test(scheduler_job_runner): set logical_date and data_interval of asset
triggered dag runs to none
* feat(dagrun): order queued and running dag runs by run_after instead of
logical_date
* feat(dag): get task_instances based on run_after instead of logical_date
* feat(taskinstance): change log_url base_date to use run_after instead of
logical_date
* test(test_common): rewrite create_dagrun as logical_date is now nullable
* feat(dag): get the last_dagrun by run_after
* feat(taskinstance): pass base_date to TaskInstance.log_uri only when
logical_date exists
* feat(www): fix last_dag_run through run_after
* feat(www): fetch last_dag_runs using run_after instead of logical_date in
task_stat
* feat(www): fetch dag_run through run_after instead of logical_date in
grid_data
* feat(task_sdk): remove data_interval_start, data_interval_end,
prev_data_interval_start_success, prev_data_interval_end_success for dag_run
that has no data_interval
* test(pytest_plugin): add DagRunType.ASSET_TRIGGERED alias for backward compat
* Revert "test(test_common): rewrite create_dagrun as logical_date is now
nullable"
This reverts commit a3ba5a1021d203f324dab1371d85ad11a7c55e7a.
* Revert "feat(dag): get task_instances based on run_after instead of
logical_date"
This reverts commit a8be4a7ebb424b194c8dd3183243b2516e4edb66.
* Revert "feat(dag): get the last_dagrun by run_after"
This reverts commit a1b4e6e8299b3e0e800db088bc95b30519fe6b1a.
* feat(timetable): remove AssetTriggeredTimetable.data_interval_for_events
* feat(scheduler_job_runner): use start_date directly for asset triggered
dag
* Revert "feat(www): fetch dag_run through run_after instead of
logical_date in grid_data"
This reverts commit 24a79d6d78820b5ed308b09ec9a65870e9c88384.
* Revert "feat(www): fetch last_dag_runs using run_after instead of
logical_date in task_stat"
This reverts commit 0f04880ece0a7ac42b97597bfa6a6fbe3375cb3b.
* Revert "feat(www): fix last_dag_run through run_after"
This reverts commit 5c90113b68fdf47a8bfe232e22098b4eaa0e7652.
* refactor(task_runner): merge the data_interval keys with logical_date
check logic
* feat(scheduler_job_runner): simplify _create_dag_runs_asset_triggered
logic as we don't need to check existing logical_date for asset triggered dag
runs
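For DAG authors, the net effect of this change is that asset-triggered runs no
longer expose logical_date, data_interval_start/end, or the
prev_data_interval_*_success keys in the task context or in templates. A
minimal sketch of a consumer task that stays compatible (the Asset import path
and the asset definition below are illustrative assumptions, not part of this
commit):

    from airflow.decorators import dag, task
    from airflow.sdk.definitions.asset import Asset  # assumed import path

    example_asset = Asset("s3://bucket/example")  # hypothetical asset

    @dag(schedule=[example_asset])
    def consumer():
        @task
        def report(**context):
            # These keys are absent for asset-triggered runs, so fall back
            # to run_after, which is always populated.
            when = context.get("logical_date") or context["dag_run"].run_after
            print(f"processing as of {when}")

        report()

    consumer()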
---
airflow/jobs/scheduler_job_runner.py | 132 +++++++++------------
airflow/models/dag.py | 18 +--
airflow/models/dagrun.py | 4 +-
airflow/timetables/simple.py | 24 +---
scripts/ci/pre_commit/template_context_key_sync.py | 13 +-
.../src/airflow/sdk/execution_time/task_runner.py | 20 ++--
tests/jobs/test_scheduler_job.py | 14 +--
tests/timetables/test_assets_timetable.py | 18 ---
tests_common/pytest_plugin.py | 4 +-
9 files changed, 104 insertions(+), 143 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 3703268b07e..4d81e464e68 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -84,6 +84,7 @@ if TYPE_CHECKING:
     from datetime import datetime
     from types import FrameType
+    from pendulum.datetime import DateTime
     from sqlalchemy.orm import Query, Session
     from airflow.executors.executor_utils import ExecutorName
@@ -502,8 +503,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 if current_task_concurrency >= task_concurrency_limit:
                     self.log.info(
-                        "Not executing %s since the task concurrency for"
-                        " this task has been reached.",
+                        "Not executing %s since the task concurrency for this task has been reached.",
                         task_instance,
                     )
                     starved_tasks.add((task_instance.dag_id, task_instance.task_id))
@@ -1206,7 +1206,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             non_asset_dags = all_dags_needing_dag_runs.difference(asset_triggered_dags)
             self._create_dag_runs(non_asset_dags, session)
             if asset_triggered_dags:
-                self._create_dag_runs_asset_triggered(asset_triggered_dags, asset_triggered_dag_info, session)
+                self._create_dag_runs_asset_triggered(
+                    dag_models=asset_triggered_dags,
+                    asset_triggered_dag_info=asset_triggered_dag_info,
+                    session=session,
+                )
             # commit the session - Release the write lock on DagModel table.
             guard.commit()
@@ -1325,21 +1329,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         session: Session,
     ) -> None:
         """For DAGs that are triggered by assets, create dag runs."""
-        # Bulk Fetch DagRuns with dag_id and logical_date same
-        # as DagModel.dag_id and DagModel.next_dagrun
-        # This list is used to verify if the DagRun already exist so that we don't attempt to create
-        # duplicate dag runs
-        logical_dates = {
-            dag_id: timezone.coerce_datetime(last_time)
-            for dag_id, (_, last_time) in asset_triggered_dag_info.items()
+        triggered_dates: dict[str, DateTime] = {
+            dag_id: timezone.coerce_datetime(last_asset_event_time)
+            for dag_id, (_, last_asset_event_time) in asset_triggered_dag_info.items()
         }
-        existing_dagruns: set[tuple[str, timezone.DateTime]] = set(
-            session.execute(
-                select(DagRun.dag_id, DagRun.logical_date).where(
-                    tuple_(DagRun.dag_id, DagRun.logical_date).in_(logical_dates.items())
-                )
-            )
-        )
         for dag_model in dag_models:
             dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
@@ -1356,64 +1349,50 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
-            # Explicitly check if the DagRun already exists. This is an edge case
-            # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
-            # are not updated.
-            # We opted to check DagRun existence instead
-            # of catching an Integrity error and rolling back the session i.e
-            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
-            # create a new one. This is so that in the next Scheduling loop we try to create new runs
-            # instead of falling in a loop of Integrity Error.
-            logical_date = logical_dates[dag.dag_id]
-            if (dag.dag_id, logical_date) not in existing_dagruns:
-                previous_dag_run = session.scalar(
-                    select(DagRun)
-                    .where(
-                        DagRun.dag_id == dag.dag_id,
-                        DagRun.logical_date < logical_date,
-                        DagRun.run_type == DagRunType.ASSET_TRIGGERED,
-                    )
-                    .order_by(DagRun.logical_date.desc())
-                    .limit(1)
-                )
-                asset_event_filters = [
-                    DagScheduleAssetReference.dag_id == dag.dag_id,
-                    AssetEvent.timestamp <= logical_date,
-                ]
-                if previous_dag_run:
-                    asset_event_filters.append(AssetEvent.timestamp > previous_dag_run.logical_date)
-
-                asset_events = session.scalars(
-                    select(AssetEvent)
-                    .join(
-                        DagScheduleAssetReference,
-                        AssetEvent.asset_id == DagScheduleAssetReference.asset_id,
-                    )
-                    .where(*asset_event_filters)
-                ).all()
-
-                data_interval = dag.timetable.data_interval_for_events(logical_date, asset_events)
-                dag_run = dag.create_dagrun(
-                    run_id=DagRun.generate_run_id(
-                        run_type=DagRunType.ASSET_TRIGGERED,
-                        logical_date=logical_date,
-                        run_after=max(logical_dates.values()),
-                    ),
-                    logical_date=logical_date,
-                    data_interval=data_interval,
-                    run_after=max(logical_dates.values()),
-                    run_type=DagRunType.ASSET_TRIGGERED,
-                    triggered_by=DagRunTriggeredByType.ASSET,
-                    dag_version=latest_dag_version,
-                    state=DagRunState.QUEUED,
-                    creating_job_id=self.job.id,
-                    session=session,
-                )
-                Stats.incr("asset.triggered_dagruns")
-                dag_run.consumed_asset_events.extend(asset_events)
-                session.execute(
-                    delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id)
-                )
+            triggered_date = triggered_dates[dag.dag_id]
+            previous_dag_run = session.scalar(
+                select(DagRun)
+                .where(
+                    DagRun.dag_id == dag.dag_id,
+                    DagRun.run_after < triggered_date,
+                    DagRun.run_type == DagRunType.ASSET_TRIGGERED,
+                )
+                .order_by(DagRun.run_after.desc())
+                .limit(1)
+            )
+            asset_event_filters = [
+                DagScheduleAssetReference.dag_id == dag.dag_id,
+                AssetEvent.timestamp <= triggered_date,
+            ]
+            if previous_dag_run:
+                asset_event_filters.append(AssetEvent.timestamp > previous_dag_run.run_after)
+
+            asset_events = session.scalars(
+                select(AssetEvent)
+                .join(
+                    DagScheduleAssetReference,
+                    AssetEvent.asset_id == DagScheduleAssetReference.asset_id,
+                )
+                .where(*asset_event_filters)
+            ).all()
+
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
+                ),
+                logical_date=None,
+                data_interval=None,
+                run_after=triggered_date,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                dag_version=latest_dag_version,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            Stats.incr("asset.triggered_dagruns")
+            dag_run.consumed_asset_events.extend(asset_events)
+
+            session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id))

     def _should_update_dag_next_dagruns(
         self,
@@ -1486,7 +1465,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
-        if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1:
+ if (
+ dag.timetable.periodic
+ and dag_run.triggered_by != DagRunTriggeredByType.ASSET
+ and not dag_run.external_trigger
+ and dag_run.clear_number < 1
+ ):
# TODO: Logically, this should be DagRunInfo.run_after, but the
# information is not stored on a DagRun, only before the actual
# execution on DagModel.next_dagrun_create_after. We should add
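The heart of the new scheduler logic above is a window selection: consume every
asset event with a timestamp up to the triggering date, excluding anything
already attributed to the previous asset-triggered run. A standalone sketch of
that selection in plain Python (illustrative names, no Airflow imports):

    from __future__ import annotations

    from datetime import datetime, timedelta, timezone

    def select_consumed_events(
        event_timestamps: list[datetime],
        triggered_date: datetime,
        previous_run_after: datetime | None,
    ) -> list[datetime]:
        # mirrors the filters above: timestamp <= triggered_date, and
        # strictly after the previous run's run_after when such a run exists
        window = [ts for ts in event_timestamps if ts <= triggered_date]
        if previous_run_after is not None:
            window = [ts for ts in window if ts > previous_run_after]
        return window

    now = datetime.now(timezone.utc)
    events = [now - timedelta(hours=h) for h in (5, 3, 1)]
    # first run consumes everything; a later run only consumes newer events
    assert select_consumed_events(events, now, None) == events
    assert select_consumed_events(events, now, now - timedelta(hours=2)) == [now - timedelta(hours=1)]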
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 8ae5935bad3..5ebe1c2cb2f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -304,8 +304,7 @@ def _convert_max_consecutive_failed_dag_runs(val: int) -> int:
         val = airflow_conf.getint("core", "max_consecutive_failed_dag_runs_per_dag")
     if val < 0:
         raise ValueError(
-            f"Invalid max_consecutive_failed_dag_runs: {val}."
-            f"Requires max_consecutive_failed_dag_runs >= 0"
+            f"Invalid max_consecutive_failed_dag_runs: {val}. Requires max_consecutive_failed_dag_runs >= 0"
         )
     return val
@@ -2336,11 +2335,11 @@ class DagModel(Base):
             return None
         # this loads all the ADRQ records.... may need to limit num dags
-        by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list)
+        adrq_by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list)
         for r in session.scalars(select(AssetDagRunQueue)):
-            by_dag[r.target_dag_id].append(r)
+            adrq_by_dag[r.target_dag_id].append(r)
         dag_statuses: dict[str, dict[AssetUniqueKey, bool]] = {}
-        for dag_id, records in by_dag.items():
+        for dag_id, records in adrq_by_dag.items():
             dag_statuses[dag_id] = {AssetUniqueKey.from_asset(x.asset): True for x in records}
         ser_dags = SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), session=session)
@@ -2348,14 +2347,15 @@ class DagModel(Base):
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
             if not dag_ready(dag_id, cond=ser_dag.dag.timetable.asset_condition, statuses=statuses):
-                del by_dag[dag_id]
+                del adrq_by_dag[dag_id]
                 del dag_statuses[dag_id]
         del dag_statuses
-        asset_triggered_dag_info = {}
-        for dag_id, records in by_dag.items():
+        # TODO: make it more readable (rename it, or make it an attrs class, dataclass, etc.)
+        asset_triggered_dag_info: dict[str, tuple[datetime, datetime]] = {}
+        for dag_id, records in adrq_by_dag.items():
             times = sorted(x.created_at for x in records)
             asset_triggered_dag_info[dag_id] = (times[0], times[-1])
-        del by_dag
+        del adrq_by_dag
         asset_triggered_dag_ids = set(asset_triggered_dag_info.keys())
         if asset_triggered_dag_ids:
             exclusion_list = set(
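The asset_triggered_dag_info mapping built above collapses each DAG's queued
AssetDagRunQueue records to their first and last creation times; the scheduler
later treats the last time as the triggering date. A pure-Python sketch of the
aggregation with made-up timestamps:

    from datetime import datetime, timedelta

    base = datetime(2025, 2, 13)
    created_at_by_dag = {"dag_a": [base, base + timedelta(minutes=5), base + timedelta(minutes=2)]}

    info: dict[str, tuple[datetime, datetime]] = {}
    for dag_id, created in created_at_by_dag.items():
        times = sorted(created)
        info[dag_id] = (times[0], times[-1])  # (first queued, last queued)

    assert info["dag_a"] == (base, base + timedelta(minutes=5))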
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 808543d0b50..0035a68b502 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -450,7 +450,7 @@ class DagRun(Base, LoggingMixin):
.order_by(
nulls_first(BackfillDagRun.sort_ordinal, session=session),
nulls_first(cls.last_scheduling_decision, session=session),
- cls.logical_date,
+ cls.run_after,
)
.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
)
@@ -535,7 +535,7 @@ class DagRun(Base, LoggingMixin):
nulls_first(BackfillDagRun.sort_ordinal, session=session),
nulls_first(cls.last_scheduling_decision, session=session),
                 nulls_first(running_drs.c.num_running, session=session),  # many running -> lower priority
- cls.logical_date,
+ cls.run_after,
)
.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
)
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 45574daa37e..b221b11b515 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from collections.abc import Collection, Sequence
+from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
@@ -25,7 +25,6 @@ from airflow.utils import timezone
if TYPE_CHECKING:
from pendulum import DateTime
- from airflow.models.asset import AssetEvent
from airflow.sdk.definitions.asset import BaseAsset
from airflow.timetables.base import TimeRestriction
from airflow.utils.types import DagRunType
@@ -202,27 +201,6 @@ class AssetTriggeredTimetable(_TrivialTimetable):
         return DagRun.generate_run_id(run_type=run_type, logical_date=logical_date, run_after=run_after)
- def data_interval_for_events(
- self,
- logical_date: DateTime,
- events: Collection[AssetEvent],
- ) -> DataInterval:
- if not events:
- return DataInterval(logical_date, logical_date)
-
- start_dates, end_dates = [], []
- for event in events:
- if event.source_dag_run is not None:
- start_dates.append(event.source_dag_run.data_interval_start)
- end_dates.append(event.source_dag_run.data_interval_end)
- else:
- start_dates.append(event.timestamp)
- end_dates.append(event.timestamp)
-
- start = min(start_dates)
- end = max(end_dates)
- return DataInterval(start, end)
-
def next_dagrun_info(
self,
*,
diff --git a/scripts/ci/pre_commit/template_context_key_sync.py b/scripts/ci/pre_commit/template_context_key_sync.py
index 49d768541a1..270a3a695b2 100755
--- a/scripts/ci/pre_commit/template_context_key_sync.py
+++ b/scripts/ci/pre_commit/template_context_key_sync.py
@@ -33,7 +33,18 @@ CONTEXT_HINT = ROOT_DIR.joinpath("task_sdk", "src", "airflow", "sdk", "definitio
 TEMPLATES_REF_RST = ROOT_DIR.joinpath("docs", "apache-airflow", "templates-ref.rst")
 # These are only conditionally set
-IGNORE = {"ds", "ds_nodash", "ts", "ts_nodash", "ts_nodash_with_tz", "logical_date"}
+IGNORE = {
+ "ds",
+ "ds_nodash",
+ "ts",
+ "ts_nodash",
+ "ts_nodash_with_tz",
+ "logical_date",
+ "data_interval_end",
+ "data_interval_start",
+ "prev_data_interval_start_success",
+ "prev_data_interval_end_success",
+}
def _iter_template_context_keys_from_original_return() -> typing.Iterator[str]:
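The expanded IGNORE set keeps this pre-commit check from flagging context keys
that are now only conditionally present. As a hypothetical sketch of the kind
of set comparison such a sync check performs (the helper below is illustrative,
not the script's actual implementation):

    def keys_out_of_sync(code_keys: set[str], documented_keys: set[str], ignore: set[str]) -> list[str]:
        # conditionally-set keys are excluded from both directions of the check
        missing_docs = (code_keys - ignore) - documented_keys
        stale_docs = (documented_keys - ignore) - code_keys
        return sorted(missing_docs | stale_docs)

    assert keys_out_of_sync({"dag_run", "ds"}, {"dag_run"}, ignore={"ds"}) == []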
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index a100452791b..3b8801ff9fc 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -155,16 +155,8 @@ class RuntimeTaskInstance(TaskInstance):
         context_from_server: Context = {
             # TODO: Assess if we need to pass these through timezone.coerce_datetime
             "dag_run": dag_run,  # type: ignore[typeddict-item]  # Removable after #46522
-            "data_interval_end": dag_run.data_interval_end,
-            "data_interval_start": dag_run.data_interval_start,
             "task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{dag_run.run_id}",
             "task_reschedule_count": self._ti_context_from_server.task_reschedule_count,
-            "prev_data_interval_start_success": lazy_object_proxy.Proxy(
-                lambda: get_previous_dagrun_success(self.id).data_interval_start
-            ),
-            "prev_data_interval_end_success": lazy_object_proxy.Proxy(
-                lambda: get_previous_dagrun_success(self.id).data_interval_end
-            ),
             "prev_start_date_success": lazy_object_proxy.Proxy(
                 lambda: get_previous_dagrun_success(self.id).start_date
             ),
@@ -180,8 +172,10 @@ class RuntimeTaskInstance(TaskInstance):
             ts = logical_date.isoformat()
             ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
             ts_nodash_with_tz = ts.replace("-", "").replace(":", "")
+            # logical_date and data_interval either coexist or are both None
             context.update(
                 {
+                    # keys that depend on logical_date
                     "logical_date": logical_date,
                     "ds": ds,
                     "ds_nodash": ds_nodash,
@@ -189,8 +183,18 @@ class RuntimeTaskInstance(TaskInstance):
                     "ts": ts,
                     "ts_nodash": ts_nodash,
                     "ts_nodash_with_tz": ts_nodash_with_tz,
+                    # keys that depend on data_interval
+                    "data_interval_end": dag_run.data_interval_end,
+                    "data_interval_start": dag_run.data_interval_start,
+                    "prev_data_interval_start_success": lazy_object_proxy.Proxy(
+                        lambda: get_previous_dagrun_success(self.id).data_interval_start
+                    ),
+                    "prev_data_interval_end_success": lazy_object_proxy.Proxy(
+                        lambda: get_previous_dagrun_success(self.id).data_interval_end
+                    ),
                 }
             )
+
         if from_server.upstream_map_indexes is not None:
             # We stash this in here for later use, but we purposefully don't want to document it's
             # existence. Should this be a private attribute on RuntimeTI instead perhaps?
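As before this change, the prev_data_interval_*_success keys are wrapped in
lazy_object_proxy.Proxy so the previous-dagrun lookup only happens if a task
actually reads the key. A minimal standalone illustration of that pattern:

    import lazy_object_proxy

    def expensive_lookup() -> str:
        print("fetching previous successful dagrun...")  # runs only on first access
        return "2025-02-12"

    value = lazy_object_proxy.Proxy(expensive_lookup)
    # nothing has been fetched yet; str() resolves the proxy exactly once
    print(str(value))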
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 76b1ceb0e9a..24aeb44c5a7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3880,8 +3880,8 @@ class TestSchedulerJob:
         assert list(map(dict_from_obj, created_run.consumed_asset_events)) == list(
             map(dict_from_obj, [event1, event2])
         )
-        assert created_run.data_interval_start == DEFAULT_DATE + timedelta(days=5)
-        assert created_run.data_interval_end == DEFAULT_DATE + timedelta(days=11)
+        assert created_run.data_interval_start is None
+        assert created_run.data_interval_end is None
         # dag2 ADRQ record should still be there since the dag run was *not* triggered
         assert session.query(AssetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one() is not None
# dag2 should not be triggered since it depends on both asset 1 and 2
@@ -3889,7 +3889,7 @@ class TestSchedulerJob:
         # dag3 ADRQ record should be deleted since the dag run was triggered
         assert session.query(AssetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one_or_none() is None
-        assert dag3.get_last_dagrun().creating_job_id == scheduler_job.id
+        assert created_run.creating_job_id == scheduler_job.id
@pytest.mark.need_serialized_dag
@pytest.mark.parametrize(
@@ -4997,7 +4997,7 @@ class TestSchedulerJob:
dag_version = DagVersion.get_latest_version(dag.dag_id)
for i in range(16):
dr = dag_maker.create_dagrun(
- run_id=f"dr2_run_{i+1}",
+ run_id=f"dr2_run_{i + 1}",
state=State.RUNNING,
logical_date=date,
dag_version=dag_version,
@@ -5007,7 +5007,7 @@ class TestSchedulerJob:
date = dr16[0].logical_date + timedelta(hours=1)
for i in range(16, 32):
dr = dag_maker.create_dagrun(
- run_id=f"dr2_run_{i+1}",
+ run_id=f"dr2_run_{i + 1}",
state=State.QUEUED,
logical_date=date,
dag_version=dag_version,
@@ -5021,7 +5021,7 @@ class TestSchedulerJob:
dag_version = DagVersion.get_latest_version(dag.dag_id)
for i in range(16):
dr = dag_maker.create_dagrun(
- run_id=f"dr3_run_{i+1}",
+ run_id=f"dr3_run_{i + 1}",
state=State.RUNNING,
logical_date=date,
dag_version=dag_version,
@@ -5031,7 +5031,7 @@ class TestSchedulerJob:
date = dr16[0].logical_date + timedelta(hours=1)
for i in range(16, 32):
dr = dag_maker.create_dagrun(
- run_id=f"dr2_run_{i+1}",
+ run_id=f"dr2_run_{i + 1}",
state=State.QUEUED,
logical_date=date,
dag_version=dag_version,
diff --git a/tests/timetables/test_assets_timetable.py b/tests/timetables/test_assets_timetable.py
index 0158ddfd5cf..9892b5805bd 100644
--- a/tests/timetables/test_assets_timetable.py
+++ b/tests/timetables/test_assets_timetable.py
@@ -244,24 +244,6 @@ def asset_events(mocker) -> list[AssetEvent]:
return [event_earlier, event_later]
-def test_data_interval_for_events(
- asset_timetable: AssetOrTimeSchedule, asset_events: list[AssetEvent]
-) -> None:
- """
- Tests the data_interval_for_events method of AssetOrTimeSchedule.
-
- :param asset_timetable: The AssetOrTimeSchedule instance to test.
- :param asset_events: A list of mock AssetEvent instances.
- """
-    data_interval = asset_timetable.data_interval_for_events(logical_date=DateTime.now(), events=asset_events)
- assert data_interval.start == min(
- event.timestamp for event in asset_events
- ), "Data interval start does not match"
- assert data_interval.end == max(
- event.timestamp for event in asset_events
- ), "Data interval end does not match"
-
-
 def test_run_ordering_inheritance(asset_timetable: AssetOrTimeSchedule) -> None:
     """
     Tests that AssetOrTimeSchedule inherits run_ordering from its parent class correctly.
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 76ddb082283..0766e33b183 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -893,6 +893,8 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType
+ else:
+ DagRunType.ASSET_TRIGGERED = DagRunType.DATASET_TRIGGERED
if "execution_date" in kwargs:
raise TypeError("use logical_date instead")
@@ -947,7 +949,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST)
kwargs["logical_date"] = logical_date
kwargs.setdefault("dag_version", None)
-            kwargs.setdefault("run_after", data_interval[-1])
+            kwargs.setdefault("run_after", data_interval[-1] if data_interval else timezone.utcnow())
else:
kwargs.pop("dag_version", None)
kwargs.pop("triggered_by", None)
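For completeness, the fixture's new fallback in isolation: when a run carries
no data_interval (now the case for asset-triggered runs), run_after defaults
to the current time instead of the interval's end. A pure-Python sketch with
an illustrative helper name:

    from __future__ import annotations

    from datetime import datetime, timezone

    def default_run_after(data_interval: tuple[datetime, datetime] | None) -> datetime:
        # mirrors: data_interval[-1] if data_interval else timezone.utcnow()
        return data_interval[-1] if data_interval else datetime.now(timezone.utc)

    assert default_run_after(None) is not None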