This is an automated email from the ASF dual-hosted git repository.
phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 035060d7f38 AIP-83 amendment: Add logic for generating run_id when
logical date is None. (#46616)
035060d7f38 is described below
commit 035060d7f384a4989eddb6fb05f512f9c6a7e5bf
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Tue Feb 11 17:25:07 2025 +0545
AIP-83 amendment: Add logic for generating run_id when logical date is
None. (#46616)
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/api/common/trigger_dag.py | 51 ++++++++-------
.../api_connexion/endpoints/dag_run_endpoint.py | 9 ++-
airflow/api_connexion/schemas/dag_run_schema.py | 21 ++++---
airflow/api_fastapi/core_api/datamodels/dag_run.py | 8 ++-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 16 +++--
.../api_fastapi/core_api/routes/public/dag_run.py | 21 +++++--
airflow/jobs/scheduler_job_runner.py | 8 +--
airflow/models/backfill.py | 8 +--
airflow/models/baseoperator.py | 6 +-
airflow/models/dag.py | 26 +++++---
airflow/models/dagrun.py | 17 ++++-
airflow/timetables/base.py | 4 +-
airflow/timetables/simple.py | 16 +++--
airflow/ui/openapi-gen/requests/schemas.gen.ts | 17 +++--
airflow/ui/openapi-gen/requests/types.gen.ts | 3 +-
airflow/utils/types.py | 11 ++--
airflow/www/views.py | 26 ++++++--
docker_tests/test_docker_compose_quick_start.py | 6 +-
kubernetes_tests/test_base.py | 7 ++-
kubernetes_tests/test_kubernetes_pod_operator.py | 27 ++++++--
.../cncf/kubernetes/operators/test_job.py | 21 +++++--
.../cncf/kubernetes/operators/test_pod.py | 4 +-
.../kubernetes/operators/test_spark_kubernetes.py | 4 +-
.../api_endpoints/test_dag_run_endpoint.py | 2 +
.../api_endpoints/test_xcom_endpoint.py | 14 ++++-
.../microsoft/azure/operators/test_data_factory.py | 4 +-
.../microsoft/azure/sensors/test_wasb.py | 8 ++-
.../snowflake/operators/test_snowflake.py | 4 +-
.../providers/standard/operators/trigger_dagrun.py | 6 +-
.../endpoints/test_dag_run_endpoint.py | 73 +++++++++++++++++-----
.../api_connexion/endpoints/test_xcom_endpoint.py | 70 +++++++++++++++++----
tests/api_connexion/schemas/test_dag_run_schema.py | 19 +++---
.../core_api/routes/public/test_xcom.py | 5 +-
tests/jobs/test_scheduler_job.py | 7 ++-
tests/models/test_dag.py | 4 +-
tests/models/test_dagrun.py | 8 +--
tests/models/test_taskinstance.py | 7 ++-
tests/models/test_xcom.py | 4 +-
tests/operators/test_trigger_dagrun.py | 8 ++-
tests/sensors/test_external_task_sensor.py | 4 +-
tests/timetables/test_assets_timetable.py | 6 +-
tests/utils/test_state.py | 2 +-
tests_common/pytest_plugin.py | 17 +++--
43 files changed, 441 insertions(+), 168 deletions(-)
diff --git a/airflow/api/common/trigger_dag.py
b/airflow/api/common/trigger_dag.py
index 08ee1726ccc..8a90438af70 100644
--- a/airflow/api/common/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -42,6 +42,7 @@ def _trigger_dag(
dag_bag: DagBag,
*,
triggered_by: DagRunTriggeredByType,
+ run_after: datetime,
run_id: str | None = None,
conf: dict | str | None = None,
logical_date: datetime | None = None,
@@ -54,6 +55,7 @@ def _trigger_dag(
:param dag_id: DAG ID
:param dag_bag: DAG Bag model
:param triggered_by: the entity which triggers the dag_run
+ :param run_after: the datetime before which dag cannot run.
:param run_id: ID of the run
:param conf: configuration
:param logical_date: logical date of the run
@@ -65,26 +67,30 @@ def _trigger_dag(
if dag is None or dag_id not in dag_bag.dags:
raise DagNotFound(f"Dag id {dag_id} not found")
- logical_date = logical_date or timezone.utcnow()
-
- if not timezone.is_localized(logical_date):
- raise ValueError("The logical date should be localized")
-
- if replace_microseconds:
- logical_date = logical_date.replace(microsecond=0)
-
- if dag.default_args and "start_date" in dag.default_args:
- min_dag_start_date = dag.default_args["start_date"]
- if min_dag_start_date and logical_date < min_dag_start_date:
- raise ValueError(
- f"Logical date [{logical_date.isoformat()}] should be >=
start_date "
- f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
- )
- coerced_logical_date = timezone.coerce_datetime(logical_date)
-
- data_interval =
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
- run_id = run_id or dag.timetable.generate_run_id(
- run_type=DagRunType.MANUAL, logical_date=coerced_logical_date,
data_interval=data_interval
+ if logical_date:
+ if not timezone.is_localized(logical_date):
+ raise ValueError("The logical date should be localized")
+
+ if replace_microseconds:
+ logical_date = logical_date.replace(microsecond=0)
+
+ if dag.default_args and "start_date" in dag.default_args:
+ min_dag_start_date = dag.default_args["start_date"]
+ if min_dag_start_date and logical_date < min_dag_start_date:
+ raise ValueError(
+ f"Logical date [{logical_date.isoformat()}] should be >=
start_date "
+ f"[{min_dag_start_date.isoformat()}] from DAG's
default_args"
+ )
+ coerced_logical_date = timezone.coerce_datetime(logical_date)
+ data_interval =
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
+ else:
+ coerced_logical_date = None
+ data_interval = None
+
+ run_id = run_id or DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ logical_date=coerced_logical_date,
+ run_after=timezone.coerce_datetime(run_after),
)
# This intentionally does not use 'session' in the current scope because it
@@ -102,7 +108,7 @@ def _trigger_dag(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
- run_after=data_interval.end,
+ run_after=run_after,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
@@ -120,6 +126,7 @@ def trigger_dag(
dag_id: str,
*,
triggered_by: DagRunTriggeredByType,
+ run_after: datetime | None = None,
run_id: str | None = None,
conf: dict | str | None = None,
logical_date: datetime | None = None,
@@ -131,6 +138,7 @@ def trigger_dag(
:param dag_id: DAG ID
:param triggered_by: the entity which triggers the dag_run
+ :param run_after: the datetime before which dag won't run.
:param run_id: ID of the dag_run
:param conf: configuration
:param logical_date: date of execution
@@ -147,6 +155,7 @@ def trigger_dag(
dag_id=dag_id,
dag_bag=dagbag,
run_id=run_id,
+ run_after=run_after or timezone.utcnow(),
conf=conf,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 28e85cfe61a..cddf67261d5 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -327,7 +327,8 @@ def post_dag_run(*, dag_id: str, session: Session =
NEW_SESSION) -> APIResponse:
except ValidationError as err:
raise BadRequest(detail=str(err))
- logical_date = pendulum.instance(post_body["logical_date"])
+ logical_date = pendulum.instance(post_body["logical_date"]) if
post_body.get("logical_date") else None
+ run_after = pendulum.instance(post_body["run_after"])
run_id = post_body["run_id"]
dagrun_instance = session.scalar(
select(DagRun)
@@ -352,12 +353,14 @@ def post_dag_run(*, dag_id: str, session: Session =
NEW_SESSION) -> APIResponse:
end=pendulum.instance(data_interval_end),
)
else:
- data_interval =
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+ data_interval = (
+
dag.timetable.infer_manual_data_interval(run_after=logical_date) if
logical_date else None
+ )
dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
- run_after=data_interval.end,
+ run_after=run_after,
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py
b/airflow/api_connexion/schemas/dag_run_schema.py
index c2560613def..8b3a8df54c8 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -63,7 +63,8 @@ class DAGRunSchema(SQLAlchemySchema):
run_id = auto_field(data_key="dag_run_id")
dag_id = auto_field(dump_only=True)
- logical_date = auto_field(data_key="logical_date",
validate=validate_istimezone)
+ logical_date = auto_field(data_key="logical_date", allow_none=True,
validate=validate_istimezone)
+ run_after = auto_field(data_key="run_after", validate=validate_istimezone)
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
@@ -78,17 +79,23 @@ class DAGRunSchema(SQLAlchemySchema):
@pre_load
def autogenerate(self, data, **kwargs):
- """Auto generate run_id and logical_date if they are not provided."""
- logical_date = data.get("logical_date", _MISSING)
+ """Auto generate run_id and run_after if they are not provided."""
+ run_after = data.get("run_after", _MISSING)
- # Auto-generate logical_date if missing
- if logical_date is _MISSING:
- data["logical_date"] = str(timezone.utcnow())
+ # Auto-generate run_after if missing
+ if run_after is _MISSING:
+ data["run_after"] = str(timezone.utcnow())
if "dag_run_id" not in data:
try:
+ if logical_date_str := data.get("logical_date"):
+ logical_date = timezone.parse(logical_date_str)
+ else:
+ logical_date = None
data["dag_run_id"] = DagRun.generate_run_id(
- DagRunType.MANUAL, timezone.parse(data["logical_date"])
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date,
+ run_after=timezone.parse(data["run_after"]),
)
except (ParserError, TypeError) as err:
raise BadRequest("Incorrect datetime argument",
detail=str(err))
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 18be129a195..295eedca424 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -20,11 +20,11 @@ from __future__ import annotations
from datetime import datetime
from enum import Enum
-import pendulum
from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.models import DagRun
+from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -82,9 +82,11 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"""Trigger DAG Run Serializer for POST body."""
dag_run_id: str | None = None
- logical_date: AwareDatetime | None
data_interval_start: AwareDatetime | None = None
data_interval_end: AwareDatetime | None = None
+ logical_date: AwareDatetime | None
+ run_after: datetime = Field(default_factory=timezone.utcnow)
+
conf: dict = Field(default_factory=dict)
note: str | None = None
@@ -102,7 +104,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
def validate_dag_run_id(self):
if not self.dag_run_id:
self.dag_run_id = DagRun.generate_run_id(
- DagRunType.MANUAL, self.logical_date or pendulum.now("UTC")
+ run_type=DagRunType.MANUAL, logical_date=self.logical_date,
run_after=self.run_after
)
return self
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 6d2223e6c25..ade0da25b5d 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -10501,12 +10501,6 @@ components:
- type: string
- type: 'null'
title: Dag Run Id
- logical_date:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Logical Date
data_interval_start:
anyOf:
- type: string
@@ -10519,6 +10513,16 @@ components:
format: date-time
- type: 'null'
title: Data Interval End
+ logical_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Logical Date
+ run_after:
+ type: string
+ format: date-time
+ title: Run After
conf:
type: object
title: Conf
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 19bf526efc9..afce667e8cf 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -347,7 +347,6 @@ def trigger_dag_run(
) -> DAGRunResponse:
"""Trigger a DAG."""
dm = session.scalar(select(DagModel).where(DagModel.is_active,
DagModel.dag_id == dag_id).limit(1))
- now = pendulum.now("UTC")
if not dm:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id:
'{dag_id}' not found")
@@ -359,6 +358,7 @@ def trigger_dag_run(
logical_date = timezone.coerce_datetime(body.logical_date)
coerced_logical_date = timezone.coerce_datetime(logical_date)
+ run_after = timezone.coerce_datetime(body.run_after)
try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
@@ -369,13 +369,26 @@ def trigger_dag_run(
end=pendulum.instance(body.data_interval_end),
)
else:
- data_interval =
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now)
+ if body.logical_date:
+ data_interval = dag.timetable.infer_manual_data_interval(
+ run_after=coerced_logical_date or run_after
+ )
+ run_after = data_interval.end
+ else:
+ data_interval = None
+
+ if body.dag_run_id:
+ run_id = body.dag_run_id
+ else:
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.SCHEDULED,
logical_date=coerced_logical_date, run_after=run_after
+ )
dag_run = dag.create_dagrun(
- run_id=cast(str, body.dag_run_id),
+ run_id=run_id,
logical_date=coerced_logical_date,
data_interval=data_interval,
- run_after=data_interval.end,
+ run_after=run_after,
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index df1cb5ab529..f340893e5ae 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1286,7 +1286,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
- logical_date=dag_model.next_dagrun,
+ run_after=dag_model.next_dagrun,
data_interval=data_interval,
),
logical_date=dag_model.next_dagrun,
@@ -1394,12 +1394,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
data_interval =
dag.timetable.data_interval_for_events(logical_date, asset_events)
dag_run = dag.create_dagrun(
- run_id=dag.timetable.generate_run_id(
+ run_id=DagRun.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED,
logical_date=logical_date,
- data_interval=data_interval,
- session=session,
- events=asset_events,
+ run_after=max(logical_dates.values()),
),
logical_date=logical_date,
data_interval=data_interval,
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 735f09f3a3f..b4b15a6e613 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -286,6 +286,8 @@ def _create_backfill_dag_run(
backfill_sort_ordinal,
session,
):
+ from airflow.models.dagrun import DagRun
+
with session.begin_nested():
should_skip_create_backfill = should_create_backfill_dag_run(
info, reprocess_behavior, backfill_id, backfill_sort_ordinal,
session
@@ -296,10 +298,8 @@ def _create_backfill_dag_run(
dag_version = DagVersion.get_latest_version(dag.dag_id,
session=session)
try:
dr = dag.create_dagrun(
- run_id=dag.timetable.generate_run_id(
- run_type=DagRunType.BACKFILL_JOB,
- logical_date=info.logical_date,
- data_interval=info.data_interval,
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.BACKFILL_JOB,
logical_date=info.logical_date, run_after=info.run_after
),
logical_date=info.logical_date,
data_interval=info.data_interval,
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 3361fe33df6..ea6a7c34b11 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -628,7 +628,11 @@ class BaseOperator(TaskSDKBaseOperator, AbstractOperator,
metaclass=BaseOperator
# This is _mostly_ only used in tests
dr = DagRun(
dag_id=self.dag_id,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL,
info.logical_date),
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ logical_date=info.logical_date,
+ run_after=info.run_after,
+ ),
run_type=DagRunType.MANUAL,
logical_date=info.logical_date,
data_interval=info.data_interval,
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7ebbb48b4b4..111852f2728 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1602,6 +1602,7 @@ class DAG(TaskSDKDag, LoggingMixin):
@provide_session
def test(
self,
+ run_after: datetime | None = None,
logical_date: datetime | None = None,
run_conf: dict[str, Any] | None = None,
conn_file_path: str | None = None,
@@ -1613,6 +1614,7 @@ class DAG(TaskSDKDag, LoggingMixin):
"""
Execute one single DagRun for a given DAG and logical date.
:param run_after: the datetime before which the Dag won't run.
:param logical_date: logical date for the DAG run
:param run_conf: configuration to pass to newly created dagrun
:param conn_file_path: file path to a connection file in either yaml
or json
@@ -1652,7 +1654,6 @@ class DAG(TaskSDKDag, LoggingMixin):
exit_stack.callback(lambda: secrets_backend_list.pop(0))
with exit_stack:
- logical_date = logical_date or timezone.utcnow()
self.validate()
self.log.debug("Clearing existing task instances for logical date
%s", logical_date)
self.clear(
@@ -1663,16 +1664,23 @@ class DAG(TaskSDKDag, LoggingMixin):
)
self.log.debug("Getting dagrun for dag %s", self.dag_id)
logical_date = timezone.coerce_datetime(logical_date)
- data_interval =
self.timetable.infer_manual_data_interval(run_after=logical_date)
+ run_after = timezone.coerce_datetime(run_after) or
timezone.coerce_datetime(timezone.utcnow())
+ data_interval = (
+
self.timetable.infer_manual_data_interval(run_after=logical_date) if
logical_date else None
+ )
scheduler_dag =
SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))
dr: DagRun = _get_or_create_dagrun(
dag=scheduler_dag,
- start_date=logical_date,
+ start_date=logical_date or run_after,
logical_date=logical_date,
data_interval=data_interval,
- run_after=data_interval.end,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+ run_after=run_after,
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date,
+ run_after=run_after,
+ ),
session=session,
conf=run_conf,
triggered_by=DagRunTriggeredByType.TEST,
@@ -1756,8 +1764,8 @@ class DAG(TaskSDKDag, LoggingMixin):
self,
*,
run_id: str,
- logical_date: datetime | None,
- data_interval: tuple[datetime, datetime],
+ logical_date: datetime | None = None,
+ data_interval: tuple[datetime, datetime] | None = None,
run_after: datetime,
conf: dict | None = None,
run_type: DagRunType,
@@ -2485,8 +2493,8 @@ def _get_or_create_dagrun(
*,
dag: DAG,
run_id: str,
- logical_date: datetime,
- data_interval: tuple[datetime, datetime],
+ logical_date: datetime | None,
+ data_interval: tuple[datetime, datetime] | None,
run_after: datetime,
conf: dict | None,
triggered_by: DagRunTriggeredByType,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index d9bf14c3f98..199762d69ef 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -78,6 +78,7 @@ from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
from airflow.utils.state import DagRunState, State, TaskInstanceState
+from airflow.utils.strings import get_random_string
from airflow.utils.types import NOTSET, DagRunTriggeredByType, DagRunType
if TYPE_CHECKING:
@@ -621,10 +622,20 @@ class DagRun(Base, LoggingMixin):
return session.scalars(select(cls).where(cls.dag_id == dag_id,
cls.run_id == run_id)).one_or_none()
@staticmethod
- def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str:
- """Generate Run ID based on Run Type and logical Date."""
+ def generate_run_id(
+ *, run_type: DagRunType, logical_date: datetime | None = None,
run_after: datetime
+ ) -> str:
+ """
+ Generate Run ID based on Run Type, run_after and logical Date.
+
+ :param run_type: type of DagRun
+ :param logical_date: the logical date
+ :param run_after: the date before which dag run won't start.
+ """
# _Ensure_ run_type is a DagRunType, not just a string from user code
- return DagRunType(run_type).generate_run_id(logical_date)
+ if logical_date:
+ return
DagRunType(run_type).generate_run_id(suffix=run_after.isoformat())
+ return
DagRunType(run_type).generate_run_id(suffix=f"{run_after.isoformat()}_{get_random_string()}")
@staticmethod
@provide_session
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index da81090ff9a..d5d863c2984 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -281,8 +281,8 @@ class Timetable(Protocol):
self,
*,
run_type: DagRunType,
- logical_date: DateTime,
+ run_after: DateTime,
data_interval: DataInterval | None,
**extra,
) -> str:
- return run_type.generate_run_id(logical_date)
+ return run_type.generate_run_id(suffix=run_after.isoformat())
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 20e8085fe0d..45574daa37e 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -24,7 +24,6 @@ from airflow.utils import timezone
if TYPE_CHECKING:
from pendulum import DateTime
- from sqlalchemy import Session
from airflow.models.asset import AssetEvent
from airflow.sdk.definitions.asset import BaseAsset
@@ -186,15 +185,22 @@ class AssetTriggeredTimetable(_TrivialTimetable):
self,
*,
run_type: DagRunType,
- logical_date: DateTime,
data_interval: DataInterval | None,
- session: Session | None = None,
- events: Collection[AssetEvent] | None = None,
+ run_after: DateTime,
**extra,
) -> str:
+ """
+ Generate Run ID based on Run Type, run_after and logical Date.
+
+ :param run_type: type of DagRun
+ :param data_interval: the data interval
+ :param run_after: the date before which dag run won't start.
+ """
from airflow.models.dagrun import DagRun
- return DagRun.generate_run_id(run_type, logical_date)
+ logical_date = data_interval.start if data_interval is not None else
run_after
+
+ return DagRun.generate_run_id(run_type=run_type,
logical_date=logical_date, run_after=run_after)
def data_interval_for_events(
self,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 3e69736e87d..7d307ac5217 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5816,7 +5816,7 @@ export const $TriggerDAGRunPostBody = {
],
title: "Dag Run Id",
},
- logical_date: {
+ data_interval_start: {
anyOf: [
{
type: "string",
@@ -5826,9 +5826,9 @@ export const $TriggerDAGRunPostBody = {
type: "null",
},
],
- title: "Logical Date",
+ title: "Data Interval Start",
},
- data_interval_start: {
+ data_interval_end: {
anyOf: [
{
type: "string",
@@ -5838,9 +5838,9 @@ export const $TriggerDAGRunPostBody = {
type: "null",
},
],
- title: "Data Interval Start",
+ title: "Data Interval End",
},
- data_interval_end: {
+ logical_date: {
anyOf: [
{
type: "string",
@@ -5850,7 +5850,12 @@ export const $TriggerDAGRunPostBody = {
type: "null",
},
],
- title: "Data Interval End",
+ title: "Logical Date",
+ },
+ run_after: {
+ type: "string",
+ format: "date-time",
+ title: "Run After",
},
conf: {
type: "object",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 9a2ed62122a..c3bff10bcd6 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1416,9 +1416,10 @@ export type TimeDelta = {
*/
export type TriggerDAGRunPostBody = {
dag_run_id?: string | null;
- logical_date: string | null;
data_interval_start?: string | null;
data_interval_end?: string | null;
+ logical_date: string | null;
+ run_after?: string;
conf?: {
[key: string]: unknown;
};
diff --git a/airflow/utils/types.py b/airflow/utils/types.py
index 46f295c4ee2..28822919e38 100644
--- a/airflow/utils/types.py
+++ b/airflow/utils/types.py
@@ -22,8 +22,6 @@ from typing import TYPE_CHECKING, TypedDict
import airflow.sdk.definitions._internal.types
if TYPE_CHECKING:
- from datetime import datetime
-
from airflow.typing_compat import TypeAlias
ArgNotSet: TypeAlias = airflow.sdk.definitions._internal.types.ArgNotSet
@@ -42,8 +40,13 @@ class DagRunType(str, enum.Enum):
def __str__(self) -> str:
return self.value
- def generate_run_id(self, logical_date: datetime) -> str:
- return f"{self}__{logical_date.isoformat()}"
+ def generate_run_id(self, *, suffix: str) -> str:
+ """
+ Generate a string for DagRun based on suffix string.
+
+ :param suffix: Generate run_id from suffix.
+ """
+ return f"{self}__{suffix}"
@staticmethod
def from_run_id(run_id: str) -> DagRunType:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f99dcd9161f..b3fae31fefe 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2032,7 +2032,8 @@ class Airflow(AirflowBaseView):
origin = get_safe_url(request.values.get("origin"))
unpause = request.values.get("unpause")
request_conf = request.values.get("conf")
- request_logical_date = request.values.get("logical_date",
default=timezone.utcnow().isoformat())
+ request_logical_date = request.values.get("logical_date")
+ request_run_after = request.values.get("run_after",
default=timezone.utcnow().isoformat())
is_dag_run_conf_overrides_params = conf.getboolean("core",
"dag_run_conf_overrides_params")
dag = get_airflow_app().dag_bag.get_dag(dag_id)
dag_orm: DagModel =
session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1))
@@ -2148,7 +2149,7 @@ class Airflow(AirflowBaseView):
)
try:
- logical_date = timezone.parse(request_logical_date, strict=True)
+ logical_date = timezone.parse(request_logical_date, strict=True)
if request_logical_date else None
except ParserError:
flash("Invalid logical date", "error")
form = DateTimeForm(data={"logical_date":
timezone.utcnow().isoformat()})
@@ -2160,6 +2161,19 @@ class Airflow(AirflowBaseView):
form=form,
)
+ try:
+ run_after = timezone.parse(request_run_after, strict=True)
+ except ParserError:
+ flash("Invalid run_after", "error")
+ form = DateTimeForm(data={"run_after":
timezone.utcnow().isoformat()})
+ return self.render_template(
+ "airflow/trigger.html",
+ form_fields=form_fields,
+ **render_params,
+ conf=request_conf or {},
+ form=form,
+ )
+
dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id,
session=session)
if dr:
if dr.run_id == run_id:
@@ -2224,12 +2238,12 @@ class Airflow(AirflowBaseView):
"warning",
)
- data_interval =
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+ data_interval =
dag.timetable.infer_manual_data_interval(run_after=logical_date or run_after)
if not run_id:
- run_id = dag.timetable.generate_run_id(
- logical_date=logical_date,
- data_interval=data_interval,
+ run_id = DagRun.generate_run_id(
run_type=DagRunType.MANUAL,
+ logical_date=logical_date,
+ run_after=run_after,
)
try:
diff --git a/docker_tests/test_docker_compose_quick_start.py
b/docker_tests/test_docker_compose_quick_start.py
index 4b6335facff..15f346e57e3 100644
--- a/docker_tests/test_docker_compose_quick_start.py
+++ b/docker_tests/test_docker_compose_quick_start.py
@@ -101,7 +101,11 @@ def
test_trigger_dag_and_wait_for_result(default_docker_image, tmp_path_factory,
compose.execute(service="airflow-scheduler", command=["airflow",
"scheduler", "-n", "50"])
api_request("PATCH", path=f"dags/{DAG_ID}", json={"is_paused": False})
- api_request("POST", path=f"dags/{DAG_ID}/dagRuns", json={"dag_run_id":
DAG_RUN_ID})
+ api_request(
+ "POST",
+ path=f"dags/{DAG_ID}/dagRuns",
+ json={"dag_run_id": DAG_RUN_ID, "logical_date":
"2020-06-11T18:00:00+00:00"},
+ )
wait_for_terminal_dag_state(dag_id=DAG_ID, dag_run_id=DAG_RUN_ID)
dag_state = api_request("GET",
f"dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}").get("state")
diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py
index cadd8954cfb..2e0a96620a1 100644
--- a/kubernetes_tests/test_base.py
+++ b/kubernetes_tests/test_base.py
@@ -264,8 +264,10 @@ class BaseK8STest:
assert result.status_code == 200, f"Could not enable DAG:
{result_json}"
post_string = f"http://{host}/api/v1/dags/{dag_id}/dagRuns"
print(f"Calling [start_dag]#2 {post_string}")
+
+ logical_date = datetime.now(timezone.utc).isoformat()
# Trigger a new dagrun
- result = self.session.post(post_string, json={})
+ result = self.session.post(post_string, json={"logical_date":
logical_date})
try:
result_json = result.json()
except ValueError:
@@ -292,7 +294,8 @@ class BaseK8STest:
for dag_run in dag_runs:
if dag_run["dag_id"] == dag_id:
logical_date = dag_run["logical_date"]
+ run_after = dag_run["run_after"]
dag_run_id = dag_run["dag_run_id"]
break
- assert logical_date is not None, f"No logical_date can be found for
the dag with {dag_id}"
+ assert run_after is not None, f"No run_after can be found for the dag
with {dag_id}"
return dag_run_id, logical_date
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 9e4cd9e4372..e5637b5a9db 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -50,13 +50,30 @@ POD_MANAGER_CLASS =
"airflow.providers.cncf.kubernetes.utils.pod_manager.PodMana
def create_context(task) -> Context:
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
dag = DAG(dag_id="dag", schedule=None)
logical_date = timezone.datetime(2016, 1, 1, 1, 0, 0,
tzinfo=timezone.parse_timezone("Europe/Amsterdam"))
- dag_run = DagRun(
- dag_id=dag.dag_id,
- logical_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
- )
+
+ if AIRFLOW_V_3_0_PLUS:
+ dag_run = DagRun(
+ dag_id=dag.dag_id,
+ logical_date=logical_date,
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date,
+ run_after=logical_date,
+ ),
+ )
+ else:
+ dag_run = DagRun(
+ dag_id=dag.dag_id,
+ logical_date=logical_date,
+ run_id=DagRun.generate_run_id( # type: ignore[call-arg]
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date,
+ ),
+ )
task_instance = TaskInstance(task=task)
task_instance.dag_run = dag_run
task_instance.dag_id = dag.dag_id
diff --git
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
index 5342363f093..691a5496961 100644
---
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
+++
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
@@ -37,6 +37,8 @@ from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
JOB_OPERATORS_PATH = "airflow.providers.cncf.kubernetes.operators.job.{}"
HOOK_CLASS = JOB_OPERATORS_PATH.format("KubernetesHook")
@@ -57,11 +59,20 @@ def create_context(task, persist_to_db=False,
map_index=None):
else:
dag = DAG(dag_id="dag", schedule=None, start_date=pendulum.now())
dag.add_task(task)
- dag_run = DagRun(
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
- run_type=DagRunType.MANUAL,
- dag_id=dag.dag_id,
- )
+ if AIRFLOW_V_3_0_PLUS:
+ dag_run = DagRun(
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE,
run_after=DEFAULT_DATE
+ ),
+ run_type=DagRunType.MANUAL,
+ dag_id=dag.dag_id,
+ )
+ else:
+ dag_run = DagRun(
+ run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
+ run_type=DagRunType.MANUAL,
+ dag_id=dag.dag_id,
+ )
task_instance = TaskInstance(task=task, run_id=dag_run.run_id)
task_instance.dag_run = dag_run
if map_index is not None:
diff --git
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
index 309d2abe642..f447651585e 100644
---
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
+++
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
@@ -100,7 +100,9 @@ def create_context(task, persist_to_db=False,
map_index=None):
now = timezone.utcnow()
if AIRFLOW_V_3_0_PLUS:
dag_run = DagRun(
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE,
run_after=DEFAULT_DATE
+ ),
run_type=DagRunType.MANUAL,
dag_id=dag.dag_id,
logical_date=now,
diff --git
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
index c6814222c47..b7172f87346 100644
---
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
+++
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -177,7 +177,9 @@ def create_context(task):
dag_run = DagRun(
dag_id=dag.dag_id,
logical_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date,
run_after=logical_date
+ ),
)
else:
dag_run = DagRun(
diff --git
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
index 02057310907..26e607c8c71 100644
---
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
+++
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
@@ -208,6 +208,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -224,6 +225,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time_2,
+ "run_after": self.default_time_2,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
diff --git
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
index 852c4288bbe..9f4c93549ce 100644
---
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
+++
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
@@ -128,12 +128,22 @@ class TestGetXComEntries(TestXComEndpoint):
task_id_1 = "test-task-id-1"
logical_date = "2005-04-02T00:00:00+00:00"
logical_date_parsed = timezone.parse(logical_date)
- dag_run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL,
logical_date_parsed)
+ run_after = "2005-04-02T00:00:00+00:00"
+ run_after_parsed = timezone.parse(run_after)
+ dag_run_id_1 = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date_parsed,
+ run_after=run_after_parsed,
+ )
self._create_xcom_entries(dag_id_1, dag_run_id_1, logical_date_parsed,
task_id_1)
dag_id_2 = "test-dag-id-2"
task_id_2 = "test-task-id-2"
- run_id_2 = DagRun.generate_run_id(DagRunType.MANUAL,
logical_date_parsed)
+ run_id_2 = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date_parsed,
+ run_after=run_after_parsed,
+ )
self._create_xcom_entries(dag_id_2, run_id_2, logical_date_parsed,
task_id_2)
self._create_invalid_xcom_entries(logical_date_parsed)
response = self.client.get(
diff --git
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
index 9eb8a682656..1a5721e9d08 100644
---
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
+++
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
@@ -327,7 +327,9 @@ class TestAzureDataFactoryRunPipelineOperatorWithDeferrable:
dag_run = DagRun(
dag_id=dag.dag_id,
logical_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date,
run_after=logical_date
+ ),
)
else:
dag_run = DagRun(
diff --git
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
index e05f9454c95..f89b2319d11 100644
---
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
+++
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
@@ -119,7 +119,9 @@ class TestWasbBlobAsyncSensor:
dag_run = DagRun(
dag_id=dag.dag_id,
logical_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date,
run_after=logical_date
+ ),
)
task_instance = TaskInstance(task=task)
@@ -262,7 +264,9 @@ class TestWasbPrefixAsyncSensor:
dag_run = DagRun(
dag_id=dag.dag_id,
logical_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date,
run_after=logical_date
+ ),
)
task_instance = TaskInstance(task=task)
diff --git
a/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
b/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
index 43795f22ace..be66ff67ca8 100644
---
a/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
+++
b/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
@@ -183,7 +183,9 @@ def create_context(task, dag=None):
dag_run = DagRun(
dag_id=dag.dag_id,
logical_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date,
run_after=logical_date
+ ),
)
else:
dag_run = DagRun(
diff --git
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index 053a6011b63..b5ac483d1f2 100644
---
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -192,7 +192,11 @@ class TriggerDagRunOperator(BaseOperator):
if self.trigger_run_id:
run_id = str(self.trigger_run_id)
else:
- run_id = DagRun.generate_run_id(DagRunType.MANUAL,
parsed_logical_date)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ logical_date=parsed_logical_date,
+ run_after=parsed_logical_date,
+ )
try:
dag_run = trigger_dag(
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 3ce4019c824..94028976e15 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -109,6 +109,7 @@ class TestDagRunEndpoint:
run_id=f"TEST_DAG_RUN_ID_{i}",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time) +
timedelta(days=i - 1),
+ run_after=timezone.parse(self.default_time) + timedelta(days=i
- 1),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state=state,
@@ -126,6 +127,7 @@ class TestDagRunEndpoint:
run_id=f"TEST_DAG_RUN_ID_{i}",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time_2),
+ run_after=timezone.parse(self.default_time_2),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state=state,
@@ -190,6 +192,7 @@ class TestGetDagRun(TestDagRunEndpoint):
run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time),
+ run_after=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state="running",
@@ -201,6 +204,7 @@ class TestGetDagRun(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -240,6 +244,7 @@ class TestGetDagRun(TestDagRunEndpoint):
run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time),
+ run_after=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
external_trigger=True,
)
@@ -263,6 +268,7 @@ class TestGetDagRun(TestDagRunEndpoint):
run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time),
+ run_after=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state="running",
@@ -287,6 +293,7 @@ class TestGetDagRun(TestDagRunEndpoint):
run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time),
+ run_after=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state="running",
@@ -312,6 +319,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -328,6 +336,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time_2,
+ "run_after": self.default_time_2,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -382,6 +391,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time_2,
+ "run_after": self.default_time_2,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -398,6 +408,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -550,6 +561,7 @@ class TestGetDagRunsPagination(TestDagRunEndpoint):
run_id=f"TEST_DAG_RUN_ID{i}",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time) +
timedelta(minutes=i),
+ run_after=timezone.parse(self.default_time) +
timedelta(minutes=i),
start_date=timezone.parse(self.default_time),
external_trigger=True,
)
@@ -655,6 +667,7 @@ class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
run_id=f"TEST_START_EXEC_DAY_1{i}",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(dates[i]),
+ run_after=timezone.parse(dates[i]),
start_date=timezone.parse(dates[i]),
external_trigger=True,
state=DagRunState.SUCCESS,
@@ -699,6 +712,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -716,6 +730,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time_2,
+ "run_after": self.default_time_2,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -777,6 +792,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time_2,
+ "run_after": self.default_time_2,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -793,6 +809,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
"end_date": None,
"state": "running",
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
@@ -925,6 +942,7 @@ class TestGetDagRunBatchPagination(TestDagRunEndpoint):
state="running",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(self.default_time) +
timedelta(minutes=i),
+ run_after=timezone.parse(self.default_time) +
timedelta(minutes=i),
start_date=timezone.parse(self.default_time),
external_trigger=True,
)
@@ -1008,6 +1026,7 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
run_id=f"TEST_START_EXEC_DAY_1{i}",
run_type=DagRunType.MANUAL,
logical_date=timezone.parse(date),
+ run_after=timezone.parse(date),
start_date=timezone.parse(date),
external_trigger=True,
state="success",
@@ -1084,21 +1103,36 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
class TestPostDagRun(TestDagRunEndpoint):
@time_machine.travel(timezone.utcnow(), tick=False)
@pytest.mark.parametrize(
- "dag_run_id, logical_date, note, data_interval_start,
data_interval_end",
+ "dag_run_id, logical_date, run_after, note, data_interval_start,
data_interval_end",
[
pytest.param(
- "TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note",
None, None, id="all-present"
+ "TEST_DAG_RUN",
+ "2020-06-11T18:00:00+00:00",
+ "2020-06-11T18:00:00+00:00",
+ "test-note",
+ None,
+ None,
+ id="all-present",
),
pytest.param(
"TEST_DAG_RUN",
"2024-06-11T18:00:00+00:00",
+ "2024-06-11T18:00:00+00:00",
"test-note",
"2024-01-03T00:00:00+00:00",
"2024-01-04T05:00:00+00:00",
id="all-present-with-dates",
),
- pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, None,
id="only-date"),
- pytest.param(None, None, None, None, None, id="all-missing"),
+ pytest.param(
+ None,
+ "2020-06-11T18:00:00+00:00",
+ "2020-06-11T18:00:00+00:00",
+ None,
+ None,
+ None,
+ id="only-date",
+ ),
+ pytest.param(None, None, "2020-06-11T18:00:00+00:00", None, None,
None, id="all-missing"),
],
)
def test_should_respond_200(
@@ -1106,20 +1140,18 @@ class TestPostDagRun(TestDagRunEndpoint):
session,
dag_run_id,
logical_date,
+ run_after,
note,
data_interval_start,
data_interval_end,
):
self._create_dag("TEST_DAG_ID")
-
- # We freeze time for this test, so we could check it into the returned
dates.
- fixed_now = timezone.utcnow()
-
# raise NotImplementedError("TODO: Add tests for data_interval_start
and data_interval_end")
request_json = {}
if logical_date is not None:
request_json["logical_date"] = logical_date
+ request_json["run_after"] = run_after
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
if data_interval_start is not None:
@@ -1136,12 +1168,11 @@ class TestPostDagRun(TestDagRunEndpoint):
assert response.status_code == 200
- if logical_date is None:
- expected_logical_date = fixed_now.isoformat()
- else:
- expected_logical_date = logical_date
+ expected_logical_date = logical_date if logical_date is not None else
None
+
+ # when logical_date is null, run_id is run_after + random string.
if dag_run_id is None:
- expected_dag_run_id = f"manual__{expected_logical_date}"
+ expected_dag_run_id = f"manual__{run_after}"
else:
expected_dag_run_id = dag_run_id
@@ -1157,6 +1188,7 @@ class TestPostDagRun(TestDagRunEndpoint):
"dag_run_id": expected_dag_run_id,
"end_date": None,
"logical_date": expected_logical_date,
+ "run_after": run_after,
"external_trigger": True,
"start_date": None,
"state": "queued",
@@ -1167,8 +1199,15 @@ class TestPostDagRun(TestDagRunEndpoint):
"note": note,
}
expected_response_json.update({"triggered_by": "rest_api"} if
AIRFLOW_V_3_0_PLUS else {})
+ response_json = response.json
+ for key in expected_response_json:
+ if key != "dag_run_id":
+ assert response_json[key] == expected_response_json[key],
f"Mismatch on key {key}"
- assert response.json == expected_response_json
+ assert response_json["dag_run_id"].startswith(expected_dag_run_id), (
+ f"dag_run_id '{response_json['dag_run_id']}' does not start with
expected prefix "
+ f"'{expected_dag_run_id}'"
+ )
_check_last_log(session, dag_id="TEST_DAG_ID",
event="api.post_dag_run", logical_date=None)
def test_raises_validation_error_for_invalid_request(self):
@@ -1255,6 +1294,7 @@ class TestPostDagRun(TestDagRunEndpoint):
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
"logical_date": logical_date,
+ "run_after": logical_date,
},
environ_overrides={"REMOTE_USER": "test"},
)
@@ -1266,6 +1306,7 @@ class TestPostDagRun(TestDagRunEndpoint):
"dag_run_id": dag_run_id,
"end_date": None,
"logical_date": logical_date,
+ "run_after": logical_date,
"external_trigger": True,
"start_date": None,
"state": "queued",
@@ -1512,6 +1553,7 @@ class TestPatchDagRunState(TestDagRunEndpoint):
"dag_run_id": dag_run_id,
"end_date": dr.end_date.isoformat() if state != State.QUEUED else
None,
"logical_date": dr.logical_date.isoformat(),
+ "run_after": dr.run_after.isoformat(),
"external_trigger": False,
"start_date": dr.start_date.isoformat() if state != State.QUEUED
else None,
"state": state,
@@ -1686,6 +1728,7 @@ class TestClearDagRun(TestDagRunEndpoint):
"end_date": None,
"external_trigger": False,
"logical_date": dr.logical_date.isoformat(),
+ "run_after": dr.run_after.isoformat(),
"start_date": None,
"state": "queued",
"data_interval_start": dr.data_interval_start.isoformat(),
@@ -1908,6 +1951,7 @@ class TestSetDagRunNote(TestDagRunEndpoint):
"end_date": dr.end_date.isoformat(),
"external_trigger": True,
"logical_date": self.default_time,
+ "run_after": self.default_time,
"start_date": self.default_time,
"state": "success",
"data_interval_start": None,
@@ -1936,6 +1980,7 @@ class TestSetDagRunNote(TestDagRunEndpoint):
"dag_run_id": dr.run_id,
"end_date": dr.end_date.isoformat(),
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"start_date": self.default_time,
"state": "success",
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py
b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 784bc6142d4..598d3a99546 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -110,9 +110,13 @@ class TestGetXComEntry(TestXComEndpoint):
dag_id = "test-dag-id"
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
+ run_after = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
logical_date_parsed = timezone.parse(logical_date)
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+ run_after_parsed = timezone.parse(run_after)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id,
xcom_key, {"key": "value"})
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
@@ -136,9 +140,13 @@ class TestGetXComEntry(TestXComEndpoint):
dag_id = "test-dag-id"
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
+ run_after = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
logical_date_parsed = timezone.parse(logical_date)
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+ run_after_parsed = timezone.parse(run_after)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id,
xcom_key, {"key": "value"})
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false",
@@ -162,9 +170,13 @@ class TestGetXComEntry(TestXComEndpoint):
dag_id = "test-dag-id"
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
+ run_after = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
logical_date_parsed = timezone.parse(logical_date)
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+ run_after_parsed = timezone.parse(run_after)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id,
xcom_key)
response = self.client.get(
f"/api/v1/dags/nonexistentdagid/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
@@ -177,9 +189,13 @@ class TestGetXComEntry(TestXComEndpoint):
dag_id = "test-dag-id"
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
+ run_after = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
logical_date_parsed = timezone.parse(logical_date)
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+ run_after_parsed = timezone.parse(run_after)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id,
xcom_key)
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}"
@@ -191,9 +207,13 @@ class TestGetXComEntry(TestXComEndpoint):
dag_id = "test-dag-id"
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
+ run_after = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
logical_date_parsed = timezone.parse(logical_date)
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+ run_after_parsed = timezone.parse(run_after)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id,
xcom_key)
response = self.client.get(
@@ -303,8 +323,12 @@ class TestGetXComEntries(TestXComEndpoint):
dag_id = "test-dag-id"
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
+ run_after = "2005-04-02T00:00:00+00:00"
logical_date_parsed = timezone.parse(logical_date)
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+ run_after_parsed = timezone.parse(run_after)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entries(dag_id, run_id, logical_date_parsed, task_id)
response = self.client.get(
@@ -345,13 +369,19 @@ class TestGetXComEntries(TestXComEndpoint):
dag_id_1 = "test-dag-id-1"
task_id_1 = "test-task-id-1"
logical_date = "2005-04-02T00:00:00+00:00"
+ run_after = "2005-04-02T00:00:00+00:00"
logical_date_parsed = timezone.parse(logical_date)
- run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL,
logical_date_parsed)
+ run_after_parsed = timezone.parse(run_after)
+ run_id_1 = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entries(dag_id_1, run_id_1, logical_date_parsed,
task_id_1)
dag_id_2 = "test-dag-id-2"
task_id_2 = "test-task-id-2"
- run_id_2 = DagRun.generate_run_id(DagRunType.MANUAL,
logical_date_parsed)
+ run_id_2 = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entries(dag_id_2, run_id_2, logical_date_parsed,
task_id_2)
response = self.client.get(
@@ -409,7 +439,11 @@ class TestGetXComEntries(TestXComEndpoint):
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
logical_date_parsed = timezone.parse(logical_date)
- dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL,
logical_date_parsed)
+ run_after = "2005-04-02T00:00:00+00:00"
+ run_after_parsed = timezone.parse(run_after)
+ dag_run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entries(dag_id, dag_run_id, logical_date_parsed,
task_id, mapped_ti=True)
def assert_expected_result(expected_entries, map_index=None):
@@ -453,7 +487,11 @@ class TestGetXComEntries(TestXComEndpoint):
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
logical_date_parsed = timezone.parse(logical_date)
- dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL,
logical_date_parsed)
+ run_after = "2005-04-02T00:00:00+00:00"
+ run_after_parsed = timezone.parse(run_after)
+ dag_run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entries(dag_id, dag_run_id, logical_date_parsed,
task_id, mapped_ti=True)
def assert_expected_result(expected_entries, key=None):
@@ -495,7 +533,11 @@ class TestGetXComEntries(TestXComEndpoint):
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
logical_date_parsed = timezone.parse(logical_date)
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+ run_after = "2005-04-02T00:00:00+00:00"
+ run_after_parsed = timezone.parse(run_after)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+ )
self._create_xcom_entries(dag_id, run_id, logical_date_parsed, task_id)
response = self.client.get(
@@ -580,7 +622,11 @@ class TestPaginationGetXComEntries(TestXComEndpoint):
self.task_id = "test-task-id"
self.logical_date = "2005-04-02T00:00:00+00:00"
self.logical_date_parsed = timezone.parse(self.logical_date)
- self.run_id = DagRun.generate_run_id(DagRunType.MANUAL,
self.logical_date_parsed)
+ run_after = "2005-04-02T00:00:00+00:00"
+ run_after_parsed = timezone.parse(run_after)
+ self.run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=self.logical_date_parsed,
run_after=run_after_parsed
+ )
@pytest.mark.parametrize(
"query_params, expected_xcom_ids",
diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py
b/tests/api_connexion/schemas/test_dag_run_schema.py
index 61b3dac6350..251946d17fa 100644
--- a/tests/api_connexion/schemas/test_dag_run_schema.py
+++ b/tests/api_connexion/schemas/test_dag_run_schema.py
@@ -63,6 +63,7 @@ class TestDAGRunSchema(TestDAGRunBase):
state="running",
run_type=DagRunType.MANUAL.value,
logical_date=timezone.parse(self.default_time),
+ run_after=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
conf='{"start": "stop"}',
**triggered_by_kwargs,
@@ -85,6 +86,7 @@ class TestDAGRunSchema(TestDAGRunBase):
"last_scheduling_decision": None,
"run_type": "manual",
"note": None,
+ "run_after": self.default_time,
}
expected_deserialized_dagrun.update({"triggered_by": "test"} if
AIRFLOW_V_3_0_PLUS else {})
@@ -94,30 +96,30 @@ class TestDAGRunSchema(TestDAGRunBase):
"serialized_dagrun, expected_result",
[
( # Conf not provided
- {"dag_run_id": "my-dag-run", "logical_date": DEFAULT_TIME},
- {"run_id": "my-dag-run", "logical_date": parse(DEFAULT_TIME)},
+ {"dag_run_id": "my-dag-run", "run_after": DEFAULT_TIME},
+ {"run_id": "my-dag-run", "run_after": parse(DEFAULT_TIME)},
),
(
{
"dag_run_id": "my-dag-run",
- "logical_date": DEFAULT_TIME,
+ "run_after": DEFAULT_TIME,
"conf": {"start": "stop"},
},
{
"run_id": "my-dag-run",
- "logical_date": parse(DEFAULT_TIME),
+ "run_after": parse(DEFAULT_TIME),
"conf": {"start": "stop"},
},
),
(
{
"dag_run_id": "my-dag-run",
- "logical_date": DEFAULT_TIME,
+ "run_after": DEFAULT_TIME,
"conf": '{"start": "stop"}',
},
{
"run_id": "my-dag-run",
- "logical_date": parse(DEFAULT_TIME),
+ "run_after": parse(DEFAULT_TIME),
"conf": {"start": "stop"},
},
),
@@ -131,7 +133,7 @@ class TestDAGRunSchema(TestDAGRunBase):
"""Dag_run_id and logical_date fields are autogenerated if missing"""
serialized_dagrun = {}
result = dagrun_schema.load(serialized_dagrun)
- assert result == {"logical_date": result["logical_date"], "run_id":
result["run_id"]}
+ assert result == {"run_after": result["run_after"], "run_id":
result["run_id"]}
def test_invalid_logical_date_raises(self):
serialized_dagrun = {"logical_date": "mydate"}
@@ -151,6 +153,7 @@ class TestDagRunCollection(TestDAGRunBase):
run_id="my-dag-run",
state="running",
logical_date=timezone.parse(self.default_time),
+ run_after=timezone.parse(self.default_time),
run_type=DagRunType.MANUAL.value,
start_date=timezone.parse(self.default_time),
conf='{"start": "stop"}',
@@ -176,6 +179,7 @@ class TestDagRunCollection(TestDAGRunBase):
"dag_run_id": "my-dag-run",
"end_date": None,
"logical_date": self.default_time,
+ "run_after": self.default_time,
"external_trigger": True,
"state": "running",
"start_date": self.default_time,
@@ -195,6 +199,7 @@ class TestDagRunCollection(TestDAGRunBase):
"end_date": None,
"state": "running",
"logical_date": self.second_time,
+ "run_after": self.second_time,
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
diff --git a/tests/api_fastapi/core_api/routes/public/test_xcom.py
b/tests/api_fastapi/core_api/routes/public/test_xcom.py
index e3d9b3641a9..dd5a073c1ba 100644
--- a/tests/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py
@@ -49,7 +49,10 @@ TEST_TASK_ID_2 = "test-task-id-2"
logical_date_parsed = timezone.parse(TEST_EXECUTION_DATE)
logical_date_formatted = logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ")
-run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+run_after_parsed = timezone.parse(TEST_EXECUTION_DATE)
+run_id = DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=logical_date_parsed,
run_after=run_after_parsed
+)
@provide_session
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8d82a249aec..76b1ceb0e9a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -138,16 +138,17 @@ def create_dagrun(session):
state: DagRunState = DagRunState.RUNNING,
start_date: datetime | None = None,
) -> DagRun:
- run_id = dag.timetable.generate_run_id(
+ run_after = logical_date or timezone.utcnow()
+ run_id = DagRun.generate_run_id(
run_type=run_type,
logical_date=logical_date,
- data_interval=data_interval,
+ run_after=run_after,
)
return dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
- run_after=data_interval.end,
+ run_after=run_after,
run_type=run_type,
state=state,
start_date=start_date,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index af815279737..aee56bee883 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -156,7 +156,7 @@ def _create_dagrun(
data_interval = DataInterval(*map(timezone.coerce_datetime,
data_interval))
run_id = dag.timetable.generate_run_id(
run_type=run_type,
- logical_date=logical_date, # type: ignore
+ run_after=logical_date or data_interval.end, # type: ignore
data_interval=data_interval,
)
return dag.create_dagrun(
@@ -2974,7 +2974,7 @@ def
test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker
)
def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type:
DagRunType) -> None:
dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily")
- run_id = run_id_type.generate_run_id(DEFAULT_DATE)
+ run_id = DagRun.generate_run_id(run_type=run_id_type,
run_after=DEFAULT_DATE, logical_date=DEFAULT_DATE)
with pytest.raises(ValueError) as ctx:
dag.create_dagrun(
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index f4324abf485..b7328ba3713 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -111,7 +111,7 @@ class TestDagRun:
dag_run = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=run_type,
- logical_date=logical_date,
+ run_after=logical_date,
data_interval=data_interval,
),
run_type=run_type,
@@ -863,7 +863,7 @@ class TestDagRun:
dr = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
- logical_date=DEFAULT_DATE,
+ run_after=DEFAULT_DATE,
data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
),
run_type=DagRunType.SCHEDULED,
@@ -941,7 +941,7 @@ class TestDagRun:
dag_run = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
- logical_date=dag.start_date,
+ run_after=dag.start_date,
data_interval=dag.infer_automated_data_interval(dag.start_date),
),
run_type=DagRunType.SCHEDULED,
@@ -1051,7 +1051,7 @@ def
test_verify_integrity_task_start_and_end_date(Stats_incr, session, run_type,
dag_id=dag.dag_id,
run_type=run_type,
logical_date=DEFAULT_DATE,
- run_id=DagRun.generate_run_id(run_type, DEFAULT_DATE),
+ run_id=DagRun.generate_run_id(run_type=run_type,
logical_date=DEFAULT_DATE, run_after=DEFAULT_DATE),
)
dag_run.dag = dag
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 3d985394646..2d9d1a6edcb 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3611,7 +3611,12 @@ class TestTaskInstance:
assert os.environ["AIRFLOW_CTX_DAG_ID"] == "test_echo_env_variables"
assert os.environ["AIRFLOW_CTX_TASK_ID"] == "hive_in_python_op"
assert DEFAULT_DATE.isoformat() ==
os.environ["AIRFLOW_CTX_LOGICAL_DATE"]
- assert DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE) ==
os.environ["AIRFLOW_CTX_DAG_RUN_ID"]
+ assert (
+ DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE,
run_after=DEFAULT_DATE
+ )
+ == os.environ["AIRFLOW_CTX_DAG_RUN_ID"]
+ )
def test_echo_env_variables(self, dag_maker):
with dag_maker(
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index ea0d95290f0..45d7b264c44 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -58,7 +58,9 @@ def reset_db():
@pytest.fixture
def task_instance_factory(request, session: Session):
def func(*, dag_id, task_id, logical_date):
- run_id = DagRun.generate_run_id(DagRunType.SCHEDULED, logical_date)
+ run_id = DagRun.generate_run_id(
+ run_type=DagRunType.SCHEDULED, logical_date=logical_date,
run_after=logical_date
+ )
run = DagRun(
dag_id=dag_id,
run_type=DagRunType.SCHEDULED,
diff --git a/tests/operators/test_trigger_dagrun.py
b/tests/operators/test_trigger_dagrun.py
index db86eb0fa7d..1c72dae332a 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -125,7 +125,9 @@ class TestDagRunOperator:
dagrun = dag_maker.session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).one()
assert dagrun.external_trigger
- assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
dagrun.logical_date)
+ assert dagrun.run_id == DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=dagrun.logical_date,
run_after=dagrun.logical_date
+ )
self.assert_extra_link(dagrun, task, dag_maker.session)
def test_trigger_dagrun_custom_run_id(self, dag_maker):
@@ -167,7 +169,9 @@ class TestDagRunOperator:
dagrun = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).one()
assert dagrun.external_trigger
assert dagrun.logical_date == custom_logical_date
- assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
custom_logical_date)
+ assert dagrun.run_id == DagRun.generate_run_id(
+ run_type=DagRunType.MANUAL, logical_date=custom_logical_date,
run_after=custom_logical_date
+ )
self.assert_extra_link(dagrun, task, session)
def test_trigger_dagrun_twice(self, dag_maker):
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index d6bf001c5bd..239eca8cdc1 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -115,7 +115,7 @@ class TestExternalTaskSensor:
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
self.dag = DAG(TEST_DAG_ID, schedule=None, default_args=self.args)
- self.dag_run_id = DagRunType.MANUAL.generate_run_id(DEFAULT_DATE)
+ self.dag_run_id =
DagRunType.MANUAL.generate_run_id(suffix=DEFAULT_DATE.isoformat())
def add_time_sensor(self, task_id=TEST_TASK_ID):
op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag)
@@ -1239,7 +1239,7 @@ def run_tasks(
runs[dag.dag_id] = dagrun = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
- logical_date=logical_date,
+ run_after=logical_date,
data_interval=data_interval,
),
logical_date=logical_date,
diff --git a/tests/timetables/test_assets_timetable.py
b/tests/timetables/test_assets_timetable.py
index 541ef2abb6e..0158ddfd5cf 100644
--- a/tests/timetables/test_assets_timetable.py
+++ b/tests/timetables/test_assets_timetable.py
@@ -208,7 +208,11 @@ def test_generate_run_id(asset_timetable:
AssetOrTimeSchedule) -> None:
:param asset_timetable: The AssetOrTimeSchedule instance to test.
"""
run_id = asset_timetable.generate_run_id(
- run_type=DagRunType.MANUAL, extra_args="test",
logical_date=DateTime.now(), data_interval=None
+ run_type=DagRunType.MANUAL,
+ extra_args="test",
+ logical_date=DateTime.now(),
+ run_after=DateTime.now(),
+ data_interval=None,
)
assert isinstance(run_id, str)
diff --git a/tests/utils/test_state.py b/tests/utils/test_state.py
index de393f38c35..035740db46f 100644
--- a/tests/utils/test_state.py
+++ b/tests/utils/test_state.py
@@ -41,7 +41,7 @@ def test_dagrun_state_enum_escape():
dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
- logical_date=DEFAULT_DATE,
+ run_after=DEFAULT_DATE,
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
),
run_type=DagRunType.SCHEDULED,
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 150b48f7459..76ddb082283 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -929,11 +929,18 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
if "run_type" not in kwargs:
kwargs["run_id"] = "test"
else:
- kwargs["run_id"] = dag.timetable.generate_run_id(
- run_type=run_type,
- logical_date=logical_date,
- data_interval=data_interval,
- )
+ if AIRFLOW_V_3_0_PLUS:
+ kwargs["run_id"] = dag.timetable.generate_run_id(
+ run_type=run_type,
+ run_after=logical_date or
timezone.coerce_datetime(timezone.utcnow()),
+ data_interval=data_interval,
+ )
+ else:
+ kwargs["run_id"] = dag.timetable.generate_run_id(
+ run_type=run_type,
+ logical_date=logical_date or
timezone.coerce_datetime(timezone.utcnow()),
+ data_interval=data_interval,
+ )
kwargs["run_type"] = run_type
if AIRFLOW_V_3_0_PLUS: