dylanbstorey commented on code in PR #30292:
URL: https://github.com/apache/airflow/pull/30292#discussion_r1149238169


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -40,9 +42,27 @@
 
 
 if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
     from airflow.models.taskinstance import TaskInstanceKey
 
 
+def _parse_execution_date(date: datetime.datetime | str | None) -> datetime.datetime:
+    if isinstance(date, datetime.datetime):
+        return date
+    elif isinstance(date, str):
+        return timezone.parse(date)
+    else:
+        return timezone.utcnow()

Review Comment:
   Fair, and that makes sense.
   
   While thinking this through, I noticed that when `None` is passed, `parsed_execution_date` would change over the task execution cycle, because `utcnow()` would be called twice: once in `execute` and again in `execute_complete`.
   
   I'm now handling the whole thing in the `execute` method (as it originally was) and passing `parsed_execution_date` through the context to `execute_complete` instead.
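
   A minimal sketch of the idea, using only the standard library so it runs without Airflow. `SketchOperator`, its method signatures, and the dictionary hand-off are hypothetical stand-ins for illustration, not the operator's actual API; `datetime.fromisoformat` stands in for `timezone.parse`.

```python
import datetime


def parse_execution_date(date):
    """Stand-in for the helper in the diff, using only the standard library."""
    if isinstance(date, datetime.datetime):
        return date
    if isinstance(date, str):
        return datetime.datetime.fromisoformat(date)
    # With None, every call returns a fresh "now", so two calls can disagree.
    return datetime.datetime.now(datetime.timezone.utc)


class SketchOperator:
    """Hypothetical two-phase operator; not the Airflow class."""

    def __init__(self, execution_date=None):
        self.execution_date = execution_date

    def execute(self):
        # Resolve the date exactly once, in the first phase...
        parsed = parse_execution_date(self.execution_date)
        # ...and hand the resolved value to the second phase.
        return {"parsed_execution_date": parsed}

    def execute_complete(self, parsed_execution_date):
        # Use the value that was handed over instead of re-parsing.
        return parsed_execution_date


op = SketchOperator(execution_date=None)
handoff = op.execute()
resumed = op.execute_complete(**handoff)
```

   Because the second phase never calls the parser again, both phases see the same timestamp even when no execution date was supplied.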



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -185,3 +215,39 @@ def execute(self, context: Context):
                 if state in self.allowed_states:
                     self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state)
                     return
+
+    @provide_session
+    def execute_complete(self, context: Context, session: "Session", **kwargs):
+        parsed_execution_date = _parse_execution_date(self.execution_date)
+
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == parsed_execution_date)
+            .all()
+        )
+
+        if len(dag_run) > 1:
+            raise AirflowException(
+                f"Detected duplicate DAG run ({self.trigger_dag_id}) and execution date "
+                f"({self.execution_date}), found {len(dag_run)} records"
+            )
+
+        dag_run = dag_run[0]
+
+        if not dag_run:
+            raise AirflowException(
+                f"No DAG run found for DAG {self.trigger_dag_id} and execution date {self.execution_date}"
+            )

Review Comment:
   Makes sense. 
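
   One detail in the hunk above is worth illustrating: `dag_run[0]` runs before the `if not dag_run` check, so an empty result list would raise `IndexError` before the "No DAG run found" error is reached. A minimal sketch of the reordered checks follows, assuming the query result is a plain list. `pick_single_run` and `DagRunLookupError` are hypothetical names used so the example runs without Airflow; they are not part of the PR.

```python
class DagRunLookupError(Exception):
    """Hypothetical stand-in for AirflowException."""


def pick_single_run(rows, dag_id, execution_date):
    """Return the only row, raising a descriptive error for zero or many rows."""
    # Check for an empty result first: rows[0] on an empty list raises IndexError.
    if not rows:
        raise DagRunLookupError(
            f"No DAG run found for DAG {dag_id} and execution date {execution_date}"
        )
    if len(rows) > 1:
        raise DagRunLookupError(
            f"Detected duplicate DAG run ({dag_id}) and execution date "
            f"({execution_date}), found {len(rows)} records"
        )
    return rows[0]
```

   With this ordering, all three cases (no rows, one row, several rows) end in either the row itself or one of the two descriptive errors.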



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
