uranusjr commented on a change in pull request #18968:
URL: https://github.com/apache/airflow/pull/18968#discussion_r733321814



##########
File path: airflow/models/dagrun.py
##########
@@ -334,6 +334,34 @@ def find(
 
         return qry.order_by(DR.execution_date).all()
 
+    @staticmethod
+    @provide_session
+    def find_duplicate(
+        dag_id: str,
+        run_id: str,
+        execution_date: datetime,
+        session: Session = None,
+    ) -> Optional['DagRun']:
+        """
+        Returns a dag run if there is an existing run for a dag at a specific 
run_id and execution_date.

Review comment:
       ```suggestion
           Return an existing run for the DAG with a specific run_id or 
execution_date.
           
           *None* is returned if no such DAG run is found.
   ```
   
   I wonder if this function should be on `DAG` instead, since this needs a DAG 
ID.
   
   If we are going to put this on `DagRun`, this should be a classmethod (so 
you can do `session.query(cls)` below).

##########
File path: airflow/models/dagrun.py
##########
@@ -334,6 +334,34 @@ def find(
 
         return qry.order_by(DR.execution_date).all()
 
+    @staticmethod
+    @provide_session
+    def find_duplicate(
+        dag_id: str,
+        run_id: str,
+        execution_date: datetime,
+        session: Session = None,
+    ) -> Optional['DagRun']:
+        """
+        Returns a dag run if there is an existing run for a dag at a specific 
run_id and execution_date.
+
+        :param dag_id: the dag_id to find duplicates for
+        :type dag_id: str
+        :param run_id: defines the run id for this dag run
+        :type run_id: str
+        :param execution_date: the execution date
+        :type execution_date: datetime.datetime
+        :param session: database session
+        :type session: sqlalchemy.orm.session.Session
+        """
+        DR = DagRun
+
+        qry = session.query(DR)
+
+        qry = qry.filter(DR.dag_id == dag_id, or_(DR.run_id == run_id, 
DR.execution_date == execution_date))
+
+        return qry.first()

Review comment:
       ```suggestion
           return (
               session.query(cls)
                        .filter(
                            cls.dag_id == dag_id,
                            or_(cls.run_id == run_id, cls.execution_date == 
execution_date),
                        )
                        .one_or_none()
                )
   ```
   
   This should always return at most one row since we have a unique constraint 
on the database, but if it somehow returns multiple ones, we want to be loud 
about it to raise awareness to the inconsistency.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to