potiuk commented on code in PR #57975:
URL: https://github.com/apache/airflow/pull/57975#discussion_r2632042707


##########
task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py:
##########
@@ -35,25 +39,40 @@
 # ``XCom.deserialize_value``. We don't want to wrap the API values in a nested
 # {"value": value} dict since it wastes bandwidth.
 _XComWrapper = collections.namedtuple("_XComWrapper", "value")
-
 log = structlog.get_logger(logger_name=__name__)
 
 
+def _deque_factory() -> deque:
+    return deque(maxlen=conf.getint("core", "parallelism"))
+
+
 @attrs.define
-class LazyXComIterator(Iterator[T]):
+class LazyXComIterator(LoggingMixin, Iterator[T]):
     seq: LazyXComSequence[T]
     index: int = 0
     dir: Literal[1, -1] = 1
+    _buffer: deque[T] = attrs.field(factory=_deque_factory, init=False)
+
+    @property
+    def prefetch_size(self) -> int:
+        return self._buffer.maxlen or conf.getint("core", "parallelism")
 
     def __next__(self) -> T:
         if self.index < 0:
             # When iterating backwards, avoid extra HTTP request
             raise StopIteration()
-        try:
-            val = self.seq[self.index]
-        except IndexError:
-            raise StopIteration from None
+
+        # If the buffer is empty, fetch the next chunk
+        if not self._buffer:
+            chunk = list(self.seq[self.index : self.index + 
self.prefetch_size])
+            if not chunk:
+                raise StopIteration()
+            self._buffer.extend(chunk)
+            self.log.debug("Buffered %s XCom's", len(self._buffer))
+
+        val = self._buffer.popleft()
         self.index += self.dir
+        self.log.debug("Popped buffered XCom for index %s: %s", self.index, 
val)

Review Comment:
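   It does, of course, have the drawback of fetching too many XComs at once
(each refill requests up to `core.parallelism` items, whether or not they are
all consumed). Before we merge it, I think we need to consider specific
scenarios and find a better way to control this, so that users can configure
the prefetching themselves.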
   
   And, of course, the docs should explain the different ways this can be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to