uranusjr commented on code in PR #39426:
URL: https://github.com/apache/airflow/pull/39426#discussion_r1593758556


##########
airflow/utils/db.py:
##########
@@ -1888,12 +1914,144 @@ def get_query_count(query_stmt: Select, *, session: Session) -> int:
     return session.scalar(count_stmt)
 
 
+def get_query_exists(query_stmt: Select, *, session: Session) -> bool:
+    """Check whether there is at least one row matching a query.
+
+    A SELECT 1 FROM is issued against the subquery built from the given
+    statement. The ORDER BY clause is stripped from the statement since it's
+    unnecessary, and can impact query planning and degrade performance.
+
+    :meta private:
+    """
+    count_stmt = select(literal(True)).select_from(query_stmt.order_by(None).subquery())
+    return session.scalar(count_stmt)
+
+
 def exists_query(*where: ClauseElement, session: Session) -> bool:
-    """Check whether there is at least one row matching given clause.
+    """Check whether there is at least one row matching given clauses.
 
     This does a SELECT 1 WHERE ... LIMIT 1 and check the result.
 
     :meta private:
     """
     stmt = select(literal(True)).where(*where).limit(1)
     return session.scalar(stmt) is not None
+
+
+@attrs.define(slots=True)
+class LazySelectSequence(Sequence[T]):
+    """List-like interface to lazily access a database model query.
+
+    The intended use case is inside a task execution context, where we manage an
+    active SQLAlchemy session in the background.
+
+    This is an abstract base class. Each use case should subclass, and implement
+    the following static methods:
+
+    * ``_rebuild_select`` is called when a lazy sequence is unpickled. Since it
+      is not easy to pickle SQLAlchemy constructs, this class serializes the
+      SELECT statements into plain text to storage. This method is called on
+      deserialization to convert the textual clause back into an ORM SELECT.
+    * ``_process_row`` is called when an item is accessed. The lazy sequence
+      uses ``session.execute()`` to fetch rows from the database, and this
+      method should know how to process each row into a value.
+
+    :meta private:
+    """
+
+    _select_asc: ClauseElement
+    _select_desc: ClauseElement
+    _session: Session = attrs.field(kw_only=True, factory=get_current_task_instance_session)
+    _len: int | None = attrs.field(init=False, default=None)
+
+    @classmethod
+    def from_select(
+        cls,
+        select: Select,
+        *,
+        order_by: Sequence[ClauseElement],
+        session: Session | None = None,
+    ) -> Self:
+        s1 = select
+        for col in order_by:
+            s1 = s1.order_by(col.asc())
+        s2 = select
+        for col in order_by:
+            s2 = s2.order_by(col.desc())
+        return cls(s1, s2, session=session or get_current_task_instance_session())
+
+    @staticmethod
+    def _rebuild_select(stmt: TextClause) -> Select:
+        """Rebuild a textual statement into an ORM-configured SELECT statement.
+
+        This should do something like ``select(field).from_statement(stmt)`` to
+        reconfigure ORM information to the textual SQL statement.
+        """
+        raise NotImplementedError
+
+    @staticmethod
+    def _process_row(row: Row) -> T:
+        """Process a SELECT-ed row into the end value."""
+        raise NotImplementedError
+
+    def __repr__(self) -> str:
+        counter = "item" if (length := len(self)) == 1 else "items"
+        return f"LazySelectSequence([{length} {counter}])"
+
+    def __str__(self) -> str:
+        counter = "item" if (length := len(self)) == 1 else "items"
+        return f"[... {length} {counter} ...]"

Review Comment:
   Printing the _entire_ sequence in a task is rarely useful, and when you do
it (likely out of desperation or something), resolving the whole list would be
extremely costly in most cases. Usually you’d only want the last few items.
   
   It is not unreasonable for a DatasetEvent sequence to contain hundreds of
rows; printing them all out is a disaster in more than one way, and I feel this
abbreviated form might actually be better for debugging in that case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to