This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b1edb09bff5 IGNITE-24666 Optimize Outbox#flush method (#11901)
b1edb09bff5 is described below
commit b1edb09bff5c1132943e256b0840d07704cba759
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Feb 28 23:30:09 2025 +0300
IGNITE-24666 Optimize Outbox#flush method (#11901)
---
.../processors/query/calcite/exec/rel/Outbox.java | 39 ++++++++++++++++------
1 file changed, 28 insertions(+), 11 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index ce7597801e7..ebd10b96359 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -19,13 +19,12 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.rel;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.stream.Collectors;
+
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -230,19 +229,37 @@ public class Outbox<Row> extends AbstractNode<Row>
implements Mailbox<Row>, Sing
while (!inBuf.isEmpty()) {
checkState();
- Collection<Buffer> buffers = dest.targets(inBuf.peek()).stream()
- .map(this::getOrCreateBuffer)
- .collect(Collectors.toList());
+ List<UUID> nodes = dest.targets(inBuf.peek());
- assert !F.isEmpty(buffers);
+ assert !F.isEmpty(nodes);
- if (!buffers.stream().allMatch(Buffer::ready))
- return;
+ // flush() method is invoked for every row, and in most cases the
destination is a single node.
+ // Therefore, we use this optimization for the case to avoid
excess memory allocations.
+ if (nodes.size() == 1) {
+ Buffer buf = getOrCreateBuffer(nodes.get(0));
+
+ if (!buf.ready())
+ return;
+
+ buf.add(inBuf.remove());
+ }
+ else {
+ List<Buffer> buffers = new ArrayList<>(nodes.size());
- Row row = inBuf.remove();
+ for (UUID nodeId : nodes) {
+ Buffer buf = getOrCreateBuffer(nodeId);
- for (Buffer dest : buffers)
- dest.add(row);
+ if (!buf.ready())
+ return;
+
+ buffers.add(buf);
+ }
+
+ Row row = inBuf.remove();
+
+ for (Buffer dest : buffers)
+ dest.add(row);
+ }
}
assert inBuf.isEmpty();