dabla commented on code in PR #57975:
URL: https://github.com/apache/airflow/pull/57975#discussion_r2632453165
##########
task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py:
##########
@@ -35,25 +39,40 @@
# ``XCom.deserialize_value``. We don't want to wrap the API values in a nested
# {"value": value} dict since it wastes bandwidth.
_XComWrapper = collections.namedtuple("_XComWrapper", "value")
-
log = structlog.get_logger(logger_name=__name__)
+def _deque_factory() -> deque:
+    return deque(maxlen=conf.getint("core", "parallelism"))
Review Comment:
Good question. I think it should probably be a dedicated config parameter,
or even something more dynamic, though I don't have an idea for how to make
that possible. For our patched Airflow instance I chose this parameter
because we hit the performance "issue" with the IterableOperator, which
tries to create many mapped task instances in chunks of `core.parallelism`.
For our solution this was good enough and didn't require any extra
parametrization.
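
To illustrate the "dedicated config parameter" idea, here is a minimal sketch, not a proposal for the final implementation. It uses a plain dict in place of Airflow's `conf` so that it runs standalone. The option name `lazy_sequence_buffer_size` and the helpers `resolve_maxlen` / `deque_factory` are hypothetical and exist neither in Airflow nor in this PR.

```python
from collections import deque

# Stand-in for Airflow's configuration object (assumption: a plain dict keyed
# by (section, option)). The PR itself reads conf.getint("core", "parallelism").
FAKE_CONF = {("core", "parallelism"): 32}


def resolve_maxlen(settings, section="core", key="lazy_sequence_buffer_size"):
    """Return the hypothetical dedicated option if set, else core.parallelism."""
    value = settings.get((section, key))
    if value is None:
        value = settings[("core", "parallelism")]
    return int(value)


def deque_factory(settings):
    """Build a bounded deque sized from the resolved setting."""
    return deque(maxlen=resolve_maxlen(settings))


buf = deque_factory(FAKE_CONF)
buf.extend(range(40))  # a full bounded deque drops its oldest items on append
```

With the fallback, existing deployments would keep the `core.parallelism` behaviour, while a dedicated option could be tuned independently if it were introduced.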