This is an automated email from the ASF dual-hosted git repository.

phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 035060d7f38 AIP-83 amendment: Add logic for generating run_id when logical date is None. (#46616)
035060d7f38 is described below

commit 035060d7f384a4989eddb6fb05f512f9c6a7e5bf
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Tue Feb 11 17:25:07 2025 +0545

    AIP-83 amendment: Add logic for generating run_id when logical date is None. (#46616)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/api/common/trigger_dag.py                  | 51 ++++++++-------
 .../api_connexion/endpoints/dag_run_endpoint.py    |  9 ++-
 airflow/api_connexion/schemas/dag_run_schema.py    | 21 ++++---
 airflow/api_fastapi/core_api/datamodels/dag_run.py |  8 ++-
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 16 +++--
 .../api_fastapi/core_api/routes/public/dag_run.py  | 21 +++++--
 airflow/jobs/scheduler_job_runner.py               |  8 +--
 airflow/models/backfill.py                         |  8 +--
 airflow/models/baseoperator.py                     |  6 +-
 airflow/models/dag.py                              | 26 +++++---
 airflow/models/dagrun.py                           | 17 ++++-
 airflow/timetables/base.py                         |  4 +-
 airflow/timetables/simple.py                       | 16 +++--
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 17 +++--
 airflow/ui/openapi-gen/requests/types.gen.ts       |  3 +-
 airflow/utils/types.py                             | 11 ++--
 airflow/www/views.py                               | 26 ++++++--
 docker_tests/test_docker_compose_quick_start.py    |  6 +-
 kubernetes_tests/test_base.py                      |  7 ++-
 kubernetes_tests/test_kubernetes_pod_operator.py   | 27 ++++++--
 .../cncf/kubernetes/operators/test_job.py          | 21 +++++--
 .../cncf/kubernetes/operators/test_pod.py          |  4 +-
 .../kubernetes/operators/test_spark_kubernetes.py  |  4 +-
 .../api_endpoints/test_dag_run_endpoint.py         |  2 +
 .../api_endpoints/test_xcom_endpoint.py            | 14 ++++-
 .../microsoft/azure/operators/test_data_factory.py |  4 +-
 .../microsoft/azure/sensors/test_wasb.py           |  8 ++-
 .../snowflake/operators/test_snowflake.py          |  4 +-
 .../providers/standard/operators/trigger_dagrun.py |  6 +-
 .../endpoints/test_dag_run_endpoint.py             | 73 +++++++++++++++++-----
 .../api_connexion/endpoints/test_xcom_endpoint.py  | 70 +++++++++++++++++----
 tests/api_connexion/schemas/test_dag_run_schema.py | 19 +++---
 .../core_api/routes/public/test_xcom.py            |  5 +-
 tests/jobs/test_scheduler_job.py                   |  7 ++-
 tests/models/test_dag.py                           |  4 +-
 tests/models/test_dagrun.py                        |  8 +--
 tests/models/test_taskinstance.py                  |  7 ++-
 tests/models/test_xcom.py                          |  4 +-
 tests/operators/test_trigger_dagrun.py             |  8 ++-
 tests/sensors/test_external_task_sensor.py         |  4 +-
 tests/timetables/test_assets_timetable.py          |  6 +-
 tests/utils/test_state.py                          |  2 +-
 tests_common/pytest_plugin.py                      | 17 +++--
 43 files changed, 441 insertions(+), 168 deletions(-)

diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py
index 08ee1726ccc..8a90438af70 100644
--- a/airflow/api/common/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -42,6 +42,7 @@ def _trigger_dag(
     dag_bag: DagBag,
     *,
     triggered_by: DagRunTriggeredByType,
+    run_after: datetime,
     run_id: str | None = None,
     conf: dict | str | None = None,
     logical_date: datetime | None = None,
@@ -54,6 +55,7 @@ def _trigger_dag(
     :param dag_id: DAG ID
     :param dag_bag: DAG Bag model
     :param triggered_by: the entity which triggers the dag_run
+    :param run_after: the datetime before which dag cannot run.
     :param run_id: ID of the run
     :param conf: configuration
     :param logical_date: logical date of the run
@@ -65,26 +67,30 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    logical_date = logical_date or timezone.utcnow()
-
-    if not timezone.is_localized(logical_date):
-        raise ValueError("The logical date should be localized")
-
-    if replace_microseconds:
-        logical_date = logical_date.replace(microsecond=0)
-
-    if dag.default_args and "start_date" in dag.default_args:
-        min_dag_start_date = dag.default_args["start_date"]
-        if min_dag_start_date and logical_date < min_dag_start_date:
-            raise ValueError(
-                f"Logical date [{logical_date.isoformat()}] should be >= 
start_date "
-                f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
-            )
-    coerced_logical_date = timezone.coerce_datetime(logical_date)
-
-    data_interval = 
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
-    run_id = run_id or dag.timetable.generate_run_id(
-        run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, 
data_interval=data_interval
+    if logical_date:
+        if not timezone.is_localized(logical_date):
+            raise ValueError("The logical date should be localized")
+
+        if replace_microseconds:
+            logical_date = logical_date.replace(microsecond=0)
+
+        if dag.default_args and "start_date" in dag.default_args:
+            min_dag_start_date = dag.default_args["start_date"]
+            if min_dag_start_date and logical_date < min_dag_start_date:
+                raise ValueError(
+                    f"Logical date [{logical_date.isoformat()}] should be >= 
start_date "
+                    f"[{min_dag_start_date.isoformat()}] from DAG's 
default_args"
+                )
+        coerced_logical_date = timezone.coerce_datetime(logical_date)
+        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
+    else:
+        coerced_logical_date = None
+        data_interval = None
+
+    run_id = run_id or DagRun.generate_run_id(
+        run_type=DagRunType.MANUAL,
+        logical_date=coerced_logical_date,
+        run_after=timezone.coerce_datetime(run_after),
     )
 
     # This intentionally does not use 'session' in the current scope because it
@@ -102,7 +108,7 @@ def _trigger_dag(
         run_id=run_id,
         logical_date=logical_date,
         data_interval=data_interval,
-        run_after=data_interval.end,
+        run_after=run_after,
         conf=run_conf,
         run_type=DagRunType.MANUAL,
         triggered_by=triggered_by,
@@ -120,6 +126,7 @@ def trigger_dag(
     dag_id: str,
     *,
     triggered_by: DagRunTriggeredByType,
+    run_after: datetime | None = None,
     run_id: str | None = None,
     conf: dict | str | None = None,
     logical_date: datetime | None = None,
@@ -131,6 +138,7 @@ def trigger_dag(
 
     :param dag_id: DAG ID
     :param triggered_by: the entity which triggers the dag_run
+    :param run_after: the datetime before which dag won't run.
     :param run_id: ID of the dag_run
     :param conf: configuration
     :param logical_date: date of execution
@@ -147,6 +155,7 @@ def trigger_dag(
         dag_id=dag_id,
         dag_bag=dagbag,
         run_id=run_id,
+        run_after=run_after or timezone.utcnow(),
         conf=conf,
         logical_date=logical_date,
         replace_microseconds=replace_microseconds,
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 28e85cfe61a..cddf67261d5 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -327,7 +327,8 @@ def post_dag_run(*, dag_id: str, session: Session = 
NEW_SESSION) -> APIResponse:
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
-    logical_date = pendulum.instance(post_body["logical_date"])
+    logical_date = pendulum.instance(post_body["logical_date"]) if 
post_body.get("logical_date") else None
+    run_after = pendulum.instance(post_body["run_after"])
     run_id = post_body["run_id"]
     dagrun_instance = session.scalar(
         select(DagRun)
@@ -352,12 +353,14 @@ def post_dag_run(*, dag_id: str, session: Session = 
NEW_SESSION) -> APIResponse:
                     end=pendulum.instance(data_interval_end),
                 )
             else:
-                data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+                data_interval = (
+                    
dag.timetable.infer_manual_data_interval(run_after=logical_date) if 
logical_date else None
+                )
             dag_run = dag.create_dagrun(
                 run_id=run_id,
                 logical_date=logical_date,
                 data_interval=data_interval,
-                run_after=data_interval.end,
+                run_after=run_after,
                 conf=post_body.get("conf"),
                 run_type=DagRunType.MANUAL,
                 triggered_by=DagRunTriggeredByType.REST_API,
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py
index c2560613def..8b3a8df54c8 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -63,7 +63,8 @@ class DAGRunSchema(SQLAlchemySchema):
 
     run_id = auto_field(data_key="dag_run_id")
     dag_id = auto_field(dump_only=True)
-    logical_date = auto_field(data_key="logical_date", 
validate=validate_istimezone)
+    logical_date = auto_field(data_key="logical_date", allow_none=True, 
validate=validate_istimezone)
+    run_after = auto_field(data_key="run_after", validate=validate_istimezone)
     start_date = auto_field(dump_only=True)
     end_date = auto_field(dump_only=True)
     state = DagStateField(dump_only=True)
@@ -78,17 +79,23 @@ class DAGRunSchema(SQLAlchemySchema):
 
     @pre_load
     def autogenerate(self, data, **kwargs):
-        """Auto generate run_id and logical_date if they are not provided."""
-        logical_date = data.get("logical_date", _MISSING)
+        """Auto generate run_id and run_after if they are not provided."""
+        run_after = data.get("run_after", _MISSING)
 
-        # Auto-generate logical_date if missing
-        if logical_date is _MISSING:
-            data["logical_date"] = str(timezone.utcnow())
+        # Auto-generate run_after if missing
+        if run_after is _MISSING:
+            data["run_after"] = str(timezone.utcnow())
 
         if "dag_run_id" not in data:
             try:
+                if logical_date_str := data.get("logical_date"):
+                    logical_date = timezone.parse(logical_date_str)
+                else:
+                    logical_date = None
                 data["dag_run_id"] = DagRun.generate_run_id(
-                    DagRunType.MANUAL, timezone.parse(data["logical_date"])
+                    run_type=DagRunType.MANUAL,
+                    logical_date=logical_date,
+                    run_after=timezone.parse(data["run_after"]),
                 )
             except (ParserError, TypeError) as err:
                 raise BadRequest("Incorrect datetime argument", 
detail=str(err))
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 18be129a195..295eedca424 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -20,11 +20,11 @@ from __future__ import annotations
 from datetime import datetime
 from enum import Enum
 
-import pendulum
 from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator
 
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 from airflow.models import DagRun
+from airflow.utils import timezone
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -82,9 +82,11 @@ class TriggerDAGRunPostBody(StrictBaseModel):
     """Trigger DAG Run Serializer for POST body."""
 
     dag_run_id: str | None = None
-    logical_date: AwareDatetime | None
     data_interval_start: AwareDatetime | None = None
     data_interval_end: AwareDatetime | None = None
+    logical_date: AwareDatetime | None
+    run_after: datetime = Field(default_factory=timezone.utcnow)
+
     conf: dict = Field(default_factory=dict)
     note: str | None = None
 
@@ -102,7 +104,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
     def validate_dag_run_id(self):
         if not self.dag_run_id:
             self.dag_run_id = DagRun.generate_run_id(
-                DagRunType.MANUAL, self.logical_date or pendulum.now("UTC")
+                run_type=DagRunType.MANUAL, logical_date=self.logical_date, 
run_after=self.run_after
             )
         return self
 
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 6d2223e6c25..ade0da25b5d 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -10501,12 +10501,6 @@ components:
           - type: string
           - type: 'null'
           title: Dag Run Id
-        logical_date:
-          anyOf:
-          - type: string
-            format: date-time
-          - type: 'null'
-          title: Logical Date
         data_interval_start:
           anyOf:
           - type: string
@@ -10519,6 +10513,16 @@ components:
             format: date-time
           - type: 'null'
           title: Data Interval End
+        logical_date:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Logical Date
+        run_after:
+          type: string
+          format: date-time
+          title: Run After
         conf:
           type: object
           title: Conf
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 19bf526efc9..afce667e8cf 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -347,7 +347,6 @@ def trigger_dag_run(
 ) -> DAGRunResponse:
     """Trigger a DAG."""
     dm = session.scalar(select(DagModel).where(DagModel.is_active, 
DagModel.dag_id == dag_id).limit(1))
-    now = pendulum.now("UTC")
     if not dm:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: 
'{dag_id}' not found")
 
@@ -359,6 +358,7 @@ def trigger_dag_run(
 
     logical_date = timezone.coerce_datetime(body.logical_date)
     coerced_logical_date = timezone.coerce_datetime(logical_date)
+    run_after = timezone.coerce_datetime(body.run_after)
 
     try:
         dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
@@ -369,13 +369,26 @@ def trigger_dag_run(
                 end=pendulum.instance(body.data_interval_end),
             )
         else:
-            data_interval = 
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now)
+            if body.logical_date:
+                data_interval = dag.timetable.infer_manual_data_interval(
+                    run_after=coerced_logical_date or run_after
+                )
+                run_after = data_interval.end
+            else:
+                data_interval = None
+
+        if body.dag_run_id:
+            run_id = body.dag_run_id
+        else:
+            run_id = DagRun.generate_run_id(
+                run_type=DagRunType.SCHEDULED, 
logical_date=coerced_logical_date, run_after=run_after
+            )
 
         dag_run = dag.create_dagrun(
-            run_id=cast(str, body.dag_run_id),
+            run_id=run_id,
             logical_date=coerced_logical_date,
             data_interval=data_interval,
-            run_after=data_interval.end,
+            run_after=run_after,
             conf=body.conf,
             run_type=DagRunType.MANUAL,
             triggered_by=DagRunTriggeredByType.REST_API,
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index df1cb5ab529..f340893e5ae 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1286,7 +1286,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     dag.create_dagrun(
                         run_id=dag.timetable.generate_run_id(
                             run_type=DagRunType.SCHEDULED,
-                            logical_date=dag_model.next_dagrun,
+                            run_after=dag_model.next_dagrun,
                             data_interval=data_interval,
                         ),
                         logical_date=dag_model.next_dagrun,
@@ -1394,12 +1394,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
                 data_interval = 
dag.timetable.data_interval_for_events(logical_date, asset_events)
                 dag_run = dag.create_dagrun(
-                    run_id=dag.timetable.generate_run_id(
+                    run_id=DagRun.generate_run_id(
                         run_type=DagRunType.ASSET_TRIGGERED,
                         logical_date=logical_date,
-                        data_interval=data_interval,
-                        session=session,
-                        events=asset_events,
+                        run_after=max(logical_dates.values()),
                     ),
                     logical_date=logical_date,
                     data_interval=data_interval,
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 735f09f3a3f..b4b15a6e613 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -286,6 +286,8 @@ def _create_backfill_dag_run(
     backfill_sort_ordinal,
     session,
 ):
+    from airflow.models.dagrun import DagRun
+
     with session.begin_nested():
         should_skip_create_backfill = should_create_backfill_dag_run(
             info, reprocess_behavior, backfill_id, backfill_sort_ordinal, 
session
@@ -296,10 +298,8 @@ def _create_backfill_dag_run(
         dag_version = DagVersion.get_latest_version(dag.dag_id, 
session=session)
         try:
             dr = dag.create_dagrun(
-                run_id=dag.timetable.generate_run_id(
-                    run_type=DagRunType.BACKFILL_JOB,
-                    logical_date=info.logical_date,
-                    data_interval=info.data_interval,
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.BACKFILL_JOB, 
logical_date=info.logical_date, run_after=info.run_after
                 ),
                 logical_date=info.logical_date,
                 data_interval=info.data_interval,
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 3361fe33df6..ea6a7c34b11 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -628,7 +628,11 @@ class BaseOperator(TaskSDKBaseOperator, AbstractOperator, 
metaclass=BaseOperator
                 # This is _mostly_ only used in tests
                 dr = DagRun(
                     dag_id=self.dag_id,
-                    run_id=DagRun.generate_run_id(DagRunType.MANUAL, 
info.logical_date),
+                    run_id=DagRun.generate_run_id(
+                        run_type=DagRunType.MANUAL,
+                        logical_date=info.logical_date,
+                        run_after=info.run_after,
+                    ),
                     run_type=DagRunType.MANUAL,
                     logical_date=info.logical_date,
                     data_interval=info.data_interval,
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7ebbb48b4b4..111852f2728 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1602,6 +1602,7 @@ class DAG(TaskSDKDag, LoggingMixin):
     @provide_session
     def test(
         self,
+        run_after: datetime | None = None,
         logical_date: datetime | None = None,
         run_conf: dict[str, Any] | None = None,
         conn_file_path: str | None = None,
@@ -1613,6 +1614,7 @@ class DAG(TaskSDKDag, LoggingMixin):
         """
         Execute one single DagRun for a given DAG and logical date.
 
+        :param run_after: the datetime before which to Dag won't run.
         :param logical_date: logical date for the DAG run
         :param run_conf: configuration to pass to newly created dagrun
         :param conn_file_path: file path to a connection file in either yaml 
or json
@@ -1652,7 +1654,6 @@ class DAG(TaskSDKDag, LoggingMixin):
             exit_stack.callback(lambda: secrets_backend_list.pop(0))
 
         with exit_stack:
-            logical_date = logical_date or timezone.utcnow()
             self.validate()
             self.log.debug("Clearing existing task instances for logical date 
%s", logical_date)
             self.clear(
@@ -1663,16 +1664,23 @@ class DAG(TaskSDKDag, LoggingMixin):
             )
             self.log.debug("Getting dagrun for dag %s", self.dag_id)
             logical_date = timezone.coerce_datetime(logical_date)
-            data_interval = 
self.timetable.infer_manual_data_interval(run_after=logical_date)
+            run_after = timezone.coerce_datetime(run_after) or 
timezone.coerce_datetime(timezone.utcnow())
+            data_interval = (
+                
self.timetable.infer_manual_data_interval(run_after=logical_date) if 
logical_date else None
+            )
             scheduler_dag = 
SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self))
 
             dr: DagRun = _get_or_create_dagrun(
                 dag=scheduler_dag,
-                start_date=logical_date,
+                start_date=logical_date or run_after,
                 logical_date=logical_date,
                 data_interval=data_interval,
-                run_after=data_interval.end,
-                run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+                run_after=run_after,
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.MANUAL,
+                    logical_date=logical_date,
+                    run_after=run_after,
+                ),
                 session=session,
                 conf=run_conf,
                 triggered_by=DagRunTriggeredByType.TEST,
@@ -1756,8 +1764,8 @@ class DAG(TaskSDKDag, LoggingMixin):
         self,
         *,
         run_id: str,
-        logical_date: datetime | None,
-        data_interval: tuple[datetime, datetime],
+        logical_date: datetime | None = None,
+        data_interval: tuple[datetime, datetime] | None = None,
         run_after: datetime,
         conf: dict | None = None,
         run_type: DagRunType,
@@ -2485,8 +2493,8 @@ def _get_or_create_dagrun(
     *,
     dag: DAG,
     run_id: str,
-    logical_date: datetime,
-    data_interval: tuple[datetime, datetime],
+    logical_date: datetime | None,
+    data_interval: tuple[datetime, datetime] | None,
     run_after: datetime,
     conf: dict | None,
     triggered_by: DagRunTriggeredByType,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index d9bf14c3f98..199762d69ef 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -78,6 +78,7 @@ from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
 from airflow.utils.state import DagRunState, State, TaskInstanceState
+from airflow.utils.strings import get_random_string
 from airflow.utils.types import NOTSET, DagRunTriggeredByType, DagRunType
 
 if TYPE_CHECKING:
@@ -621,10 +622,20 @@ class DagRun(Base, LoggingMixin):
         return session.scalars(select(cls).where(cls.dag_id == dag_id, 
cls.run_id == run_id)).one_or_none()
 
     @staticmethod
-    def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str:
-        """Generate Run ID based on Run Type and logical Date."""
+    def generate_run_id(
+        *, run_type: DagRunType, logical_date: datetime | None = None, 
run_after: datetime
+    ) -> str:
+        """
+        Generate Run ID based on Run Type, run_after and logical Date.
+
+        :param run_type: type of DagRun
+        :param logical_date: the logical date
+        :param run_after: the date before which dag run won't start.
+        """
         # _Ensure_ run_type is a DagRunType, not just a string from user code
-        return DagRunType(run_type).generate_run_id(logical_date)
+        if logical_date:
+            return 
DagRunType(run_type).generate_run_id(suffix=run_after.isoformat())
+        return 
DagRunType(run_type).generate_run_id(suffix=f"{run_after.isoformat()}_{get_random_string()}")
 
     @staticmethod
     @provide_session
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index da81090ff9a..d5d863c2984 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -281,8 +281,8 @@ class Timetable(Protocol):
         self,
         *,
         run_type: DagRunType,
-        logical_date: DateTime,
+        run_after: DateTime,
         data_interval: DataInterval | None,
         **extra,
     ) -> str:
-        return run_type.generate_run_id(logical_date)
+        return run_type.generate_run_id(suffix=run_after.isoformat())
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 20e8085fe0d..45574daa37e 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -24,7 +24,6 @@ from airflow.utils import timezone
 
 if TYPE_CHECKING:
     from pendulum import DateTime
-    from sqlalchemy import Session
 
     from airflow.models.asset import AssetEvent
     from airflow.sdk.definitions.asset import BaseAsset
@@ -186,15 +185,22 @@ class AssetTriggeredTimetable(_TrivialTimetable):
         self,
         *,
         run_type: DagRunType,
-        logical_date: DateTime,
         data_interval: DataInterval | None,
-        session: Session | None = None,
-        events: Collection[AssetEvent] | None = None,
+        run_after: DateTime,
         **extra,
     ) -> str:
+        """
+        Generate Run ID based on Run Type, run_after and logical Date.
+
+        :param run_type: type of DagRun
+        :param data_interval: the data interval
+        :param run_after: the date before which dag run won't start.
+        """
         from airflow.models.dagrun import DagRun
 
-        return DagRun.generate_run_id(run_type, logical_date)
+        logical_date = data_interval.start if data_interval is not None else 
run_after
+
+        return DagRun.generate_run_id(run_type=run_type, 
logical_date=logical_date, run_after=run_after)
 
     def data_interval_for_events(
         self,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 3e69736e87d..7d307ac5217 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5816,7 +5816,7 @@ export const $TriggerDAGRunPostBody = {
       ],
       title: "Dag Run Id",
     },
-    logical_date: {
+    data_interval_start: {
       anyOf: [
         {
           type: "string",
@@ -5826,9 +5826,9 @@ export const $TriggerDAGRunPostBody = {
           type: "null",
         },
       ],
-      title: "Logical Date",
+      title: "Data Interval Start",
     },
-    data_interval_start: {
+    data_interval_end: {
       anyOf: [
         {
           type: "string",
@@ -5838,9 +5838,9 @@ export const $TriggerDAGRunPostBody = {
           type: "null",
         },
       ],
-      title: "Data Interval Start",
+      title: "Data Interval End",
     },
-    data_interval_end: {
+    logical_date: {
       anyOf: [
         {
           type: "string",
@@ -5850,7 +5850,12 @@ export const $TriggerDAGRunPostBody = {
           type: "null",
         },
       ],
-      title: "Data Interval End",
+      title: "Logical Date",
+    },
+    run_after: {
+      type: "string",
+      format: "date-time",
+      title: "Run After",
     },
     conf: {
       type: "object",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 9a2ed62122a..c3bff10bcd6 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1416,9 +1416,10 @@ export type TimeDelta = {
  */
 export type TriggerDAGRunPostBody = {
   dag_run_id?: string | null;
-  logical_date: string | null;
   data_interval_start?: string | null;
   data_interval_end?: string | null;
+  logical_date: string | null;
+  run_after?: string;
   conf?: {
     [key: string]: unknown;
   };
diff --git a/airflow/utils/types.py b/airflow/utils/types.py
index 46f295c4ee2..28822919e38 100644
--- a/airflow/utils/types.py
+++ b/airflow/utils/types.py
@@ -22,8 +22,6 @@ from typing import TYPE_CHECKING, TypedDict
 import airflow.sdk.definitions._internal.types
 
 if TYPE_CHECKING:
-    from datetime import datetime
-
     from airflow.typing_compat import TypeAlias
 
 ArgNotSet: TypeAlias = airflow.sdk.definitions._internal.types.ArgNotSet
@@ -42,8 +40,13 @@ class DagRunType(str, enum.Enum):
     def __str__(self) -> str:
         return self.value
 
-    def generate_run_id(self, logical_date: datetime) -> str:
-        return f"{self}__{logical_date.isoformat()}"
+    def generate_run_id(self, *, suffix: str) -> str:
+        """
+        Generate a string for DagRun based on suffix string.
+
+        :param suffix: Generate run_id from suffix.
+        """
+        return f"{self}__{suffix}"
 
     @staticmethod
     def from_run_id(run_id: str) -> DagRunType:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f99dcd9161f..b3fae31fefe 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2032,7 +2032,8 @@ class Airflow(AirflowBaseView):
         origin = get_safe_url(request.values.get("origin"))
         unpause = request.values.get("unpause")
         request_conf = request.values.get("conf")
-        request_logical_date = request.values.get("logical_date", 
default=timezone.utcnow().isoformat())
+        request_logical_date = request.values.get("logical_date")
+        request_run_after = request.values.get("run_after", 
default=timezone.utcnow().isoformat())
         is_dag_run_conf_overrides_params = conf.getboolean("core", 
"dag_run_conf_overrides_params")
         dag = get_airflow_app().dag_bag.get_dag(dag_id)
         dag_orm: DagModel = 
session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1))
@@ -2148,7 +2149,7 @@ class Airflow(AirflowBaseView):
             )
 
         try:
-            logical_date = timezone.parse(request_logical_date, strict=True)
+            logical_date = timezone.parse(request_logical_date, strict=True) 
if request_logical_date else None
         except ParserError:
             flash("Invalid logical date", "error")
             form = DateTimeForm(data={"logical_date": 
timezone.utcnow().isoformat()})
@@ -2160,6 +2161,19 @@ class Airflow(AirflowBaseView):
                 form=form,
             )
 
+        try:
+            run_after = timezone.parse(request_run_after, strict=True)
+        except ParserError:
+            flash("Invalid run_after", "error")
+            form = DateTimeForm(data={"run_after": 
timezone.utcnow().isoformat()})
+            return self.render_template(
+                "airflow/trigger.html",
+                form_fields=form_fields,
+                **render_params,
+                conf=request_conf or {},
+                form=form,
+            )
+
         dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, 
session=session)
         if dr:
             if dr.run_id == run_id:
@@ -2224,12 +2238,12 @@ class Airflow(AirflowBaseView):
                     "warning",
                 )
 
-        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date or run_after)
         if not run_id:
-            run_id = dag.timetable.generate_run_id(
-                logical_date=logical_date,
-                data_interval=data_interval,
+            run_id = DagRun.generate_run_id(
                 run_type=DagRunType.MANUAL,
+                logical_date=logical_date,
+                run_after=run_after,
             )
 
         try:
diff --git a/docker_tests/test_docker_compose_quick_start.py 
b/docker_tests/test_docker_compose_quick_start.py
index 4b6335facff..15f346e57e3 100644
--- a/docker_tests/test_docker_compose_quick_start.py
+++ b/docker_tests/test_docker_compose_quick_start.py
@@ -101,7 +101,11 @@ def 
test_trigger_dag_and_wait_for_result(default_docker_image, tmp_path_factory,
         compose.execute(service="airflow-scheduler", command=["airflow", 
"scheduler", "-n", "50"])
 
         api_request("PATCH", path=f"dags/{DAG_ID}", json={"is_paused": False})
-        api_request("POST", path=f"dags/{DAG_ID}/dagRuns", json={"dag_run_id": 
DAG_RUN_ID})
+        api_request(
+            "POST",
+            path=f"dags/{DAG_ID}/dagRuns",
+            json={"dag_run_id": DAG_RUN_ID, "logical_date": 
"2020-06-11T18:00:00+00:00"},
+        )
 
         wait_for_terminal_dag_state(dag_id=DAG_ID, dag_run_id=DAG_RUN_ID)
         dag_state = api_request("GET", 
f"dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}").get("state")
diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py
index cadd8954cfb..2e0a96620a1 100644
--- a/kubernetes_tests/test_base.py
+++ b/kubernetes_tests/test_base.py
@@ -264,8 +264,10 @@ class BaseK8STest:
         assert result.status_code == 200, f"Could not enable DAG: 
{result_json}"
         post_string = f"http://{host}/api/v1/dags/{dag_id}/dagRuns"
         print(f"Calling [start_dag]#2 {post_string}")
+
+        logical_date = datetime.now(timezone.utc).isoformat()
         # Trigger a new dagrun
-        result = self.session.post(post_string, json={})
+        result = self.session.post(post_string, json={"logical_date": 
logical_date})
         try:
             result_json = result.json()
         except ValueError:
@@ -292,7 +294,8 @@ class BaseK8STest:
         for dag_run in dag_runs:
             if dag_run["dag_id"] == dag_id:
                 logical_date = dag_run["logical_date"]
+                run_after = dag_run["run_after"]
                 dag_run_id = dag_run["dag_run_id"]
                 break
-        assert logical_date is not None, f"No logical_date can be found for 
the dag with {dag_id}"
+        assert run_after is not None, f"No run_after can be found for the dag 
with {dag_id}"
         return dag_run_id, logical_date
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 9e4cd9e4372..e5637b5a9db 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -50,13 +50,30 @@ POD_MANAGER_CLASS = 
"airflow.providers.cncf.kubernetes.utils.pod_manager.PodMana
 
 
 def create_context(task) -> Context:
+    from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
     dag = DAG(dag_id="dag", schedule=None)
     logical_date = timezone.datetime(2016, 1, 1, 1, 0, 0, 
tzinfo=timezone.parse_timezone("Europe/Amsterdam"))
-    dag_run = DagRun(
-        dag_id=dag.dag_id,
-        logical_date=logical_date,
-        run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
-    )
+
+    if AIRFLOW_V_3_0_PLUS:
+        dag_run = DagRun(
+            dag_id=dag.dag_id,
+            logical_date=logical_date,
+            run_id=DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL,
+                logical_date=logical_date,
+                run_after=logical_date,
+            ),
+        )
+    else:
+        dag_run = DagRun(
+            dag_id=dag.dag_id,
+            logical_date=logical_date,
+            run_id=DagRun.generate_run_id(  # type: ignore[call-arg]
+                run_type=DagRunType.MANUAL,
+                logical_date=logical_date,
+            ),
+        )
     task_instance = TaskInstance(task=task)
     task_instance.dag_run = dag_run
     task_instance.dag_id = dag.dag_id
diff --git 
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
 
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
index 5342363f093..691a5496961 100644
--- 
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
+++ 
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_job.py
@@ -37,6 +37,8 @@ from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
 
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
 DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
 JOB_OPERATORS_PATH = "airflow.providers.cncf.kubernetes.operators.job.{}"
 HOOK_CLASS = JOB_OPERATORS_PATH.format("KubernetesHook")
@@ -57,11 +59,20 @@ def create_context(task, persist_to_db=False, 
map_index=None):
     else:
         dag = DAG(dag_id="dag", schedule=None, start_date=pendulum.now())
         dag.add_task(task)
-    dag_run = DagRun(
-        run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
-        run_type=DagRunType.MANUAL,
-        dag_id=dag.dag_id,
-    )
+    if AIRFLOW_V_3_0_PLUS:
+        dag_run = DagRun(
+            run_id=DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, 
run_after=DEFAULT_DATE
+            ),
+            run_type=DagRunType.MANUAL,
+            dag_id=dag.dag_id,
+        )
+    else:
+        dag_run = DagRun(
+            run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
+            run_type=DagRunType.MANUAL,
+            dag_id=dag.dag_id,
+        )
     task_instance = TaskInstance(task=task, run_id=dag_run.run_id)
     task_instance.dag_run = dag_run
     if map_index is not None:
diff --git 
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
 
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
index 309d2abe642..f447651585e 100644
--- 
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
+++ 
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_pod.py
@@ -100,7 +100,9 @@ def create_context(task, persist_to_db=False, 
map_index=None):
     now = timezone.utcnow()
     if AIRFLOW_V_3_0_PLUS:
         dag_run = DagRun(
-            run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
+            run_id=DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, 
run_after=DEFAULT_DATE
+            ),
             run_type=DagRunType.MANUAL,
             dag_id=dag.dag_id,
             logical_date=now,
diff --git 
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
 
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
index c6814222c47..b7172f87346 100644
--- 
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ 
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -177,7 +177,9 @@ def create_context(task):
         dag_run = DagRun(
             dag_id=dag.dag_id,
             logical_date=logical_date,
-            run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+            run_id=DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL, logical_date=logical_date, 
run_after=logical_date
+            ),
         )
     else:
         dag_run = DagRun(
diff --git 
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
 
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
index 02057310907..26e607c8c71 100644
--- 
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
+++ 
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py
@@ -208,6 +208,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -224,6 +225,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time_2,
+            "run_after": self.default_time_2,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
diff --git 
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
 
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
index 852c4288bbe..9f4c93549ce 100644
--- 
a/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
+++ 
b/providers/fab/tests/provider_tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py
@@ -128,12 +128,22 @@ class TestGetXComEntries(TestXComEndpoint):
         task_id_1 = "test-task-id-1"
         logical_date = "2005-04-02T00:00:00+00:00"
         logical_date_parsed = timezone.parse(logical_date)
-        dag_run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, 
logical_date_parsed)
+        run_after = "2005-04-02T00:00:00+00:00"
+        run_after_parsed = timezone.parse(run_after)
+        dag_run_id_1 = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL,
+            logical_date=logical_date_parsed,
+            run_after=run_after_parsed,
+        )
         self._create_xcom_entries(dag_id_1, dag_run_id_1, logical_date_parsed, 
task_id_1)
 
         dag_id_2 = "test-dag-id-2"
         task_id_2 = "test-task-id-2"
-        run_id_2 = DagRun.generate_run_id(DagRunType.MANUAL, 
logical_date_parsed)
+        run_id_2 = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL,
+            logical_date=logical_date_parsed,
+            run_after=run_after_parsed,
+        )
         self._create_xcom_entries(dag_id_2, run_id_2, logical_date_parsed, 
task_id_2)
         self._create_invalid_xcom_entries(logical_date_parsed)
         response = self.client.get(
diff --git 
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
 
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
index 9eb8a682656..1a5721e9d08 100644
--- 
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
+++ 
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/operators/test_data_factory.py
@@ -327,7 +327,9 @@ class TestAzureDataFactoryRunPipelineOperatorWithDeferrable:
             dag_run = DagRun(
                 dag_id=dag.dag_id,
                 logical_date=logical_date,
-                run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.MANUAL, logical_date=logical_date, 
run_after=logical_date
+                ),
             )
         else:
             dag_run = DagRun(
diff --git 
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
 
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
index e05f9454c95..f89b2319d11 100644
--- 
a/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
+++ 
b/providers/microsoft/azure/tests/provider_tests/microsoft/azure/sensors/test_wasb.py
@@ -119,7 +119,9 @@ class TestWasbBlobAsyncSensor:
             dag_run = DagRun(
                 dag_id=dag.dag_id,
                 logical_date=logical_date,
-                run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.MANUAL, logical_date=logical_date, 
run_after=logical_date
+                ),
             )
 
         task_instance = TaskInstance(task=task)
@@ -262,7 +264,9 @@ class TestWasbPrefixAsyncSensor:
             dag_run = DagRun(
                 dag_id=dag.dag_id,
                 logical_date=logical_date,
-                run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.MANUAL, logical_date=logical_date, 
run_after=logical_date
+                ),
             )
 
         task_instance = TaskInstance(task=task)
diff --git 
a/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
 
b/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
index 43795f22ace..be66ff67ca8 100644
--- 
a/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
+++ 
b/providers/snowflake/tests/provider_tests/snowflake/operators/test_snowflake.py
@@ -183,7 +183,9 @@ def create_context(task, dag=None):
         dag_run = DagRun(
             dag_id=dag.dag_id,
             logical_date=logical_date,
-            run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+            run_id=DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL, logical_date=logical_date, 
run_after=logical_date
+            ),
         )
     else:
         dag_run = DagRun(
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index 053a6011b63..b5ac483d1f2 100644
--- 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++ 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -192,7 +192,11 @@ class TriggerDagRunOperator(BaseOperator):
         if self.trigger_run_id:
             run_id = str(self.trigger_run_id)
         else:
-            run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
parsed_logical_date)
+            run_id = DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL,
+                logical_date=parsed_logical_date,
+                run_after=parsed_logical_date,
+            )
 
         try:
             dag_run = trigger_dag(
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 3ce4019c824..94028976e15 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -109,6 +109,7 @@ class TestDagRunEndpoint:
                 run_id=f"TEST_DAG_RUN_ID_{i}",
                 run_type=DagRunType.MANUAL,
                 logical_date=timezone.parse(self.default_time) + 
timedelta(days=i - 1),
+                run_after=timezone.parse(self.default_time) + timedelta(days=i 
- 1),
                 start_date=timezone.parse(self.default_time),
                 external_trigger=True,
                 state=state,
@@ -126,6 +127,7 @@ class TestDagRunEndpoint:
                         run_id=f"TEST_DAG_RUN_ID_{i}",
                         run_type=DagRunType.MANUAL,
                         logical_date=timezone.parse(self.default_time_2),
+                        run_after=timezone.parse(self.default_time_2),
                         start_date=timezone.parse(self.default_time),
                         external_trigger=True,
                         state=state,
@@ -190,6 +192,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             run_id="TEST_DAG_RUN_ID",
             run_type=DagRunType.MANUAL,
             logical_date=timezone.parse(self.default_time),
+            run_after=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
             state="running",
@@ -201,6 +204,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -240,6 +244,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             run_id="TEST_DAG_RUN_ID",
             run_type=DagRunType.MANUAL,
             logical_date=timezone.parse(self.default_time),
+            run_after=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
         )
@@ -263,6 +268,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             run_id="TEST_DAG_RUN_ID",
             run_type=DagRunType.MANUAL,
             logical_date=timezone.parse(self.default_time),
+            run_after=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
             state="running",
@@ -287,6 +293,7 @@ class TestGetDagRun(TestDagRunEndpoint):
             run_id="TEST_DAG_RUN_ID",
             run_type=DagRunType.MANUAL,
             logical_date=timezone.parse(self.default_time),
+            run_after=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             external_trigger=True,
             state="running",
@@ -312,6 +319,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -328,6 +336,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time_2,
+            "run_after": self.default_time_2,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -382,6 +391,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time_2,
+            "run_after": self.default_time_2,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -398,6 +408,7 @@ class TestGetDagRuns(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -550,6 +561,7 @@ class TestGetDagRunsPagination(TestDagRunEndpoint):
                 run_id=f"TEST_DAG_RUN_ID{i}",
                 run_type=DagRunType.MANUAL,
                 logical_date=timezone.parse(self.default_time) + 
timedelta(minutes=i),
+                run_after=timezone.parse(self.default_time) + 
timedelta(minutes=i),
                 start_date=timezone.parse(self.default_time),
                 external_trigger=True,
             )
@@ -655,6 +667,7 @@ class TestGetDagRunsPaginationFilters(TestDagRunEndpoint):
                 run_id=f"TEST_START_EXEC_DAY_1{i}",
                 run_type=DagRunType.MANUAL,
                 logical_date=timezone.parse(dates[i]),
+                run_after=timezone.parse(dates[i]),
                 start_date=timezone.parse(dates[i]),
                 external_trigger=True,
                 state=DagRunState.SUCCESS,
@@ -699,6 +712,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -716,6 +730,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time_2,
+            "run_after": self.default_time_2,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -777,6 +792,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time_2,
+            "run_after": self.default_time_2,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -793,6 +809,7 @@ class TestGetDagRunBatch(TestDagRunEndpoint):
             "end_date": None,
             "state": "running",
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
@@ -925,6 +942,7 @@ class TestGetDagRunBatchPagination(TestDagRunEndpoint):
                 state="running",
                 run_type=DagRunType.MANUAL,
                 logical_date=timezone.parse(self.default_time) + 
timedelta(minutes=i),
+                run_after=timezone.parse(self.default_time) + 
timedelta(minutes=i),
                 start_date=timezone.parse(self.default_time),
                 external_trigger=True,
             )
@@ -1008,6 +1026,7 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
                 run_id=f"TEST_START_EXEC_DAY_1{i}",
                 run_type=DagRunType.MANUAL,
                 logical_date=timezone.parse(date),
+                run_after=timezone.parse(date),
                 start_date=timezone.parse(date),
                 external_trigger=True,
                 state="success",
@@ -1084,21 +1103,36 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
 class TestPostDagRun(TestDagRunEndpoint):
     @time_machine.travel(timezone.utcnow(), tick=False)
     @pytest.mark.parametrize(
-        "dag_run_id, logical_date, note, data_interval_start, 
data_interval_end",
+        "dag_run_id, logical_date, run_after, note, data_interval_start, 
data_interval_end",
         [
             pytest.param(
-                "TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", 
None, None, id="all-present"
+                "TEST_DAG_RUN",
+                "2020-06-11T18:00:00+00:00",
+                "2020-06-11T18:00:00+00:00",
+                "test-note",
+                None,
+                None,
+                id="all-present",
             ),
             pytest.param(
                 "TEST_DAG_RUN",
                 "2024-06-11T18:00:00+00:00",
+                "2024-06-11T18:00:00+00:00",
                 "test-note",
                 "2024-01-03T00:00:00+00:00",
                 "2024-01-04T05:00:00+00:00",
                 id="all-present-with-dates",
             ),
-            pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, None, 
id="only-date"),
-            pytest.param(None, None, None, None, None, id="all-missing"),
+            pytest.param(
+                None,
+                "2020-06-11T18:00:00+00:00",
+                "2020-06-11T18:00:00+00:00",
+                None,
+                None,
+                None,
+                id="only-date",
+            ),
+            pytest.param(None, None, "2020-06-11T18:00:00+00:00", None, None, 
None, id="all-missing"),
         ],
     )
     def test_should_respond_200(
@@ -1106,20 +1140,18 @@ class TestPostDagRun(TestDagRunEndpoint):
         session,
         dag_run_id,
         logical_date,
+        run_after,
         note,
         data_interval_start,
         data_interval_end,
     ):
         self._create_dag("TEST_DAG_ID")
-
-        # We freeze time for this test, so we could check it into the returned 
dates.
-        fixed_now = timezone.utcnow()
-
         # raise NotImplementedError("TODO: Add tests for data_interval_start 
and data_interval_end")
 
         request_json = {}
         if logical_date is not None:
             request_json["logical_date"] = logical_date
+        request_json["run_after"] = run_after
         if dag_run_id is not None:
             request_json["dag_run_id"] = dag_run_id
         if data_interval_start is not None:
@@ -1136,12 +1168,11 @@ class TestPostDagRun(TestDagRunEndpoint):
 
         assert response.status_code == 200
 
-        if logical_date is None:
-            expected_logical_date = fixed_now.isoformat()
-        else:
-            expected_logical_date = logical_date
+        expected_logical_date = logical_date if logical_date is not None else 
None
+
+        # when logical_date is null, run_id is run_after + random string.
         if dag_run_id is None:
-            expected_dag_run_id = f"manual__{expected_logical_date}"
+            expected_dag_run_id = f"manual__{run_after}"
         else:
             expected_dag_run_id = dag_run_id
 
@@ -1157,6 +1188,7 @@ class TestPostDagRun(TestDagRunEndpoint):
             "dag_run_id": expected_dag_run_id,
             "end_date": None,
             "logical_date": expected_logical_date,
+            "run_after": run_after,
             "external_trigger": True,
             "start_date": None,
             "state": "queued",
@@ -1167,8 +1199,15 @@ class TestPostDagRun(TestDagRunEndpoint):
             "note": note,
         }
         expected_response_json.update({"triggered_by": "rest_api"} if 
AIRFLOW_V_3_0_PLUS else {})
+        response_json = response.json
+        for key in expected_response_json:
+            if key != "dag_run_id":
+                assert response_json[key] == expected_response_json[key], 
f"Mismatch on key {key}"
 
-        assert response.json == expected_response_json
+        assert response_json["dag_run_id"].startswith(expected_dag_run_id), (
+            f"dag_run_id '{response_json['dag_run_id']}' does not start with 
expected prefix "
+            f"'{expected_dag_run_id}'"
+        )
         _check_last_log(session, dag_id="TEST_DAG_ID", 
event="api.post_dag_run", logical_date=None)
 
     def test_raises_validation_error_for_invalid_request(self):
@@ -1255,6 +1294,7 @@ class TestPostDagRun(TestDagRunEndpoint):
             "api/v1/dags/TEST_DAG_ID/dagRuns",
             json={
                 "logical_date": logical_date,
+                "run_after": logical_date,
             },
             environ_overrides={"REMOTE_USER": "test"},
         )
@@ -1266,6 +1306,7 @@ class TestPostDagRun(TestDagRunEndpoint):
             "dag_run_id": dag_run_id,
             "end_date": None,
             "logical_date": logical_date,
+            "run_after": logical_date,
             "external_trigger": True,
             "start_date": None,
             "state": "queued",
@@ -1512,6 +1553,7 @@ class TestPatchDagRunState(TestDagRunEndpoint):
             "dag_run_id": dag_run_id,
             "end_date": dr.end_date.isoformat() if state != State.QUEUED else 
None,
             "logical_date": dr.logical_date.isoformat(),
+            "run_after": dr.run_after.isoformat(),
             "external_trigger": False,
             "start_date": dr.start_date.isoformat() if state != State.QUEUED 
else None,
             "state": state,
@@ -1686,6 +1728,7 @@ class TestClearDagRun(TestDagRunEndpoint):
             "end_date": None,
             "external_trigger": False,
             "logical_date": dr.logical_date.isoformat(),
+            "run_after": dr.run_after.isoformat(),
             "start_date": None,
             "state": "queued",
             "data_interval_start": dr.data_interval_start.isoformat(),
@@ -1908,6 +1951,7 @@ class TestSetDagRunNote(TestDagRunEndpoint):
             "end_date": dr.end_date.isoformat(),
             "external_trigger": True,
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "start_date": self.default_time,
             "state": "success",
             "data_interval_start": None,
@@ -1936,6 +1980,7 @@ class TestSetDagRunNote(TestDagRunEndpoint):
             "dag_run_id": dr.run_id,
             "end_date": dr.end_date.isoformat(),
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "state": "success",
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py 
b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 784bc6142d4..598d3a99546 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -110,9 +110,13 @@ class TestGetXComEntry(TestXComEndpoint):
         dag_id = "test-dag-id"
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
+        run_after = "2005-04-02T00:00:00+00:00"
         xcom_key = "test-xcom-key"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+        run_after_parsed = timezone.parse(run_after)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, 
xcom_key, {"key": "value"})
         response = self.client.get(
             
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
@@ -136,9 +140,13 @@ class TestGetXComEntry(TestXComEndpoint):
         dag_id = "test-dag-id"
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
+        run_after = "2005-04-02T00:00:00+00:00"
         xcom_key = "test-xcom-key"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+        run_after_parsed = timezone.parse(run_after)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, 
xcom_key, {"key": "value"})
         response = self.client.get(
             
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false",
@@ -162,9 +170,13 @@ class TestGetXComEntry(TestXComEndpoint):
         dag_id = "test-dag-id"
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
+        run_after = "2005-04-02T00:00:00+00:00"
         xcom_key = "test-xcom-key"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+        run_after_parsed = timezone.parse(run_after)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, 
xcom_key)
         response = self.client.get(
             
f"/api/v1/dags/nonexistentdagid/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
@@ -177,9 +189,13 @@ class TestGetXComEntry(TestXComEndpoint):
         dag_id = "test-dag-id"
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
+        run_after = "2005-04-02T00:00:00+00:00"
         xcom_key = "test-xcom-key"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+        run_after_parsed = timezone.parse(run_after)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, 
xcom_key)
         response = self.client.get(
             
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}"
@@ -191,9 +207,13 @@ class TestGetXComEntry(TestXComEndpoint):
         dag_id = "test-dag-id"
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
+        run_after = "2005-04-02T00:00:00+00:00"
         xcom_key = "test-xcom-key"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+        run_after_parsed = timezone.parse(run_after)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
 
         self._create_xcom_entry(dag_id, run_id, logical_date_parsed, task_id, 
xcom_key)
         response = self.client.get(
@@ -303,8 +323,12 @@ class TestGetXComEntries(TestXComEndpoint):
         dag_id = "test-dag-id"
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
+        run_after = "2005-04-02T00:00:00+00:00"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+        run_after_parsed = timezone.parse(run_after)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
 
         self._create_xcom_entries(dag_id, run_id, logical_date_parsed, task_id)
         response = self.client.get(
@@ -345,13 +369,19 @@ class TestGetXComEntries(TestXComEndpoint):
         dag_id_1 = "test-dag-id-1"
         task_id_1 = "test-task-id-1"
         logical_date = "2005-04-02T00:00:00+00:00"
+        run_after = "2005-04-02T00:00:00+00:00"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, 
logical_date_parsed)
+        run_after_parsed = timezone.parse(run_after)
+        run_id_1 = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entries(dag_id_1, run_id_1, logical_date_parsed, 
task_id_1)
 
         dag_id_2 = "test-dag-id-2"
         task_id_2 = "test-task-id-2"
-        run_id_2 = DagRun.generate_run_id(DagRunType.MANUAL, 
logical_date_parsed)
+        run_id_2 = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entries(dag_id_2, run_id_2, logical_date_parsed, 
task_id_2)
 
         response = self.client.get(
@@ -409,7 +439,11 @@ class TestGetXComEntries(TestXComEndpoint):
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
         logical_date_parsed = timezone.parse(logical_date)
-        dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
logical_date_parsed)
+        run_after = "2005-04-02T00:00:00+00:00"
+        run_after_parsed = timezone.parse(run_after)
+        dag_run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entries(dag_id, dag_run_id, logical_date_parsed, 
task_id, mapped_ti=True)
 
         def assert_expected_result(expected_entries, map_index=None):
@@ -453,7 +487,11 @@ class TestGetXComEntries(TestXComEndpoint):
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
         logical_date_parsed = timezone.parse(logical_date)
-        dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
logical_date_parsed)
+        run_after = "2005-04-02T00:00:00+00:00"
+        run_after_parsed = timezone.parse(run_after)
+        dag_run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entries(dag_id, dag_run_id, logical_date_parsed, 
task_id, mapped_ti=True)
 
         def assert_expected_result(expected_entries, key=None):
@@ -495,7 +533,11 @@ class TestGetXComEntries(TestXComEndpoint):
         task_id = "test-task-id"
         logical_date = "2005-04-02T00:00:00+00:00"
         logical_date_parsed = timezone.parse(logical_date)
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+        run_after = "2005-04-02T00:00:00+00:00"
+        run_after_parsed = timezone.parse(run_after)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+        )
         self._create_xcom_entries(dag_id, run_id, logical_date_parsed, task_id)
 
         response = self.client.get(
@@ -580,7 +622,11 @@ class TestPaginationGetXComEntries(TestXComEndpoint):
         self.task_id = "test-task-id"
         self.logical_date = "2005-04-02T00:00:00+00:00"
         self.logical_date_parsed = timezone.parse(self.logical_date)
-        self.run_id = DagRun.generate_run_id(DagRunType.MANUAL, 
self.logical_date_parsed)
+        run_after = "2005-04-02T00:00:00+00:00"
+        run_after_parsed = timezone.parse(run_after)
+        self.run_id = DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=self.logical_date_parsed, 
run_after=run_after_parsed
+        )
 
     @pytest.mark.parametrize(
         "query_params, expected_xcom_ids",
diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py 
b/tests/api_connexion/schemas/test_dag_run_schema.py
index 61b3dac6350..251946d17fa 100644
--- a/tests/api_connexion/schemas/test_dag_run_schema.py
+++ b/tests/api_connexion/schemas/test_dag_run_schema.py
@@ -63,6 +63,7 @@ class TestDAGRunSchema(TestDAGRunBase):
             state="running",
             run_type=DagRunType.MANUAL.value,
             logical_date=timezone.parse(self.default_time),
+            run_after=timezone.parse(self.default_time),
             start_date=timezone.parse(self.default_time),
             conf='{"start": "stop"}',
             **triggered_by_kwargs,
@@ -85,6 +86,7 @@ class TestDAGRunSchema(TestDAGRunBase):
             "last_scheduling_decision": None,
             "run_type": "manual",
             "note": None,
+            "run_after": self.default_time,
         }
         expected_deserialized_dagrun.update({"triggered_by": "test"} if 
AIRFLOW_V_3_0_PLUS else {})
 
@@ -94,30 +96,30 @@ class TestDAGRunSchema(TestDAGRunBase):
         "serialized_dagrun, expected_result",
         [
             (  # Conf not provided
-                {"dag_run_id": "my-dag-run", "logical_date": DEFAULT_TIME},
-                {"run_id": "my-dag-run", "logical_date": parse(DEFAULT_TIME)},
+                {"dag_run_id": "my-dag-run", "run_after": DEFAULT_TIME},
+                {"run_id": "my-dag-run", "run_after": parse(DEFAULT_TIME)},
             ),
             (
                 {
                     "dag_run_id": "my-dag-run",
-                    "logical_date": DEFAULT_TIME,
+                    "run_after": DEFAULT_TIME,
                     "conf": {"start": "stop"},
                 },
                 {
                     "run_id": "my-dag-run",
-                    "logical_date": parse(DEFAULT_TIME),
+                    "run_after": parse(DEFAULT_TIME),
                     "conf": {"start": "stop"},
                 },
             ),
             (
                 {
                     "dag_run_id": "my-dag-run",
-                    "logical_date": DEFAULT_TIME,
+                    "run_after": DEFAULT_TIME,
                     "conf": '{"start": "stop"}',
                 },
                 {
                     "run_id": "my-dag-run",
-                    "logical_date": parse(DEFAULT_TIME),
+                    "run_after": parse(DEFAULT_TIME),
                     "conf": {"start": "stop"},
                 },
             ),
@@ -131,7 +133,7 @@ class TestDAGRunSchema(TestDAGRunBase):
         """Dag_run_id and logical_date fields are autogenerated if missing"""
         serialized_dagrun = {}
         result = dagrun_schema.load(serialized_dagrun)
-        assert result == {"logical_date": result["logical_date"], "run_id": 
result["run_id"]}
+        assert result == {"run_after": result["run_after"], "run_id": 
result["run_id"]}
 
     def test_invalid_logical_date_raises(self):
         serialized_dagrun = {"logical_date": "mydate"}
@@ -151,6 +153,7 @@ class TestDagRunCollection(TestDAGRunBase):
             run_id="my-dag-run",
             state="running",
             logical_date=timezone.parse(self.default_time),
+            run_after=timezone.parse(self.default_time),
             run_type=DagRunType.MANUAL.value,
             start_date=timezone.parse(self.default_time),
             conf='{"start": "stop"}',
@@ -176,6 +179,7 @@ class TestDagRunCollection(TestDAGRunBase):
             "dag_run_id": "my-dag-run",
             "end_date": None,
             "logical_date": self.default_time,
+            "run_after": self.default_time,
             "external_trigger": True,
             "state": "running",
             "start_date": self.default_time,
@@ -195,6 +199,7 @@ class TestDagRunCollection(TestDAGRunBase):
             "end_date": None,
             "state": "running",
             "logical_date": self.second_time,
+            "run_after": self.second_time,
             "external_trigger": True,
             "start_date": self.default_time,
             "conf": {},
diff --git a/tests/api_fastapi/core_api/routes/public/test_xcom.py 
b/tests/api_fastapi/core_api/routes/public/test_xcom.py
index e3d9b3641a9..dd5a073c1ba 100644
--- a/tests/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/tests/api_fastapi/core_api/routes/public/test_xcom.py
@@ -49,7 +49,10 @@ TEST_TASK_ID_2 = "test-task-id-2"
 
 logical_date_parsed = timezone.parse(TEST_EXECUTION_DATE)
 logical_date_formatted = logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ")
-run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
+run_after_parsed = timezone.parse(TEST_EXECUTION_DATE)
+run_id = DagRun.generate_run_id(
+    run_type=DagRunType.MANUAL, logical_date=logical_date_parsed, 
run_after=run_after_parsed
+)
 
 
 @provide_session
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8d82a249aec..76b1ceb0e9a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -138,16 +138,17 @@ def create_dagrun(session):
         state: DagRunState = DagRunState.RUNNING,
         start_date: datetime | None = None,
     ) -> DagRun:
-        run_id = dag.timetable.generate_run_id(
+        run_after = logical_date or timezone.utcnow()
+        run_id = DagRun.generate_run_id(
             run_type=run_type,
             logical_date=logical_date,
-            data_interval=data_interval,
+            run_after=run_after,
         )
         return dag.create_dagrun(
             run_id=run_id,
             logical_date=logical_date,
             data_interval=data_interval,
-            run_after=data_interval.end,
+            run_after=run_after,
             run_type=run_type,
             state=state,
             start_date=start_date,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index af815279737..aee56bee883 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -156,7 +156,7 @@ def _create_dagrun(
         data_interval = DataInterval(*map(timezone.coerce_datetime, 
data_interval))
     run_id = dag.timetable.generate_run_id(
         run_type=run_type,
-        logical_date=logical_date,  # type: ignore
+        run_after=logical_date or data_interval.end,  # type: ignore
         data_interval=data_interval,
     )
     return dag.create_dagrun(
@@ -2974,7 +2974,7 @@ def 
test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker
 )
 def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: 
DagRunType) -> None:
     dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily")
-    run_id = run_id_type.generate_run_id(DEFAULT_DATE)
+    run_id = DagRun.generate_run_id(run_type=run_id_type, 
run_after=DEFAULT_DATE, logical_date=DEFAULT_DATE)
 
     with pytest.raises(ValueError) as ctx:
         dag.create_dagrun(
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index f4324abf485..b7328ba3713 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -111,7 +111,7 @@ class TestDagRun:
         dag_run = dag.create_dagrun(
             run_id=dag.timetable.generate_run_id(
                 run_type=run_type,
-                logical_date=logical_date,
+                run_after=logical_date,
                 data_interval=data_interval,
             ),
             run_type=run_type,
@@ -863,7 +863,7 @@ class TestDagRun:
         dr = dag.create_dagrun(
             run_id=dag.timetable.generate_run_id(
                 run_type=DagRunType.SCHEDULED,
-                logical_date=DEFAULT_DATE,
+                run_after=DEFAULT_DATE,
                 data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
             ),
             run_type=DagRunType.SCHEDULED,
@@ -941,7 +941,7 @@ class TestDagRun:
             dag_run = dag.create_dagrun(
                 run_id=dag.timetable.generate_run_id(
                     run_type=DagRunType.SCHEDULED,
-                    logical_date=dag.start_date,
+                    run_after=dag.start_date,
                     
data_interval=dag.infer_automated_data_interval(dag.start_date),
                 ),
                 run_type=DagRunType.SCHEDULED,
@@ -1051,7 +1051,7 @@ def 
test_verify_integrity_task_start_and_end_date(Stats_incr, session, run_type,
         dag_id=dag.dag_id,
         run_type=run_type,
         logical_date=DEFAULT_DATE,
-        run_id=DagRun.generate_run_id(run_type, DEFAULT_DATE),
+        run_id=DagRun.generate_run_id(run_type=run_type, 
logical_date=DEFAULT_DATE, run_after=DEFAULT_DATE),
     )
     dag_run.dag = dag
 
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 3d985394646..2d9d1a6edcb 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3611,7 +3611,12 @@ class TestTaskInstance:
         assert os.environ["AIRFLOW_CTX_DAG_ID"] == "test_echo_env_variables"
         assert os.environ["AIRFLOW_CTX_TASK_ID"] == "hive_in_python_op"
         assert DEFAULT_DATE.isoformat() == 
os.environ["AIRFLOW_CTX_LOGICAL_DATE"]
-        assert DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE) == 
os.environ["AIRFLOW_CTX_DAG_RUN_ID"]
+        assert (
+            DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, 
run_after=DEFAULT_DATE
+            )
+            == os.environ["AIRFLOW_CTX_DAG_RUN_ID"]
+        )
 
     def test_echo_env_variables(self, dag_maker):
         with dag_maker(
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index ea0d95290f0..45d7b264c44 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -58,7 +58,9 @@ def reset_db():
 @pytest.fixture
 def task_instance_factory(request, session: Session):
     def func(*, dag_id, task_id, logical_date):
-        run_id = DagRun.generate_run_id(DagRunType.SCHEDULED, logical_date)
+        run_id = DagRun.generate_run_id(
+            run_type=DagRunType.SCHEDULED, logical_date=logical_date, 
run_after=logical_date
+        )
         run = DagRun(
             dag_id=dag_id,
             run_type=DagRunType.SCHEDULED,
diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py
index db86eb0fa7d..1c72dae332a 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -125,7 +125,9 @@ class TestDagRunOperator:
 
         dagrun = dag_maker.session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).one()
         assert dagrun.external_trigger
-        assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
dagrun.logical_date)
+        assert dagrun.run_id == DagRun.generate_run_id(
+            run_type=DagRunType.MANUAL, logical_date=dagrun.logical_date, 
run_after=dagrun.logical_date
+        )
         self.assert_extra_link(dagrun, task, dag_maker.session)
 
     def test_trigger_dagrun_custom_run_id(self, dag_maker):
@@ -167,7 +169,9 @@ class TestDagRunOperator:
             dagrun = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).one()
             assert dagrun.external_trigger
             assert dagrun.logical_date == custom_logical_date
-            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
custom_logical_date)
+            assert dagrun.run_id == DagRun.generate_run_id(
+                run_type=DagRunType.MANUAL, logical_date=custom_logical_date, 
run_after=custom_logical_date
+            )
             self.assert_extra_link(dagrun, task, session)
 
     def test_trigger_dagrun_twice(self, dag_maker):
diff --git a/tests/sensors/test_external_task_sensor.py 
b/tests/sensors/test_external_task_sensor.py
index d6bf001c5bd..239eca8cdc1 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -115,7 +115,7 @@ class TestExternalTaskSensor:
         self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
         self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
         self.dag = DAG(TEST_DAG_ID, schedule=None, default_args=self.args)
-        self.dag_run_id = DagRunType.MANUAL.generate_run_id(DEFAULT_DATE)
+        self.dag_run_id = 
DagRunType.MANUAL.generate_run_id(suffix=DEFAULT_DATE.isoformat())
 
     def add_time_sensor(self, task_id=TEST_TASK_ID):
         op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag)
@@ -1239,7 +1239,7 @@ def run_tasks(
         runs[dag.dag_id] = dagrun = dag.create_dagrun(
             run_id=dag.timetable.generate_run_id(
                 run_type=DagRunType.MANUAL,
-                logical_date=logical_date,
+                run_after=logical_date,
                 data_interval=data_interval,
             ),
             logical_date=logical_date,
diff --git a/tests/timetables/test_assets_timetable.py 
b/tests/timetables/test_assets_timetable.py
index 541ef2abb6e..0158ddfd5cf 100644
--- a/tests/timetables/test_assets_timetable.py
+++ b/tests/timetables/test_assets_timetable.py
@@ -208,7 +208,11 @@ def test_generate_run_id(asset_timetable: 
AssetOrTimeSchedule) -> None:
     :param asset_timetable: The AssetOrTimeSchedule instance to test.
     """
     run_id = asset_timetable.generate_run_id(
-        run_type=DagRunType.MANUAL, extra_args="test", 
logical_date=DateTime.now(), data_interval=None
+        run_type=DagRunType.MANUAL,
+        extra_args="test",
+        logical_date=DateTime.now(),
+        run_after=DateTime.now(),
+        data_interval=None,
     )
     assert isinstance(run_id, str)
 
diff --git a/tests/utils/test_state.py b/tests/utils/test_state.py
index de393f38c35..035740db46f 100644
--- a/tests/utils/test_state.py
+++ b/tests/utils/test_state.py
@@ -41,7 +41,7 @@ def test_dagrun_state_enum_escape():
         dag.create_dagrun(
             run_id=dag.timetable.generate_run_id(
                 run_type=DagRunType.SCHEDULED,
-                logical_date=DEFAULT_DATE,
+                run_after=DEFAULT_DATE,
                 
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
             ),
             run_type=DagRunType.SCHEDULED,
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 150b48f7459..76ddb082283 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -929,11 +929,18 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
                 if "run_type" not in kwargs:
                     kwargs["run_id"] = "test"
                 else:
-                    kwargs["run_id"] = dag.timetable.generate_run_id(
-                        run_type=run_type,
-                        logical_date=logical_date,
-                        data_interval=data_interval,
-                    )
+                    if AIRFLOW_V_3_0_PLUS:
+                        kwargs["run_id"] = dag.timetable.generate_run_id(
+                            run_type=run_type,
+                            run_after=logical_date or 
timezone.coerce_datetime(timezone.utcnow()),
+                            data_interval=data_interval,
+                        )
+                    else:
+                        kwargs["run_id"] = dag.timetable.generate_run_id(
+                            run_type=run_type,
+                            logical_date=logical_date or 
timezone.coerce_datetime(timezone.utcnow()),
+                            data_interval=data_interval,
+                        )
             kwargs["run_type"] = run_type
 
             if AIRFLOW_V_3_0_PLUS:

Reply via email to