Lee-W commented on code in PR #46398:
URL: https://github.com/apache/airflow/pull/46398#discussion_r1948435193


##########
airflow/api_connexion/schemas/dag_run_schema.py:
##########
@@ -88,7 +88,8 @@ def autogenerate(self, data, **kwargs):
         if "dag_run_id" not in data:
             try:
                 data["dag_run_id"] = DagRun.generate_run_id(
-                    DagRunType.MANUAL, timezone.parse(data["logical_date"])
+                    DagRunType.MANUAL,
+                    timezone.parse(data["logical_date"]),

Review Comment:
   ```suggestion
                       run_type=DagRunType.MANUAL,
                       logical_date=timezone.parse(data["logical_date"]),
   ```



##########
airflow/utils/types.py:
##########
@@ -42,7 +43,11 @@ class DagRunType(str, enum.Enum):
     def __str__(self) -> str:
         return self.value
 
-    def generate_run_id(self, logical_date: datetime) -> str:
+    def generate_run_id(self, logical_date: datetime | None, run_after: 
datetime | None) -> str:

Review Comment:
   ```suggestion
       def generate_run_id(self, *, logical_date: datetime | None, run_after: 
datetime | None) -> str:
   ```
   
   same here



##########
airflow/models/baseoperator.py:
##########
@@ -628,7 +628,11 @@ def run(
                 # This is _mostly_ only used in tests
                 dr = DagRun(
                     dag_id=self.dag_id,
-                    run_id=DagRun.generate_run_id(DagRunType.MANUAL, 
info.logical_date),
+                    run_id=DagRun.generate_run_id(
+                        DagRunType.MANUAL,
+                        info.logical_date,

Review Comment:
   ```suggestion
                           run_type=DagRunType.MANUAL,
                           logical_date=info.logical_date,
   ```



##########
airflow/utils/types.py:
##########
@@ -42,7 +43,11 @@ class DagRunType(str, enum.Enum):
     def __str__(self) -> str:
         return self.value
 
-    def generate_run_id(self, logical_date: datetime) -> str:
+    def generate_run_id(self, logical_date: datetime | None, run_after: 
datetime | None) -> str:
+        if logical_date is None:
+            if run_after is None:
+                raise ValueError("run_after cannot be None")
+            return run_after + get_random_string()

Review Comment:
   ```suggestion
               return f"{run_after}{get_random_string()}"
   ```



##########
airflow/api/common/trigger_dag.py:
##########
@@ -84,7 +84,10 @@ def _trigger_dag(
 
     data_interval = 
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
     run_id = run_id or dag.timetable.generate_run_id(
-        run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, 
data_interval=data_interval
+        run_type=DagRunType.MANUAL,
+        logical_date=coerced_logical_date,
+        data_interval=data_interval,
+        run_after=data_interval.end,

Review Comment:
   `timezone.utcnow()` seems to be a reasonable default for `run_after`



##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -371,6 +371,7 @@ def trigger_dag_run(
                 run_type=DagRunType.MANUAL,
                 logical_date=logical_date,
                 data_interval=data_interval,
+                run_after=data_interval.end,

Review Comment:
   Probably `timezone.utcnow()` for this as well?



##########
airflow/api/common/trigger_dag.py:
##########
@@ -84,7 +84,10 @@ def _trigger_dag(
 
     data_interval = 
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
     run_id = run_id or dag.timetable.generate_run_id(
-        run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, 
data_interval=data_interval
+        run_type=DagRunType.MANUAL,
+        logical_date=coerced_logical_date,
+        data_interval=data_interval,
+        run_after=data_interval.end,

Review Comment:
   I think we'll need to do it in this PR 🤔



##########
airflow/models/dagrun.py:
##########
@@ -621,10 +621,12 @@ def find_duplicate(cls, dag_id: str, run_id: str, *, 
session: Session = NEW_SESS
         return session.scalars(select(cls).where(cls.dag_id == dag_id, 
cls.run_id == run_id)).one_or_none()
 
     @staticmethod
-    def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str:
+    def generate_run_id(
+        run_type: DagRunType, logical_date: datetime | None, run_after: 
datetime | None = None

Review Comment:
   ```suggestion
           *, run_type: DagRunType, logical_date: datetime | None, run_after: 
datetime | None = None
   ```
   
   As the method becomes more complicated (it is hard to reason about the order of 
`logical_date` and `run_after`), we should probably make it keyword-only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to