dabla commented on code in PR #57975:
URL: https://github.com/apache/airflow/pull/57975#discussion_r2634695154
##########
task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py:
##########
@@ -35,25 +39,40 @@
# ``XCom.deserialize_value``. We don't want to wrap the API values in a nested
# {"value": value} dict since it wastes bandwidth.
_XComWrapper = collections.namedtuple("_XComWrapper", "value")
-
log = structlog.get_logger(logger_name=__name__)
+def _deque_factory() -> deque:
+ return deque(maxlen=conf.getint("core", "parallelism"))
+
+
@attrs.define
-class LazyXComIterator(Iterator[T]):
+class LazyXComIterator(LoggingMixin, Iterator[T]):
seq: LazyXComSequence[T]
index: int = 0
dir: Literal[1, -1] = 1
+ _buffer: deque[T] = attrs.field(factory=_deque_factory, init=False)
+
+ @property
+ def prefetch_size(self) -> int:
+ return self._buffer.maxlen or conf.getint("core", "parallelism")
def __next__(self) -> T:
if self.index < 0:
# When iterating backwards, avoid extra HTTP request
raise StopIteration()
- try:
- val = self.seq[self.index]
- except IndexError:
- raise StopIteration from None
+
+ # If the buffer is empty, fetch the next chunk
+ if not self._buffer:
+ chunk = list(self.seq[self.index : self.index +
self.prefetch_size])
+ if not chunk:
+ raise StopIteration()
+ self._buffer.extend(chunk)
+ self.log.debug("Buffered %s XCom's", len(self._buffer))
+
+ val = self._buffer.popleft()
self.index += self.dir
+ self.log.debug("Popped buffered XCom for index %s: %s", self.index,
val)
Review Comment:
> Maybe a very small default would kind of make sense, and if the amount of
entries is small enough we could even just send everything regardless, but I
feel this should be done more automatically at the server side instead, where
more context is available to make a better decision.
I don't see how that could be done on the server side, since it is the client
that determines how many iterations it will perform, and thus how many XComs it
wants to retrieve; that's exactly the part I'm struggling with.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org