This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b1edb09bff5 IGNITE-24666 Optimize Outbox#flush method (#11901)
b1edb09bff5 is described below

commit b1edb09bff5c1132943e256b0840d07704cba759
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Feb 28 23:30:09 2025 +0300

    IGNITE-24666 Optimize Outbox#flush method (#11901)
---
 .../processors/query/calcite/exec/rel/Outbox.java  | 39 ++++++++++++++++------
 1 file changed, 28 insertions(+), 11 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index ce7597801e7..ebd10b96359 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -19,13 +19,12 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.stream.Collectors;
+
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -230,19 +229,37 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Sing
         while (!inBuf.isEmpty()) {
             checkState();
 
-            Collection<Buffer> buffers = dest.targets(inBuf.peek()).stream()
-                .map(this::getOrCreateBuffer)
-                .collect(Collectors.toList());
+            List<UUID> nodes = dest.targets(inBuf.peek());
 
-            assert !F.isEmpty(buffers);
+            assert !F.isEmpty(nodes);
 
-            if (!buffers.stream().allMatch(Buffer::ready))
-                return;
+            // flush() method is invoked for every row, and in most cases the 
destination is a single node.
+            // Therefore, we use this optimization for the case to avoid 
excess memory allocations.
+            if (nodes.size() == 1) {
+                Buffer buf = getOrCreateBuffer(nodes.get(0));
+
+                if (!buf.ready())
+                    return;
+
+                buf.add(inBuf.remove());
+            }
+            else {
+                List<Buffer> buffers = new ArrayList<>(nodes.size());
 
-            Row row = inBuf.remove();
+                for (UUID nodeId : nodes) {
+                    Buffer buf = getOrCreateBuffer(nodeId);
 
-            for (Buffer dest : buffers)
-                dest.add(row);
+                    if (!buf.ready())
+                        return;
+
+                    buffers.add(buf);
+                }
+
+                Row row = inBuf.remove();
+
+                for (Buffer dest : buffers)
+                    dest.add(row);
+            }
         }
 
         assert inBuf.isEmpty();

Reply via email to