hussein-awala commented on code in PR #30292:
URL: https://github.com/apache/airflow/pull/30292#discussion_r1148675368


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -40,9 +42,27 @@
 
 
 if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
     from airflow.models.taskinstance import TaskInstanceKey
 
 
+def _parse_execution_date(date: datetime.datetime | str | None) -> datetime.datetime:
+    if isinstance(date, datetime.datetime):
+        return date
+    elif isinstance(date, str):
+        return timezone.parse(date)
+    else:
+        return timezone.utcnow()
+
+
+def _get_run_id(run_id: str | None, parsed_execution_date: datetime.datetime) -> str:

Review Comment:
   Nit: I think `_get_or_generate_run_id` would be a more meaningful name, WDYT?



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -185,3 +215,39 @@ def execute(self, context: Context):
                 if state in self.allowed_states:
                     self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state)
                     return
+
+    @provide_session
+    def execute_complete(self, context: Context, session: "Session", **kwargs):

Review Comment:
   ```suggestion
       def execute_complete(self, context: Context, session: Session = NEW_SESSION, **kwargs):
   ```
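
To illustrate why the default matters, here is a minimal, stdlib-only sketch of the pattern. It is not Airflow's actual implementation: the `Session` class is a hypothetical stand-in for the SQLAlchemy session, and this `provide_session` is a simplified imitation of the real decorator. The point is that a `NEW_SESSION` sentinel typed as `Session` lets callers omit the argument while the annotation stays a plain `Session`.

```python
from __future__ import annotations

import functools
import inspect
from typing import Any, Callable, cast


class Session:
    """Hypothetical stand-in for a database session (not SQLAlchemy's class)."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


# Sentinel default: typed as Session for type checkers, but None at runtime.
NEW_SESSION: Session = cast(Session, None)


def provide_session(func: Callable[..., Any]) -> Callable[..., Any]:
    """Simplified imitation: inject a fresh session when the caller gave none."""
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        bound = sig.bind_partial(*args, **kwargs)
        if bound.arguments.get("session") is not None:
            # The caller supplied a session and remains responsible for it.
            return func(*args, **kwargs)
        session = Session()
        bound.arguments["session"] = session
        try:
            return func(*bound.args, **bound.kwargs)
        finally:
            session.close()

    return wrapper


@provide_session
def execute_complete(context: dict, session: Session = NEW_SESSION, **kwargs: Any) -> Session:
    # Return the session so the caller can see which one was used.
    return session


injected = execute_complete({})
own = Session()
passed_through = execute_complete({}, session=own)
```

With a bare `session: "Session"` parameter and no default, a type checker would flag every call that relies on the decorator to supply the session.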



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -185,3 +215,39 @@ def execute(self, context: Context):
                 if state in self.allowed_states:
                     self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state)
                     return
+
+    @provide_session
+    def execute_complete(self, context: Context, session: "Session", **kwargs):
+        parsed_execution_date = _parse_execution_date(self.execution_date)
+
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == parsed_execution_date)
+            .all()
+        )
+
+        if len(dag_run) > 1:
+            raise AirflowException(
+                f"Detected duplicate DAG run ({self.trigger_dag_id}) and execution date "
+                f"({self.execution_date}), found {len(dag_run)} records"
+            )
+
+        dag_run = dag_run[0]
+
+        if not dag_run:
+            raise AirflowException(
+                f"No DAG run found for DAG {self.trigger_dag_id} and execution date {self.execution_date}"
+            )

Review Comment:
   I don't think we need to check for multiple runs, because there is a [unique constraint](https://github.com/apache/airflow/blob/8f77cf8189bcf99bf8773e2f66585132fb2fa55e/airflow/models/dagrun.py#L143-L143) on (dag_id, execution_date).
   
   And when there is no DAG run, the code fails with an `IndexError` before it reaches the `if not dag_run` check, because `dag_run[0]` indexes an empty list.
   
   ```suggestion
           try:
               dag_run = (
                   session.query(DagRun)
                   .filter(DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == parsed_execution_date)
                   .one()
               )
           except NoResultFound:
               raise AirflowException(
                   f"No DAG run found for DAG {self.trigger_dag_id} and execution date {self.execution_date}"
               )
   ```
   And if you want to keep the duplication check, you can add:
   ```python
           except MultipleResultsFound:
               # `dag_run` is not bound when `.one()` raises, so no record count is available here.
               raise AirflowException(
                   f"Detected duplicate DAG run for DAG {self.trigger_dag_id} and "
                   f"execution date {self.execution_date}"
               )
   ```
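
For reference, both points can be reproduced with a small stdlib-only sketch. The `dag_run` table below is a hypothetical, heavily simplified schema (not Airflow's real model), and SQLite stands in for the metadata database. It shows that a unique constraint on (dag_id, execution_date) rejects a second identical row, and that indexing an empty result list raises `IndexError` before any `if not dag_run` check can run.

```python
import sqlite3

# Hypothetical, simplified table; only the unique constraint matters here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    "dag_id TEXT, execution_date TEXT, UNIQUE (dag_id, execution_date))"
)
row = ("example_dag", "2023-03-26T00:00:00+00:00")
conn.execute("INSERT INTO dag_run VALUES (?, ?)", row)

# 1) A second row with the same (dag_id, execution_date) is rejected by the database.
try:
    conn.execute("INSERT INTO dag_run VALUES (?, ?)", row)
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

# 2) A query with no match returns an empty list; indexing it raises IndexError.
rows = conn.execute(
    "SELECT dag_id, execution_date FROM dag_run WHERE dag_id = ?", ("missing_dag",)
).fetchall()
try:
    first = rows[0]
    index_error_raised = False
except IndexError:
    index_error_raised = True
```

This is only an illustration of the reasoning; in the operator itself, `.one()` with `NoResultFound` covers the empty case without manual indexing.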



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
