dabla commented on code in PR #57975:
URL: https://github.com/apache/airflow/pull/57975#discussion_r2634691412
##########
task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py:
##########
@@ -35,25 +39,40 @@
# ``XCom.deserialize_value``. We don't want to wrap the API values in a nested
# {"value": value} dict since it wastes bandwidth.
_XComWrapper = collections.namedtuple("_XComWrapper", "value")
-
log = structlog.get_logger(logger_name=__name__)
+def _deque_factory() -> deque:
+ return deque(maxlen=conf.getint("core", "parallelism"))
+
+
@attrs.define
-class LazyXComIterator(Iterator[T]):
+class LazyXComIterator(LoggingMixin, Iterator[T]):
seq: LazyXComSequence[T]
index: int = 0
dir: Literal[1, -1] = 1
+ _buffer: deque[T] = attrs.field(factory=_deque_factory, init=False)
+
+ @property
+ def prefetch_size(self) -> int:
+ return self._buffer.maxlen or conf.getint("core", "parallelism")
def __next__(self) -> T:
if self.index < 0:
# When iterating backwards, avoid extra HTTP request
raise StopIteration()
- try:
- val = self.seq[self.index]
- except IndexError:
- raise StopIteration from None
+
+ # If the buffer is empty, fetch the next chunk
+ if not self._buffer:
+ chunk = list(self.seq[self.index : self.index +
self.prefetch_size])
+ if not chunk:
+ raise StopIteration()
+ self._buffer.extend(chunk)
+ self.log.debug("Buffered %s XCom's", len(self._buffer))
+
+ val = self._buffer.popleft()
self.index += self.dir
+ self.log.debug("Popped buffered XCom for index %s: %s", self.index,
val)
Review Comment:
> I feel it’d be a good idea to have a separate interface. The appropriate
value here varies per-task or even per-XCom, so a deployment-wide config
wouldn’t be too useful for anyone.
>
Fully agree on that. I tried to find a better solution but couldn't come
up with one easily.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]