Lee-W commented on code in PR #46512:
URL: https://github.com/apache/airflow/pull/46512#discussion_r1948853445


##########
airflow/models/dag.py:
##########
@@ -1613,6 +1614,7 @@ def test(
         """
         Execute one single DagRun for a given DAG and logical date.
 
+        :param run_after: DagRun to execute after this DagRun

Review Comment:
   I feel this one is inaccurate?



##########
airflow/api/common/trigger_dag.py:
##########
@@ -64,27 +64,31 @@ def _trigger_dag(
 
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
-
-    logical_date = logical_date or timezone.utcnow()
-
-    if not timezone.is_localized(logical_date):
-        raise ValueError("The logical date should be localized")
-
-    if replace_microseconds:
-        logical_date = logical_date.replace(microsecond=0)
-
-    if dag.default_args and "start_date" in dag.default_args:
-        min_dag_start_date = dag.default_args["start_date"]
-        if min_dag_start_date and logical_date < min_dag_start_date:
-            raise ValueError(
-                f"Logical date [{logical_date.isoformat()}] should be >= start_date "
-                f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
-            )
-    coerced_logical_date = timezone.coerce_datetime(logical_date)
-
-    data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
+    if logical_date:
+        if not timezone.is_localized(logical_date):
+            raise ValueError("The logical date should be localized")
+
+        if replace_microseconds:
+            logical_date = logical_date.replace(microsecond=0)
+
+        if dag.default_args and "start_date" in dag.default_args:
+            min_dag_start_date = dag.default_args["start_date"]
+            if min_dag_start_date and logical_date < min_dag_start_date:
+                raise ValueError(
+                    f"Logical date [{logical_date.isoformat()}] should be >= start_date "
+                    f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
+                )
+        run_after = timezone.coerce_datetime(logical_date)
+        data_interval = dag.timetable.infer_manual_data_interval(run_after=run_after)
+    else:
+        data_interval = None
+        run_after = timezone.coerce_datetime(timezone.utcnow())

Review Comment:
   ```suggestion
           data_interval = dag.timetable.infer_manual_data_interval(run_after=run_after)
           run_after = timezone.coerce_datetime(logical_date)
       else:
           data_interval = None
           run_after = timezone.coerce_datetime(timezone.utcnow())
   ```
   
   nit: reorder it to keep the same reading order
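
For reference, a minimal standalone sketch of the branch under discussion. It uses only the standard library: `resolve_run_after` is a hypothetical helper, not part of Airflow, and the `tzinfo` check stands in for `timezone.is_localized`. Whatever order the assignments end up in, `run_after` has to be bound before anything consumes it, which is what the sketch makes explicit.

```python
from __future__ import annotations

from datetime import datetime, timezone


def resolve_run_after(
    logical_date: datetime | None,
    replace_microseconds: bool = True,
) -> tuple[datetime, bool]:
    """Return (run_after, should_infer_interval).

    Hypothetical stand-in for the branch in _trigger_dag: with a logical
    date, run_after is derived from it and a data interval should be
    inferred; without one, run_after falls back to "now" and no data
    interval is set.
    """
    if logical_date is not None:
        # Stand-in for timezone.is_localized(): reject naive datetimes.
        if logical_date.tzinfo is None or logical_date.utcoffset() is None:
            raise ValueError("The logical date should be localized")
        if replace_microseconds:
            logical_date = logical_date.replace(microsecond=0)
        return logical_date, True
    # No logical date: run as soon as possible, with no data interval.
    return datetime.now(timezone.utc), False
```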



##########
airflow/api_connexion/schemas/dag_run_schema.py:
##########
@@ -78,7 +78,7 @@ class Meta:
 
     @pre_load
     def autogenerate(self, data, **kwargs):
-        """Auto generate run_id and logical_date if they are not provided."""
+        """Auto generate run_id, logical_date if they are not provided."""

Review Comment:
   Why do we change it here? Is this just a rewording, or should the content be changed as well?



##########
airflow/models/dag.py:
##########
@@ -1782,6 +1789,10 @@ def create_dagrun(
         :meta private:
         """
         logical_date = timezone.coerce_datetime(logical_date)
+        # For manual runs where logical_date is None, ensure no data_interval is set.
+        if logical_date is None:
+            if data_interval is not None:
+                raise ValueError("data_interval must be None when logical_date is None")

Review Comment:
   ```suggestion
           if logical_date is None and data_interval is not None:
               raise ValueError("data_interval must be None when logical_date is None")
   ```
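
A minimal sketch of the flattened check in isolation, to show it behaves the same as the nested form. `check_interval_args` is a hypothetical helper name, and the tuple used for `data_interval` is only a placeholder for whatever interval type the caller passes.

```python
from __future__ import annotations

from datetime import datetime, timezone


def check_interval_args(logical_date: datetime | None, data_interval: object | None) -> None:
    """Reject a data interval when there is no logical date.

    Single combined condition, equivalent to the nested
    `if logical_date is None: if data_interval is not None:` form.
    """
    if logical_date is None and data_interval is not None:
        raise ValueError("data_interval must be None when logical_date is None")


now = datetime.now(timezone.utc)
check_interval_args(None, None)        # accepted: neither is set
check_interval_args(now, (now, now))   # accepted: both are set
check_interval_args(now, None)         # accepted: this check does not require an interval
```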



##########
airflow/models/dag.py:
##########
@@ -1602,6 +1602,7 @@ def cli(self):
     @provide_session
     def test(
         self,
+        run_after: datetime | None = None,

Review Comment:
   I thought run_after could not be None? 
https://github.com/apache/airflow/pull/46398#discussion_r1944139126



##########
airflow/models/dagrun.py:
##########
@@ -253,12 +253,15 @@ def __init__(
         dag_version: DagVersion | None = None,
         bundle_version: str | None = None,
     ):
+        # For manual runs where logical_date is None, ensure no data_interval is set.
+        if logical_date is None:
+            if data_interval is not None:
+                raise ValueError("data_interval must be None if logical_date is None")

Review Comment:
   ```suggestion
           if logical_date is None and data_interval is not None:
               raise ValueError("data_interval must be None if logical_date is None")
   ```



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1396,7 +1396,7 @@ def _create_dag_runs_asset_triggered(
                 dag_run = dag.create_dagrun(
                     run_id=dag.timetable.generate_run_id(
                         run_type=DagRunType.ASSET_TRIGGERED,
-                        logical_date=logical_date,
+                        logical_date=logical_date or dag_model.next_dagrun_create_after,

Review Comment:
   Is this a temporary solution because it's blocked by
https://github.com/apache/airflow/pull/46398? If so, we need to at least add a
TODO here.


