dabla commented on code in PR #57975:
URL: https://github.com/apache/airflow/pull/57975#discussion_r2632462655


##########
task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py:
##########
@@ -35,25 +39,40 @@
 # ``XCom.deserialize_value``. We don't want to wrap the API values in a nested
 # {"value": value} dict since it wastes bandwidth.
 _XComWrapper = collections.namedtuple("_XComWrapper", "value")
-
 log = structlog.get_logger(logger_name=__name__)
 
 
+def _deque_factory() -> deque:
+    return deque(maxlen=conf.getint("core", "parallelism"))
+
+
 @attrs.define
-class LazyXComIterator(Iterator[T]):
+class LazyXComIterator(LoggingMixin, Iterator[T]):
     seq: LazyXComSequence[T]
     index: int = 0
     dir: Literal[1, -1] = 1
+    _buffer: deque[T] = attrs.field(factory=_deque_factory, init=False)
+
+    @property
+    def prefetch_size(self) -> int:
+        return self._buffer.maxlen or conf.getint("core", "parallelism")
 
     def __next__(self) -> T:
         if self.index < 0:
             # When iterating backwards, avoid extra HTTP request
             raise StopIteration()
-        try:
-            val = self.seq[self.index]
-        except IndexError:
-            raise StopIteration from None
+
+        # If the buffer is empty, fetch the next chunk
+        if not self._buffer:
+            chunk = list(self.seq[self.index : self.index + self.prefetch_size])
+            if not chunk:
+                raise StopIteration()
+            self._buffer.extend(chunk)
+            self.log.debug("Buffered %s XCom's", len(self._buffer))
+
+        val = self._buffer.popleft()
         self.index += self.dir
+        self.log.debug("Popped buffered XCom for index %s: %s", self.index, val)

Review Comment:
   Yes, and that brings me back to my response above, regarding your question
about why 'core.parallelism' is used. I don't immediately see how we could, for
example, derive the 'max_active_tis_per_dagrun' parameter from the operator in
which the XCom is being processed, which would already make it less rigid.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to