This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c8ad63012 IGNITE-18580 Sql. Redesign the Exchange to use a pull-based approach (#1553)
4c8ad63012 is described below

commit 4c8ad6301258610849566cf384fd48e4d6eede02
Author: korlov42 <[email protected]>
AuthorDate: Tue Jan 24 16:42:31 2023 +0200

    IGNITE-18580 Sql. Redesign the Exchange to use a pull-based approach (#1553)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |   2 -
 .../internal/sql/engine/exec/ExchangeService.java  |  15 +-
 .../sql/engine/exec/ExchangeServiceImpl.java       | 108 +++----
 .../sql/engine/exec/ExecutionServiceImpl.java      |   4 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |   9 +-
 .../internal/sql/engine/exec/MailboxRegistry.java  |  20 +-
 .../sql/engine/exec/MailboxRegistryImpl.java       |  40 +--
 .../internal/sql/engine/exec/rel/AbstractNode.java |   8 +-
 .../sql/engine/exec/rel/AsyncRootNode.java         | 120 +++-----
 .../ignite/internal/sql/engine/exec/rel/Inbox.java | 340 ++++++++++++---------
 .../internal/sql/engine/exec/rel/Outbox.java       | 289 +++++++++++-------
 ...eMessage.java => QueryBatchRequestMessage.java} |  17 +-
 .../sql/engine/message/SqlQueryMessageGroup.java   |   3 +-
 .../ignite/internal/sql/engine/util/Commons.java   |   3 +
 .../sql/engine/benchmarks/SqlBenchmark.java        |  43 +--
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   4 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   5 +-
 .../sql/engine/exec/rel/ExchangeExecutionTest.java | 238 +++++++++++++++
 ...sterService.java => ClusterServiceFactory.java} |  12 +-
 .../sql/engine/framework/DataProvider.java         |  42 +++
 .../sql/engine/framework/TestBuilders.java         | 107 ++++++-
 .../internal/sql/engine/framework/TestNode.java    |   2 +-
 22 files changed, 916 insertions(+), 515 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index eecaf29519..2e383ec123 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -199,8 +199,6 @@ public class SqlQueryProcessor implements QueryProcessor {
         ));
 
         var exchangeService = registerService(new ExchangeServiceImpl(
-                clusterSrvc.topologyService().localMember(),
-                taskExecutor,
                 mailboxRegistry,
                 msgSrvc
         ));
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index 3724784dae..12b58de40f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -41,15 +41,16 @@ public interface ExchangeService extends LifecycleAware {
             List<RowT> rows) throws IgniteInternalCheckedException;
 
     /**
-     * Acknowledges a batch with given ID is processed.
+     * Requests batches from remote source.
      *
-     * @param nodeName Node consistent ID to notify.
-     * @param qryId Query ID.
-     * @param fragmentId Target fragment ID.
-     * @param exchangeId Exchange ID.
-     * @param batchId Batch ID.
+     * @param nodeName A consistent identifier of the node to request from.
+     * @param queryId An identifier of the query.
+     * @param fragmentId An identifier of the fragment to request from.
+     * @param exchangeId An identifier of the exchange to request from.
+     * @param amountOfBatches A count of batches to request.
      */
-    void acknowledge(String nodeName, UUID qryId, long fragmentId, long 
exchangeId, int batchId) throws IgniteInternalCheckedException;
+    void request(String nodeName, UUID queryId, long fragmentId, long 
exchangeId, int amountOfBatches)
+            throws IgniteInternalCheckedException;
 
     /**
      * Sends cancel request.
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 5f26aa2585..39c7f248a3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -20,72 +20,64 @@ package org.apache.ignite.internal.sql.engine.exec;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
 
-import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
 import org.apache.ignite.internal.sql.engine.message.InboxCloseMessage;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
-import 
org.apache.ignite.internal.sql.engine.message.QueryBatchAcknowledgeMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
+import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
-import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterNode;
 
 /**
- * ExchangeServiceImpl. TODO Documentation 
https://issues.apache.org/jira/browse/IGNITE-15859
+ * Message-based implementation of {@link ExchangeService} interface.
+ *
+ * <p>Provides simple methods of interaction with the mailbox, hiding all the 
machinery to send and receive messages.
  */
 public class ExchangeServiceImpl implements ExchangeService {
     private static final IgniteLogger LOG = 
Loggers.forClass(ExchangeServiceImpl.class);
-
     private static final SqlQueryMessagesFactory FACTORY = new 
SqlQueryMessagesFactory();
 
-    private final ClusterNode localNode;
-
-    private final QueryTaskExecutor taskExecutor;
-
     private final MailboxRegistry mailboxRegistry;
-
-    private final MessageService msgSrvc;
+    private final MessageService messageService;
 
     /**
-     * Constructor. TODO Documentation 
https://issues.apache.org/jira/browse/IGNITE-15859
+     * Creates the object.
+     *
+     * @param mailboxRegistry A registry of mailboxes created on the node.
+     * @param messageService A messaging service to exchange messages between 
mailboxes.
      */
     public ExchangeServiceImpl(
-            ClusterNode localNode,
-            QueryTaskExecutor taskExecutor,
             MailboxRegistry mailboxRegistry,
-            MessageService msgSrvc
+            MessageService messageService
     ) {
-        this.localNode = localNode;
-        this.taskExecutor = taskExecutor;
         this.mailboxRegistry = mailboxRegistry;
-        this.msgSrvc = msgSrvc;
+        this.messageService = messageService;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        msgSrvc.register((n, m) -> onMessage(n, (InboxCloseMessage) m), 
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
-        msgSrvc.register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage) 
m), SqlQueryMessageGroup.QUERY_BATCH_ACK);
-        msgSrvc.register((n, m) -> onMessage(n, (QueryBatchMessage) m), 
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
+        messageService.register((n, m) -> onMessage(n, (InboxCloseMessage) m), 
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
+        messageService.register((n, m) -> onMessage(n, 
(QueryBatchRequestMessage) m), SqlQueryMessageGroup.QUERY_BATCH_REQUEST);
+        messageService.register((n, m) -> onMessage(n, (QueryBatchMessage) m), 
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
     }
 
     /** {@inheritDoc} */
     @Override
     public <RowT> void sendBatch(String nodeName, UUID qryId, long fragmentId, 
long exchangeId, int batchId,
             boolean last, List<RowT> rows) throws 
IgniteInternalCheckedException {
-        msgSrvc.send(
+        messageService.send(
                 nodeName,
                 FACTORY.queryBatchMessage()
                         .queryId(qryId)
@@ -100,15 +92,15 @@ public class ExchangeServiceImpl implements 
ExchangeService {
 
     /** {@inheritDoc} */
     @Override
-    public void acknowledge(String nodeName, UUID qryId, long fragmentId, long 
exchangeId, int batchId)
+    public void request(String nodeName, UUID queryId, long fragmentId, long 
exchangeId, int amountOfBatches)
             throws IgniteInternalCheckedException {
-        msgSrvc.send(
+        messageService.send(
                 nodeName,
-                FACTORY.queryBatchAcknowledgeMessage()
-                        .queryId(qryId)
+                FACTORY.queryBatchRequestMessage()
+                        .queryId(queryId)
                         .fragmentId(fragmentId)
                         .exchangeId(exchangeId)
-                        .batchId(batchId)
+                        .amountOfBatches(amountOfBatches)
                         .build()
         );
     }
@@ -116,7 +108,7 @@ public class ExchangeServiceImpl implements ExchangeService 
{
     /** {@inheritDoc} */
     @Override
     public void closeQuery(String nodeName, UUID qryId) throws 
IgniteInternalCheckedException {
-        msgSrvc.send(
+        messageService.send(
                 nodeName,
                 FACTORY.queryCloseMessage()
                         .queryId(qryId)
@@ -127,7 +119,7 @@ public class ExchangeServiceImpl implements ExchangeService 
{
     /** {@inheritDoc} */
     @Override
     public void closeInbox(String nodeName, UUID qryId, long fragmentId, long 
exchangeId) throws IgniteInternalCheckedException {
-        msgSrvc.send(
+        messageService.send(
                 nodeName,
                 FACTORY.inboxCloseMessage()
                         .queryId(qryId)
@@ -140,7 +132,7 @@ public class ExchangeServiceImpl implements ExchangeService 
{
     /** {@inheritDoc} */
     @Override
     public void sendError(String nodeName, UUID qryId, long fragmentId, 
Throwable err) throws IgniteInternalCheckedException {
-        msgSrvc.send(
+        messageService.send(
                 nodeName,
                 FACTORY.errorMessage()
                         .queryId(qryId)
@@ -153,7 +145,7 @@ public class ExchangeServiceImpl implements ExchangeService 
{
     /** {@inheritDoc} */
     @Override
     public boolean alive(String nodeName) {
-        return msgSrvc.alive(nodeName);
+        return messageService.alive(nodeName);
     }
 
     private void onMessage(String nodeName, InboxCloseMessage msg) {
@@ -169,35 +161,29 @@ public class ExchangeServiceImpl implements 
ExchangeService {
         }
     }
 
-    private void onMessage(String nodeName, QueryBatchAcknowledgeMessage msg) {
-        Outbox<?> outbox = mailboxRegistry.outbox(msg.queryId(), 
msg.exchangeId());
+    private void onMessage(String nodeName, QueryBatchRequestMessage msg) {
+        CompletableFuture<Outbox<?>> outboxFut = 
mailboxRegistry.outbox(msg.queryId(), msg.exchangeId());
 
-        if (outbox != null) {
+        Consumer<Outbox<?>> onRequestHandler = outbox -> {
             try {
-                outbox.onAcknowledge(nodeName, msg.batchId());
+                outbox.onRequest(nodeName, msg.amountOfBatches());
             } catch (Throwable e) {
                 outbox.onError(e);
 
                 throw new IgniteInternalException(UNEXPECTED_ERR, "Unexpected 
exception", e);
             }
-        } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Stale acknowledge message received: [nodeName={}, 
queryId={}, fragmentId={}, exchangeId={}, batchId={}]",
-                    nodeName, msg.queryId(), msg.fragmentId(), 
msg.exchangeId(), msg.batchId());
+        };
+
+        if (outboxFut.isDone()) {
+            onRequestHandler.accept(outboxFut.join());
+        } else {
+            outboxFut.thenAccept(onRequestHandler);
         }
     }
 
     private void onMessage(String nodeName, QueryBatchMessage msg) {
         Inbox<?> inbox = mailboxRegistry.inbox(msg.queryId(), 
msg.exchangeId());
 
-        if (inbox == null && msg.batchId() == 0) {
-            // first message sent before a fragment is built
-            // note that an inbox source fragment id is also used as an 
exchange id
-            Inbox<?> newInbox = new Inbox<>(baseInboxContext(nodeName, 
msg.queryId(), msg.fragmentId()),
-                    this, mailboxRegistry, msg.exchangeId(), msg.exchangeId());
-
-            inbox = mailboxRegistry.register(newInbox);
-        }
-
         if (inbox != null) {
             try {
                 inbox.onBatchReceived(nodeName, msg.batchId(), msg.last(), 
Commons.cast(msg.rows()));
@@ -212,28 +198,6 @@ public class ExchangeServiceImpl implements 
ExchangeService {
         }
     }
 
-    /**
-     * Get minimal execution context to meet Inbox needs.
-     */
-    private ExecutionContext<?> baseInboxContext(String nodeName, UUID qryId, 
long fragmentId) {
-        return new ExecutionContext<>(
-                BaseQueryContext.builder()
-                        .logger(LOG)
-                        .build(),
-                taskExecutor,
-                qryId,
-                localNode,
-                nodeName,
-                new FragmentDescription(
-                        fragmentId,
-                        null,
-                        null,
-                        Long2ObjectMaps.emptyMap()),
-                null,
-                Map.of(),
-                null);
-    }
-
     /** {@inheritDoc} */
     @Override
     public void stop() {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ff96768d28..bc792f0c04 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -511,6 +511,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 });
                 node.onRegister(rootNode);
 
+                rootNode.prefetch();
+
                 root.complete(rootNode);
             }
 
@@ -527,7 +529,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             }
 
             if (node instanceof Outbox) {
-                ((Outbox<?>) node).init();
+                ((Outbox<?>) node).prefetch();
             }
         }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 18cdd49fe7..81ad1df7aa 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -587,12 +587,11 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
     /** {@inheritDoc} */
     @Override
     public Node<RowT> visit(IgniteReceiver rel) {
-        Inbox<RowT> inbox = mailboxRegistry.register(
-                new Inbox<>(ctx, exchangeSvc, mailboxRegistry, 
rel.exchangeId(), rel.sourceFragmentId()));
+        Inbox<RowT> inbox = new Inbox<>(ctx, exchangeSvc, mailboxRegistry,
+                ctx.remotes(rel.exchangeId()), 
expressionFactory.comparator(rel.collation()),
+                rel.exchangeId(), rel.sourceFragmentId());
 
-        // here may be an already created (to consume rows from remote nodes) 
inbox
-        // without proper context, we need to init it with a right one.
-        inbox.init(ctx, ctx.remotes(rel.exchangeId()), 
expressionFactory.comparator(rel.collation()));
+        mailboxRegistry.register(inbox);
 
         return inbox;
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
index f0ec828de3..eed6a5d0ba 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
 import org.jetbrains.annotations.Nullable;
@@ -29,12 +30,11 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface MailboxRegistry extends LifecycleAware {
     /**
-     * Tries to register and inbox node and returns it if success or returns 
previously registered inbox otherwise.
+     * Registers an inbox.
      *
-     * @param inbox Inbox.
-     * @return Registered inbox.
+     * @param inbox Inbox to register.
      */
-    <T> Inbox<T> register(Inbox<T> inbox);
+    void register(Inbox<?> inbox);
 
     /**
      * Registers an outbox.
@@ -64,7 +64,7 @@ public interface MailboxRegistry extends LifecycleAware {
      * @param exchangeId Exchange ID.
      * @return Registered outbox. May be {@code null} if execution was 
cancelled.
      */
-    Outbox<?> outbox(UUID qryId, long exchangeId);
+    CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId);
 
     /**
      * Returns a registered inbox by provided query ID, exchange ID pair.
@@ -84,14 +84,4 @@ public interface MailboxRegistry extends LifecycleAware {
      * @return Registered inboxes.
      */
     Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long fragmentId, long 
exchangeId);
-
-    /**
-     * Returns all registered outboxes for provided query ID.
-     *
-     * @param qryId      Query ID. {@code null} means return outboxes with any 
query id.
-     * @param fragmentId Fragment Id. {@code -1} means return outboxes with 
any fragment id.
-     * @param exchangeId Exchange Id. {@code -1} means return outboxes with 
any exchange id.
-     * @return Registered outboxes.
-     */
-    Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long 
exchangeId);
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
index 2f00be6d6b..aa939b9769 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -43,7 +44,7 @@ public class MailboxRegistryImpl implements MailboxRegistry, 
TopologyEventHandle
 
     private static final Predicate<Mailbox<?>> ALWAYS_TRUE = o -> true;
 
-    private final Map<MailboxKey, Outbox<?>> locals;
+    private final Map<MailboxKey, CompletableFuture<Outbox<?>>> locals;
 
     private final Map<MailboxKey, Inbox<?>> remotes;
 
@@ -64,30 +65,29 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
 
     /** {@inheritDoc} */
     @Override
-    public <T> Inbox<T> register(Inbox<T> inbox) {
-        Inbox<T> old = (Inbox<T>) remotes.putIfAbsent(new 
MailboxKey(inbox.queryId(), inbox.exchangeId()), inbox);
+    public void register(Inbox<?> inbox) {
+        Inbox<?> res = remotes.putIfAbsent(new MailboxKey(inbox.queryId(), 
inbox.exchangeId()), inbox);
 
         if (LOG.isTraceEnabled()) {
-            if (old != null) {
-                LOG.trace("Inbox already registered [qryId={}, 
fragmentId={}]", inbox.queryId(), inbox.fragmentId());
-            } else {
-                LOG.trace("Inbox registered [qryId={}, fragmentId={}]", 
inbox.queryId(), inbox.fragmentId());
-            }
+            LOG.trace("Inbox registered [qryId={}, fragmentId={}]", 
inbox.queryId(), inbox.fragmentId());
         }
 
-        return old != null ? old : inbox;
+        assert res == null : res;
     }
 
     /** {@inheritDoc} */
     @Override
     public void register(Outbox<?> outbox) {
-        Outbox<?> res = locals.put(new MailboxKey(outbox.queryId(), 
outbox.exchangeId()), outbox);
+        CompletableFuture<Outbox<?>> res = locals.computeIfAbsent(new 
MailboxKey(outbox.queryId(), outbox.exchangeId()),
+                k -> new CompletableFuture<>());
+
+        assert !res.isDone();
+
+        res.complete(outbox);
 
         if (LOG.isTraceEnabled()) {
             LOG.trace("Outbox registered [qryId={}, fragmentId={}]", 
outbox.queryId(), outbox.fragmentId());
         }
-
-        assert res == null : res;
     }
 
     /** {@inheritDoc} */
@@ -104,7 +104,7 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
     /** {@inheritDoc} */
     @Override
     public void unregister(Outbox<?> outbox) {
-        boolean removed = locals.remove(new MailboxKey(outbox.queryId(), 
outbox.exchangeId()), outbox);
+        boolean removed = locals.remove(new MailboxKey(outbox.queryId(), 
outbox.exchangeId())) != null;
 
         if (LOG.isTraceEnabled()) {
             LOG.trace("Outbox {} unregistered [qryId={}, fragmentId={}]", 
removed ? "was" : "wasn't",
@@ -114,8 +114,8 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
 
     /** {@inheritDoc} */
     @Override
-    public Outbox<?> outbox(UUID qryId, long exchangeId) {
-        return locals.get(new MailboxKey(qryId, exchangeId));
+    public CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId) {
+        return locals.computeIfAbsent(new MailboxKey(qryId, exchangeId), k -> 
new CompletableFuture<>());
     }
 
     /** {@inheritDoc} */
@@ -132,14 +132,6 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
                 .collect(Collectors.toList());
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long 
fragmentId, long exchangeId) {
-        return locals.values().stream()
-                .filter(makeFilter(qryId, fragmentId, exchangeId))
-                .collect(Collectors.toList());
-    }
-
     private static Predicate<Mailbox<?>> makeFilter(@Nullable UUID qryId, long 
fragmentId, long exchangeId) {
         Predicate<Mailbox<?>> filter = ALWAYS_TRUE;
         if (qryId != null) {
@@ -177,7 +169,7 @@ public class MailboxRegistryImpl implements 
MailboxRegistry, TopologyEventHandle
     /** {@inheritDoc} */
     @Override
     public void onDisappeared(ClusterNode member) {
-        locals.values().forEach(n -> n.onNodeLeft(member.name()));
+        locals.values().forEach(fut -> fut.thenAccept(n -> 
n.onNodeLeft(member.name())));
         remotes.values().forEach(n -> n.onNodeLeft(member.name()));
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index 811095d789..e7c7bd5a5a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -34,13 +34,13 @@ import 
org.apache.ignite.lang.IgniteInternalCheckedException;
  * Abstract node of execution tree.
  */
 public abstract class AbstractNode<RowT> implements Node<RowT> {
-    public static final int MODIFY_BATCH_SIZE = 100; 
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
+    public static final int MODIFY_BATCH_SIZE = 100;
 
-    protected static final int IO_BATCH_SIZE = 256; 
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 256);
+    protected static final int IO_BATCH_SIZE = Commons.IO_BATCH_SIZE;
 
-    protected static final int IO_BATCH_CNT = 4; 
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 4);
+    protected static final int IO_BATCH_CNT = Commons.IO_BATCH_COUNT;
 
-    protected final int inBufSize = Commons.IN_BUFFER_SIZE; 
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IN_BUFFER_SIZE", 2);
+    protected final int inBufSize = Commons.IN_BUFFER_SIZE;
 
     /** For debug purpose. */
     private volatile Thread thread;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index e159e4a79a..f3d0423a32 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -17,18 +17,20 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
+import static 
org.apache.ignite.internal.sql.engine.util.Commons.IN_BUFFER_SIZE;
+
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.ignite.internal.sql.engine.AsyncCursor;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
 import org.apache.ignite.sql.CursorClosedException;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * An async iterator over the execution tree.
@@ -48,9 +50,11 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
 
     private final Function<InRowT, OutRowT> converter;
 
+    private final Queue<OutRowT> buff = new ArrayDeque<>(IN_BUFFER_SIZE);
+
     private final AtomicBoolean taskScheduled = new AtomicBoolean();
 
-    private final Queue<PendingRequest<OutRowT>> pendingRequests = new 
LinkedBlockingQueue<>();
+    private final Queue<PendingRequest<OutRowT>> pendingRequests = new 
ConcurrentLinkedQueue<>();
 
     private volatile boolean closed = false;
 
@@ -61,26 +65,6 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
      */
     private int waiting;
 
-    /**
-     * Last row that pushed to this downstream.
-     *
-     * <p>{@link 
org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult} requires 
information about whether there are more
-     * rows available or not. To meet this requirement, we request an extra 
row from source at the very first requested batch. Thus,
-     * we have an extra row which should be the very first row of the next 
batch.
-     *
-     * <p>Note: this variable should be accessed from an execution task only.
-     */
-    private @Nullable OutRowT lastRow;
-
-    /**
-     * Whether the very first batch was already requested or not. See {@link 
#lastRow} for details.
-     *
-     * <p>Note: this variable should be accessed from an execution task only.
-     *
-     * @see #lastRow
-     */
-    private boolean firstRequest = true;
-
     /**
      * Constructor.
      *
@@ -97,19 +81,9 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
     public void push(InRowT row) throws Exception {
         assert waiting > 0;
 
-        waiting--;
-
-        var currentReq = pendingRequests.peek();
-
-        assert currentReq != null;
-
-        if (currentReq.buff.size() < currentReq.requested) {
-            currentReq.buff.add(converter.apply(row));
-        } else {
-            assert waiting == 0;
-
-            lastRow = converter.apply(row);
+        buff.add(converter.apply(row));
 
+        if (--waiting == 0) {
             flush();
         }
     }
@@ -121,16 +95,6 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
 
         waiting = -1;
 
-        var currentReq = pendingRequests.peek();
-
-        assert currentReq != null;
-
-        if (currentReq.buff.size() < currentReq.requested && lastRow != null) {
-            currentReq.buff.add(lastRow);
-
-            lastRow = null;
-        }
-
         flush();
     }
 
@@ -214,20 +178,48 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
         return cancelFut.thenApply(Function.identity());
     }
 
-    private void flush() {
-        var currentReq = pendingRequests.remove();
+    /**
+     * Starts the execution of the fragment and keeps the result in the 
intermediate buffer.
+     *
+     * <p>Note: this method must be called by the same thread that will 
execute the whole fragment.
+     */
+    public void prefetch() {
+        if (waiting == 0) {
+            try {
+                source.request(waiting = IN_BUFFER_SIZE);
+            } catch (Exception ex) {
+                onError(ex);
+            }
+        }
+    }
+
+    private void flush() throws Exception {
+        // flush may be triggered by prefetching, so let's do nothing in this 
case
+        if (pendingRequests.isEmpty()) {
+            return;
+        }
+
+        PendingRequest<OutRowT> currentReq = pendingRequests.peek();
 
         assert currentReq != null;
 
         taskScheduled.set(false);
 
-        currentReq.fut.complete(new BatchedResult<>(currentReq.buff, waiting 
!= -1 || lastRow != null));
+        while (!buff.isEmpty() && currentReq.buff.size() < 
currentReq.requested) {
+            currentReq.buff.add(buff.remove());
+        }
 
-        boolean hasMoreRow = waiting != -1 || lastRow != null;
+        boolean hasMoreRows = waiting != -1 || !buff.isEmpty();
 
-        if (hasMoreRow) {
-            scheduleTask();
-        } else {
+        if (currentReq.buff.size() == currentReq.requested || !hasMoreRows) {
+            pendingRequests.remove();
+
+            currentReq.fut.complete(new BatchedResult<>(currentReq.buff, 
hasMoreRows));
+        }
+
+        if (waiting == 0) {
+            source.request(waiting = IN_BUFFER_SIZE);
+        } else if (waiting == -1 && buff.isEmpty()) {
             closeAsync();
         }
     }
@@ -237,27 +229,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
      */
     private void scheduleTask() {
         if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false, 
true)) {
-            var nextTask = pendingRequests.peek();
-
-            assert nextTask != null;
-
-            source.context().execute(() -> {
-                // for the very first request we need to request one extra row
-                // to be able to determine whether there is more rows or not
-                if (firstRequest) {
-                    waiting = nextTask.requested + 1;
-                    firstRequest = false;
-                } else {
-                    waiting = nextTask.requested;
-
-                    assert lastRow != null;
-
-                    nextTask.buff.add(lastRow);
-                    lastRow = null;
-                }
-
-                source.request(waiting);
-            }, source::onError);
+            source.context().execute(this::flush, source::onError);
         }
     }
 
@@ -277,7 +249,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
          */
         private final List<OutRowT> buff;
 
-        public PendingRequest(int requested, 
CompletableFuture<BatchedResult<OutRowT>> fut) {
+        private PendingRequest(int requested, 
CompletableFuture<BatchedResult<OutRowT>> fut) {
             this.requested = requested;
             this.fut = fut;
             this.buff = new ArrayList<>(requested);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index c519c9a019..d67264c21e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,25 +20,24 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.calcite.util.Util.unexpected;
 import static org.apache.ignite.lang.ErrorGroups.Sql.NODE_LEFT_ERR;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.stream.Collectors;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.rel.Inbox.RemoteSource.State;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * A part of exchange.
+ * A part of exchange which receives batches from remote sources.
  */
 public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, 
SingleNode<RowT> {
     private final ExchangeService exchange;
@@ -49,13 +48,13 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
 
     private final long srcFragmentId;
 
-    private final Map<String, Buffer> perNodeBuffers;
+    private final Map<String, RemoteSource<RowT>> perNodeBuffers;
 
-    private volatile Collection<String> srcNodeNames;
+    private final Collection<String> srcNodeNames;
 
-    private Comparator<RowT> comp;
+    private final @Nullable Comparator<RowT> comp;
 
-    private List<Buffer> buffers;
+    private List<RemoteSource<RowT>> remoteSources;
 
     private int requested;
 
@@ -74,12 +73,16 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
             ExecutionContext<RowT> ctx,
             ExchangeService exchange,
             MailboxRegistry registry,
+            Collection<String> srcNodeNames,
+            @Nullable Comparator<RowT> comp,
             long exchangeId,
             long srcFragmentId
     ) {
         super(ctx);
         this.exchange = exchange;
         this.registry = registry;
+        this.srcNodeNames = srcNodeNames;
+        this.comp = comp;
 
         this.srcFragmentId = srcFragmentId;
         this.exchangeId = exchangeId;
@@ -93,35 +96,9 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
         return exchangeId;
     }
 
-    /**
-     * Inits this Inbox.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-     *
-     * @param ctx Execution context.
-     * @param srcNodeNames Source nodes' consistent IDs.
-     * @param comp Optional comparator for merge exchange.
-     */
-    public void init(
-            ExecutionContext<RowT> ctx, Collection<String> srcNodeNames, 
@Nullable Comparator<RowT> comp) {
-        assert srcNodeNames != null : "Collection srcNodeNames not found for 
exchangeId: " + exchangeId;
-        assert context().fragmentId() == ctx.fragmentId() : "different 
fragments unsupported: previous=" + context().fragmentId()
-                + " current=" + ctx.fragmentId();
-
-        // It's important to set proper context here because
-        // the one, that is created on a first message
-        // received doesn't have all context variables in place.
-        context(ctx);
-
-        this.comp = comp;
-
-        // memory barier
-        this.srcNodeNames = new HashSet<>(srcNodeNames);
-    }
-
     /** {@inheritDoc} */
     @Override
     public void request(int rowsCnt) throws Exception {
-        assert srcNodeNames != null;
         assert rowsCnt > 0 && requested == 0;
 
         checkState();
@@ -168,13 +145,13 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
      * @param rows Rows.
      */
     public void onBatchReceived(String srcNodeName, int batchId, boolean last, 
List<RowT> rows) throws Exception {
-        Buffer buf = getOrCreateBuffer(srcNodeName);
+        RemoteSource<RowT> source = getOrCreateBuffer(srcNodeName);
 
-        boolean waitingBefore = buf.check() == State.WAITING;
+        boolean waitingBefore = source.check() == State.WAITING;
 
-        buf.offer(batchId, last, rows);
+        source.onBatchReceived(batchId, last, rows);
 
-        if (requested > 0 && waitingBefore && buf.check() != State.WAITING) {
+        if (requested > 0 && waitingBefore && source.check() != State.WAITING) 
{
             push();
         }
     }
@@ -186,16 +163,16 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
     }
 
     private void push() throws Exception {
-        if (buffers == null) {
+        if (remoteSources == null) {
             for (String node : srcNodeNames) {
                 checkNode(node);
             }
 
-            buffers = srcNodeNames.stream()
-                    .map(this::getOrCreateBuffer)
-                    .collect(Collectors.toList());
+            remoteSources = new ArrayList<>(srcNodeNames.size());
 
-            assert buffers.size() == perNodeBuffers.size();
+            for (String nodeName : srcNodeNames) {
+                remoteSources.add(getOrCreateBuffer(nodeName));
+            }
         }
 
         if (comp != null) {
@@ -206,9 +183,9 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
     }
 
     /** Checks that all corresponding buffers are in ready state. */
-    private boolean checkAllBuffsReady(Iterator<Buffer> it) {
+    private boolean checkAllBuffsReady(Iterator<RemoteSource<RowT>> it) {
         while (it.hasNext()) {
-            Buffer buf = it.next();
+            RemoteSource<?> buf = it.next();
 
             State state = buf.check();
 
@@ -227,15 +204,22 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
         return true;
     }
 
+    @SuppressWarnings("LabeledStatement")
     private void pushOrdered() throws Exception {
-        if (!checkAllBuffsReady(buffers.iterator())) {
+        if (!checkAllBuffsReady(remoteSources.iterator())) {
+            for (RemoteSource<RowT> remote : remoteSources) {
+                remote.requestNextBatchIfNeeded();
+            }
+
             return;
         }
 
-        PriorityQueue<Pair<RowT, Buffer>> heap =
-                new PriorityQueue<>(Math.max(buffers.size(), 1), 
Map.Entry.comparingByKey(comp));
+        assert comp != null;
 
-        for (Buffer buf : buffers) {
+        PriorityQueue<Pair<RowT, RemoteSource<RowT>>> heap =
+                new PriorityQueue<>(Math.max(remoteSources.size(), 1), 
Map.Entry.comparingByKey(comp));
+
+        for (RemoteSource<RowT> buf : remoteSources) {
             State state = buf.check();
 
             if (state == State.READY) {
@@ -247,25 +231,28 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
 
         inLoop = true;
         try {
+            loop:
             while (requested > 0 && !heap.isEmpty()) {
                 checkState();
 
-                Buffer buf = heap.poll().right;
+                RemoteSource<RowT> source = heap.poll().right;
 
                 requested--;
-                downstream().push(buf.remove());
+                downstream().push(source.remove());
 
-                State state = buf.check();
+                State state = source.check();
 
                 switch (state) {
                     case END:
-                        buffers.remove(buf);
+                        remoteSources.remove(source);
                         break;
                     case READY:
-                        heap.offer(Pair.of(buf.peek(), buf));
+                        heap.offer(Pair.of(source.peek(), source));
                         break;
                     case WAITING:
-                        return;
+                        // at this point we've drained all received batches 
from a particular source,
+                        // thus we need to wait for the next batch in order to be able 
to preserve the ordering
+                        break loop;
                     default:
                         throw unexpected(state);
                 }
@@ -274,9 +261,11 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
             inLoop = false;
         }
 
-        if (requested > 0 && heap.isEmpty()) {
-            assert buffers.isEmpty();
+        for (RemoteSource<?> remote : remoteSources) {
+            remote.requestNextBatchIfNeeded();
+        }
 
+        if (requested > 0 && remoteSources.isEmpty() && heap.isEmpty()) {
             requested = 0;
             downstream().end();
         }
@@ -288,33 +277,36 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
 
         inLoop = true;
         try {
-            while (requested > 0 && !buffers.isEmpty()) {
+            while (requested > 0 && !remoteSources.isEmpty()) {
                 checkState();
 
-                Buffer buf = buffers.get(idx);
+                RemoteSource<RowT> source = remoteSources.get(idx);
 
-                switch (buf.check()) {
+                switch (source.check()) {
                     case END:
-                        buffers.remove(idx--);
+                        remoteSources.remove(idx);
 
                         break;
                     case READY:
                         noProgress = 0;
                         requested--;
-                        downstream().push(buf.remove());
+                        downstream().push(source.remove());
 
                         break;
                     case WAITING:
-                        if (++noProgress >= buffers.size()) {
-                            return;
-                        }
+                        noProgress++;
+                        idx++;
 
                         break;
                     default:
                         break;
                 }
 
-                if (++idx == buffers.size()) {
+                if (noProgress >= remoteSources.size()) {
+                    break;
+                }
+
+                if (idx == remoteSources.size()) {
                     idx = 0;
                 }
             }
@@ -322,22 +314,22 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
             inLoop = false;
         }
 
-        if (requested > 0 && buffers.isEmpty()) {
+        for (RemoteSource<?> source : remoteSources) {
+            source.requestNextBatchIfNeeded();
+        }
+
+        if (requested > 0 && remoteSources.isEmpty()) {
             requested = 0;
             downstream().end();
         }
     }
 
-    private void acknowledge(String nodeName, int batchId) throws 
IgniteInternalCheckedException {
-        exchange.acknowledge(nodeName, queryId(), srcFragmentId, exchangeId, 
batchId);
-    }
-
-    private Buffer getOrCreateBuffer(String nodeName) {
-        return perNodeBuffers.computeIfAbsent(nodeName, this::createBuffer);
+    private void requestBatches(String nodeName, int cnt) throws 
IgniteInternalCheckedException {
+        exchange.request(nodeName, queryId(), srcFragmentId, exchangeId, cnt);
     }
 
-    private Buffer createBuffer(String nodeName) {
-        return new Buffer(nodeName);
+    private RemoteSource<RowT> getOrCreateBuffer(String nodeName) {
+        return perNodeBuffers.computeIfAbsent(nodeName, name -> new 
RemoteSource<>(cnt -> requestBatches(name, cnt)));
     }
 
     /**
@@ -404,118 +396,182 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
 
         /** {@inheritDoc} */
         @Override
-        public int compareTo(@NotNull Inbox.Batch<RowT> o) {
+        public int compareTo(Inbox.Batch<RowT> o) {
             return Integer.compare(batchId, o.batchId);
         }
     }
 
-    private enum State {
-        END,
-
-        READY,
-
-        WAITING
-    }
+    /**
+     * An object to keep track of batches and their order from a particular 
remote source.
+     *
+     * @param <RowT> A type of the rows received in batches.
+     * @see State
+     */
+    static final class RemoteSource<RowT> {
+        @FunctionalInterface
+        private interface BatchRequester {
+            void request(int amountOfBatches) throws 
IgniteInternalCheckedException;
+        }
 
-    private static final Batch<?> WAITING = new Batch<>(0, false, null);
+        /**
+         * An enumeration of all possible states of the {@link RemoteSource 
remote source}. Below is a state diagram showing possible
+         * transitions from one state to another.
+         *
+         * <p>Note: "out of order", "next received", and "batch drained" are 
ephemeral states, thus not presented in the enumeration.
+         * <pre>
+         *                    +---+
+         *                    | * |
+         *                    +---+
+         *                      |
+         *                      v
+         *                 +---------+
+         *                 | WAITING |<-----\
+         *                 +---------+       \
+         *                  /       ^         \
+         *       batch received     |          \
+         *                /         |           \
+         *               v          |            \
+         *        /-------------\   |            |
+         *       | out of order? |  |            |
+         *        \-------------/   |            |
+         *           /       \      |            no
+         *          no       yes   /             |
+         *          |          \__/              |
+         *          v                            |
+         *      +-------+                 /--------------\
+         *      | READY |<-----yes-------| next received? |
+         *      +-------+                 \--------------/
+         *           \___                        ^
+         *               \                       |
+         *           batch drained               /
+         *                 \____                /
+         *                      v              no
+         *                /-----------\       /
+         *               | last batch? |-----/
+         *                \-----------/
+         *                      |
+         *                     yes
+         *                      |
+         *                      v
+         *                   +-----+
+         *                   | END |
+         *                   +-----+
+         *                      |
+         *                      v
+         *                    +---+
+         *                    | * |
+         *                    +---+
+         * </pre>
+         */
+        enum State {
+            /** Last batch was received and has already been drained. No more data 
is expected from this source. */
+            END,
+
+            /** Batch with expected id is received and ready to be drained. */
+            READY,
+
+            /** Batch with expected id is not received yet. */
+            WAITING
+        }
 
-    private static final Batch<?> END = new Batch<>(0, false, null);
+        private final PriorityQueue<Batch<RowT>> batches = new 
PriorityQueue<>(IO_BATCH_CNT);
 
-    private final class Buffer {
-        private final String nodeName;
+        private final BatchRequester batchRequester;
 
+        private State state = State.WAITING;
         private int lastEnqueued = -1;
+        private int lastRequested = -1;
+        private @Nullable Batch<RowT> curr = null;
 
-        private final PriorityQueue<Batch<RowT>> batches = new 
PriorityQueue<>(IO_BATCH_CNT);
-
-        private Batch<RowT> curr = waitingMark();
-
-        private Buffer(String nodeName) {
-            this.nodeName = nodeName;
+        private RemoteSource(BatchRequester batchRequester) {
+            this.batchRequester = batchRequester;
         }
 
-        private void offer(int id, boolean last, List<RowT> rows) {
+        /** A handler for batches received from remote source. */
+        void onBatchReceived(int id, boolean last, List<RowT> rows) {
             batches.offer(new Batch<>(id, last, rows));
-        }
 
-        private Batch<RowT> pollBatch() {
-            if (batches.isEmpty() || batches.peek().batchId != lastEnqueued + 
1) {
-                return waitingMark();
+            if (state == State.WAITING && id == lastEnqueued + 1) {
+                advanceBatch();
             }
-
-            Batch<RowT> batch = batches.poll();
-
-            assert batch != null && batch.batchId == lastEnqueued + 1;
-
-            lastEnqueued = batch.batchId;
-
-            return batch;
         }
 
-        private State check() {
-            if (finished()) {
-                return State.END;
-            }
+        /**
+         * Requests several more batches from the remote source if the count of 
in-flight batches
+         * is less than or equal to half of {@link #IO_BATCH_CNT}.
+         */
+        void requestNextBatchIfNeeded() throws IgniteInternalCheckedException {
+            int inFlightCount = lastRequested - lastEnqueued;
 
-            if (waiting()) {
-                return State.WAITING;
-            }
+            // IO_BATCH_CNT should never be less than 1, but we don't have 
validation
+            if (IO_BATCH_CNT <= 1 && inFlightCount == 0) {
+                batchRequester.request(1);
+            } else if (IO_BATCH_CNT / 2 >= inFlightCount) {
+                int countOfBatches = IO_BATCH_CNT - inFlightCount;
 
-            if (isEnd()) {
-                curr = finishedMark();
+                lastRequested += countOfBatches;
 
-                return State.END;
+                batchRequester.request(countOfBatches);
             }
+        }
 
-            return State.READY;
+        /** Returns the state of the source. */
+        State check() {
+            return state;
         }
 
-        private RowT peek() {
+        /**
+         * Returns the first element of a buffer without removing.
+         *
+         * @return The first element of a buffer.
+         */
+        RowT peek() {
+            assert state == State.READY;
             assert curr != null;
-            assert curr != WAITING;
-            assert curr != END;
-            assert !isEnd();
 
             return curr.rows.get(curr.idx);
         }
 
-        private RowT remove() throws IgniteInternalCheckedException {
+        /**
+         * Removes the first element from a buffer.
+         *
+         * @return The removed element.
+         */
+        RowT remove() {
+            assert state == State.READY;
             assert curr != null;
-            assert curr != WAITING;
-            assert curr != END;
-            assert !isEnd();
 
             RowT row = curr.rows.set(curr.idx++, null);
 
             if (curr.idx == curr.rows.size()) {
-                acknowledge(nodeName, curr.batchId);
-
-                if (!isEnd()) {
-                    curr = pollBatch();
+                if (curr.last) {
+                    state = State.END;
+                } else {
+                    advanceBatch();
                 }
             }
 
             return row;
         }
 
-        private boolean finished() {
-            return curr == END;
+        private boolean hasNextBatch() {
+            return !batches.isEmpty() && batches.peek().batchId == 
lastEnqueued + 1;
         }
 
-        private boolean waiting() {
-            return curr == WAITING && (curr = pollBatch()) == WAITING;
-        }
+        private void advanceBatch() {
+            if (!hasNextBatch()) {
+                state = State.WAITING;
 
-        private boolean isEnd() {
-            return curr.last && curr.idx == curr.rows.size();
-        }
+                return;
+            }
 
-        private Batch<RowT> finishedMark() {
-            return (Batch<RowT>) END;
-        }
+            curr = batches.poll();
+
+            assert curr != null;
+
+            state = curr.rows.isEmpty() ? State.END : State.READY;
 
-        private Batch<RowT> waitingMark() {
-            return (Batch<RowT>) WAITING;
+            lastEnqueued = curr.batchId;
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index d18a2ef5c0..69b608995d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -17,17 +17,14 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
-import static 
org.apache.ignite.internal.sql.engine.util.Commons.IN_BUFFER_SIZE;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
@@ -36,39 +33,34 @@ import 
org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * A part of exchange.
+ * A part of exchange which sends batches to a remote downstream.
  */
 public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, 
SingleNode<RowT>, Downstream<RowT> {
     private static final IgniteLogger LOG = Loggers.forClass(Outbox.class);
 
-    private final ExchangeService exchange;
-
-    private final MailboxRegistry registry;
-
     private final long exchangeId;
-
     private final long targetFragmentId;
-
+    private final ExchangeService exchange;
+    private final MailboxRegistry registry;
     private final Destination<RowT> dest;
 
     private final Deque<RowT> inBuf = new ArrayDeque<>(inBufSize);
-
-    private final Map<String, Buffer> nodeBuffers = new HashMap<>();
+    private final Map<String, RemoteDownstream<RowT>> nodeBuffers = new 
HashMap<>();
 
     private int waiting;
 
     /**
      * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      *
-     * @param ctx Execution context.
-     * @param exchange Exchange service.
-     * @param registry Mailbox registry.
-     * @param exchangeId Exchange ID.
-     * @param targetFragmentId Target fragment ID.
-     * @param dest Destination.
+     * @param ctx An execution context.
+     * @param exchange A service that provides a way for Inbox and Outbox to 
communicate with each other.
+     * @param registry A registry of all created inboxes and outboxes.
+     * @param exchangeId An identifier of the exchange this outbox is part of.
+     * @param targetFragmentId An identifier of the fragment to send batches 
to.
+     * @param dest A function which determines which row to send to which 
remote.
      */
     public Outbox(
             ExecutionContext<RowT> ctx,
@@ -84,6 +76,14 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
         this.targetFragmentId = targetFragmentId;
         this.exchangeId = exchangeId;
         this.dest = dest;
+
+        initBuffers();
+    }
+
+    private void initBuffers() {
+        for (String nodeName : dest.targets()) {
+            nodeBuffers.put(nodeName, new RemoteDownstream<>(nodeName, 
this::sendBatch));
+        }
     }
 
     /** {@inheritDoc} */
@@ -93,28 +93,35 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
     }
 
     /**
-     * Callback method.
+     * A handler which saves the demand from the remote downstream and starts the 
execution.
      *
-     * @param nodeName Target consistent ID.
-     * @param batchId Batch ID.
+     * @param nodeName An identifier of the demander.
+     * @param amountOfBatches A count of demanded batches.
      */
-    public void onAcknowledge(String nodeName, int batchId) throws Exception {
-        assert nodeBuffers.containsKey(nodeName);
-
+    public void onRequest(String nodeName, int amountOfBatches) throws 
Exception {
         checkState();
 
-        nodeBuffers.get(nodeName).acknowledge(batchId);
+        RemoteDownstream<?> downstream = getOrCreateBuffer(nodeName);
+
+        downstream.onBatchRequested(amountOfBatches);
+
+        if (waiting != -1 || !inBuf.isEmpty()) {
+            flush();
+        }
     }
 
     /**
-     * Init.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Starts the execution of the fragment and keeps the result in the 
intermediate buffer.
+     *
+     * <p>Note: this method must be called by the same thread that will 
execute the whole fragment.
      */
-    public void init() {
+    public void prefetch() {
         try {
             checkState();
 
-            flush();
+            if (waiting == 0) {
+                source().request(waiting = inBufSize);
+            }
         } catch (Throwable t) {
             onError(t);
         }
@@ -186,7 +193,11 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
     /** {@inheritDoc} */
     @Override
     protected void rewindInternal() {
-        throw new UnsupportedOperationException();
+        inBuf.clear();
+        nodeBuffers.clear();
+        waiting = 0;
+
+        initBuffers();
     }
 
     /** {@inheritDoc} */
@@ -215,31 +226,32 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
         }
     }
 
-    private Buffer getOrCreateBuffer(String nodeName) {
-        return nodeBuffers.computeIfAbsent(nodeName, this::createBuffer);
-    }
-
-    private Buffer createBuffer(String nodeName) {
-        return new Buffer(nodeName);
+    private RemoteDownstream<RowT> getOrCreateBuffer(String nodeName) {
+        return nodeBuffers.computeIfAbsent(nodeName, name -> new 
RemoteDownstream<>(name, this::sendBatch));
     }
 
     private void flush() throws Exception {
         while (!inBuf.isEmpty()) {
             checkState();
 
-            Collection<Buffer> buffers = dest.targets(inBuf.peek()).stream()
-                    .map(this::getOrCreateBuffer)
-                    .collect(Collectors.toList());
+            List<String> targets = dest.targets(inBuf.peek());
+            List<RemoteDownstream<RowT>> buffers = new 
ArrayList<>(targets.size());
 
-            assert !nullOrEmpty(buffers);
+            for (String target : targets) {
+                RemoteDownstream<RowT> buffer = getOrCreateBuffer(target);
+
+                if (!buffer.ready()) {
+                    return;
+                }
 
-            if (!buffers.stream().allMatch(Buffer::ready)) {
-                return;
+                buffers.add(buffer);
             }
 
+            assert !nullOrEmpty(buffers);
+
             RowT row = inBuf.remove();
 
-            for (Buffer dest : buffers) {
+            for (RemoteDownstream<RowT> dest : buffers) {
                 dest.add(row);
             }
         }
@@ -247,10 +259,10 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
         assert inBuf.isEmpty();
 
         if (waiting == 0) {
-            source().request(waiting = IN_BUFFER_SIZE);
+            source().request(waiting = inBufSize);
         } else if (waiting == -1) {
-            for (String node : dest.targets()) {
-                getOrCreateBuffer(node).end();
+            for (RemoteDownstream<RowT> buffer : nodeBuffers.values()) {
+                buffer.end();
             }
         }
     }
@@ -265,101 +277,164 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
         }
     }
 
-    private final class Buffer {
-        private final String nodeName;
+    private static final class RemoteDownstream<RowT> {
+        @FunctionalInterface
+        private interface BatchSender<RowT> {
+            void send(String targetNodeName, int batchId, boolean last, 
List<RowT> rows) throws IgniteInternalCheckedException;
+        }
 
-        private int hwm = -1;
+        /**
+         * An enumeration of all possible states of the {@link RemoteDownstream 
remote downstream}. Below is a state diagram showing possible
+         * transitions from one state to another.
+         *
+         * <p>Note: "batch is full" is an ephemeral state, thus it is not present in the 
enumeration.
+         * <pre>
+         *                    +---+
+         *                    | * |
+         *                    +---+
+         *                      |
+         *                      v
+         *                 +---------+
+         *        /--------| FILLING |<---\
+         *       /         +---------+     \
+         *      |           /       ^       \
+         *      |    row added       \       |
+         *      |          \         no      |
+         *      |           v        /       |
+         *      |       /---------------\    |
+         *      |      | batch is full?  |   |
+         *      |       \---------------/    |
+         *      |               |            |
+         *      |              yes          /
+         *      |               |       batch sent
+         * downstream ended     v         /
+         *      |           +-------+    /
+         *      |           | FULL  |---/
+         *      |           +-------+
+         *      |               |
+         *       \     downstream ended
+         *        \             |
+         *         \            v
+         *          \     /-----------\
+         *           \-->| LAST BATCH  |
+         *                \-----------/
+         *                      |
+         *                  batch sent
+         *                      |
+         *                      v
+         *                   +-----+
+         *                   | END |
+         *                   +-----+
+         *                      |
+         *                      v
+         *                    +---+
+         *                    | * |
+         *                    +---+
+         * </pre>
+         */
+        enum State {
+            /** Batch is ready to accept at least one row. */
+            FILLING,
+
+            /** Batch is full, thus is ready to be sent. */
+            FULL,
 
-        private int lwm = -1;
+            /** No more rows are expected to be added to downstream. The next 
batch will be sent, probably, partially filled. */
+            LAST_BATCH,
 
-        private List<RowT> curr;
+            /** Downstream is closed. All resources were released. */
+            END
+        }
+
+        private final String nodeName;
+        private final BatchSender<RowT> sender;
 
-        private Buffer(String nodeName) {
+        private State state = State.FILLING;
+        private int lastSentBatchId = -1;
+
+        private @Nullable List<RowT> curr;
+        private int pendingCount;
+
+        private RemoteDownstream(String nodeName, BatchSender<RowT> sender) {
             this.nodeName = nodeName;
+            this.sender = sender;
 
             curr = new ArrayList<>(IO_BATCH_SIZE);
         }
 
-        /**
-         * Checks whether there is a place for a new row.
-         *
-         * @return {@code True} is it possible to add a row to a batch.
-         */
-        private boolean ready() {
-            if (hwm == Integer.MAX_VALUE) {
-                return false;
+        /** A handler of requests from downstream. */
+        void onBatchRequested(int amountOfBatches) throws Exception {
+            assert amountOfBatches > 0 : amountOfBatches;
+
+            this.pendingCount = amountOfBatches;
+
+            // if there is a batch which is ready to be sent, then just send it
+            if (state == State.FULL || state == State.LAST_BATCH) {
+                sendBatch();
             }
+        }
 
-            return curr.size() < IO_BATCH_SIZE || hwm - lwm < IO_BATCH_CNT;
+        /** Returns {@code true} if this downstream is ready to accept at 
least one more row. */
+        boolean ready() {
+            return state == State.FILLING;
         }
 
         /**
          * Adds a row to current batch.
          *
-         * @param row Row.
+         * @param row Row to add.
          */
-        public void add(RowT row) throws IgniteInternalCheckedException {
-            assert ready();
+        void add(RowT row) throws Exception {
+            assert ready() : state;
+            assert curr != null;
+
+            curr.add(row);
 
             if (curr.size() == IO_BATCH_SIZE) {
-                sendBatch(nodeName, ++hwm, false, curr);
+                state = State.FULL;
 
-                curr = new ArrayList<>(IO_BATCH_SIZE);
+                if (pendingCount > 0) {
+                    sendBatch();
+                }
             }
-
-            curr.add(row);
         }
 
-        /**
-         * Signals data is over.
-         */
-        public void end() throws IgniteInternalCheckedException {
-            if (hwm == Integer.MAX_VALUE) {
-                return;
-            }
+        /** Sends current batch to remote downstream. */
+        void sendBatch() throws Exception {
+            assert pendingCount > 0;
+            assert state == State.FULL || state == State.LAST_BATCH : state;
+            assert curr != null;
 
-            int batchId = hwm + 1;
-            hwm = Integer.MAX_VALUE;
+            boolean lastBatch = state == State.LAST_BATCH;
 
-            List<RowT> tmp = curr;
-            curr = null;
+            sender.send(nodeName, ++lastSentBatchId, lastBatch, curr);
 
-            sendBatch(nodeName, batchId, true, tmp);
-        }
+            pendingCount--;
 
-        /**
-         * Callback method.
-         *
-         * @param id batch ID.
-         */
-        private void acknowledge(int id) throws Exception {
-            if (lwm > id) {
-                return;
+            if (lastBatch) {
+                state = State.END;
+                curr = null;
+            } else {
+                state = State.FILLING;
+                curr = new ArrayList<>(IO_BATCH_SIZE);
             }
+        }
 
-            boolean readyBefore = ready();
+        /** Completes this downstream by sending all rows collected so far. */
+        void end() throws Exception {
+            assert state == State.FILLING || state == State.FULL : state;
 
-            lwm = id;
+            state = State.LAST_BATCH;
 
-            if (!readyBefore && ready()) {
-                flush();
+            if (pendingCount > 0) {
+                sendBatch();
             }
         }
 
-        public void close() {
-            final int currBatchId = hwm;
-
-            if (hwm == Integer.MAX_VALUE) {
-                return;
-            }
-
-            hwm = Integer.MAX_VALUE;
-
+        /** Closes this downstream and clears all acquired resources. */
+        void close() {
             curr = null;
-
-            if (currBatchId >= 0) {
-                sendInboxClose(nodeName);
-            }
+            state = State.END;
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchAcknowledgeMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
similarity index 70%
rename from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchAcknowledgeMessage.java
rename to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
index 6076d51a47..7e1d7460b4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchAcknowledgeMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
@@ -20,18 +20,13 @@ package org.apache.ignite.internal.sql.engine.message;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * QueryBatchAcknowledgeMessage interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * A message to notify a remote fragment (aka remote source) that more batches are 
required to fulfil the result.
  */
-@Transferable(value = SqlQueryMessageGroup.QUERY_BATCH_ACK)
-public interface QueryBatchAcknowledgeMessage extends 
ExecutionContextAwareMessage {
-    /**
-     * Get exchange ID.
-     */
+@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
+public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
+    /** Returns an identifier of the exchange to request batches from. */
     long exchangeId();
 
-    /**
-     * Get batch ID.
-     */
-    int batchId();
+    /** Returns amount of batches to request. */
+    int amountOfBatches();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
index 69bfa8c43f..d4cc366191 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
@@ -36,7 +36,8 @@ public final class SqlQueryMessageGroup {
 
     public static final short QUERY_BATCH_MESSAGE = 3;
 
-    public static final short QUERY_BATCH_ACK = 4;
+    /** See {@link QueryBatchRequestMessage} for details. */
+    public static final short QUERY_BATCH_REQUEST = 4;
 
     public static final short INBOX_CLOSE_MESSAGE = 5;
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 1d388ebe67..a8bd3f6e22 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -128,6 +128,9 @@ public final class Commons {
 
     public static final int IN_BUFFER_SIZE = 512;
 
+    public static final int IO_BATCH_SIZE = 256;
+    public static final int IO_BATCH_COUNT = 4;
+
     /**
      * The number of elements to be prefetched from each partition when 
scanning the sorted index.
      * The higher the value, the fewer calls to the upstream will be, but at 
the same time, the bigger
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
index 7a5d239c77..ecd0889b04 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.sql.engine.benchmarks;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 
-import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -54,11 +52,11 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS)
 @BenchmarkMode(Mode.Throughput)
 @OutputTimeUnit(TimeUnit.SECONDS)
-@Fork(1)
+@Fork(3)
 @State(Scope.Benchmark)
 public class SqlBenchmark {
-    private final DataProvider<Object[]> dataProvider = new 
SameRowDataProvider(
-            new Object[] {42, UUID.randomUUID().toString()}, 333
+    private final DataProvider<Object[]> dataProvider = DataProvider.fromRow(
+            new Object[]{42, UUID.randomUUID().toString()}, 3_333
     );
 
     // @formatter:off
@@ -95,7 +93,7 @@ public class SqlBenchmark {
     /** Very simple test to measure performance of minimal possible 
distributed query. */
     @Benchmark
     public void selectAllSimple(Blackhole bh) {
-        for (var row : 
await(gatewayNode.executePlan(plan).requestNextAsync(1_000)).items()) {
+        for (var row : 
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
             bh.consume(row);
         }
     }
@@ -114,37 +112,4 @@ public class SqlBenchmark {
 
         new Runner(build).run();
     }
-
-    private static class SameRowDataProvider implements DataProvider<Object[]> 
{
-        private final int times;
-        private final Object[] row;
-
-        SameRowDataProvider(Object[] row, int times) {
-            this.times = times;
-            this.row = row;
-        }
-
-        @Override
-        public Iterator<Object[]> iterator() {
-            return new Iterator<>() {
-                private int counter;
-
-                @Override
-                public boolean hasNext() {
-                    return counter < times;
-                }
-
-                @Override
-                public Object[] next() {
-                    if (!hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-
-                    counter++;
-
-                    return row;
-                }
-            };
-        }
-    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 49771678a9..23b55a970f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -347,14 +347,14 @@ public class ExecutionServiceImplTest {
 
         var messageService = node.messageService();
         var mailboxRegistry = new MailboxRegistryImpl();
-        var clusterNode = new ClusterNode(UUID.randomUUID().toString(), 
nodeName, NetworkAddress.from("127.0.0.1:1111"));
 
-        var exchangeService = new ExchangeServiceImpl(clusterNode, 
taskExecutor, mailboxRegistry, messageService);
+        var exchangeService = new ExchangeServiceImpl(mailboxRegistry, 
messageService);
 
         var schemaManagerMock = mock(SqlSchemaManager.class);
 
         when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
 
+        var clusterNode = new ClusterNode(UUID.randomUUID().toString(), 
nodeName, NetworkAddress.from("127.0.0.1:1111"));
         var executionService = new ExecutionServiceImpl<>(
                 clusterNode,
                 messageService,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index ae73d2c3f9..7e81eb4ede 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -64,11 +64,10 @@ public class AbstractExecutionTest extends 
IgniteAbstractTest {
     public static final Object[][] EMPTY = new Object[0][];
 
     private Throwable lastE;
-
-    private QueryTaskExecutorImpl taskExecutor;
-
     private List<UUID> nodes;
 
+    protected QueryTaskExecutorImpl taskExecutor;
+
     @BeforeEach
     public void beforeTest() {
         taskExecutor = new QueryTaskExecutorImpl("no_node");
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
new file mode 100644
index 0000000000..12af90b9a7
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
+import org.apache.ignite.internal.sql.engine.framework.ClusterServiceFactory;
+import org.apache.ignite.internal.sql.engine.framework.DataProvider;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.message.MessageService;
+import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
+import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
+import org.apache.ignite.internal.sql.engine.trait.AllNodes;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify Outbox to Inbox interoperation.
+ */
+public class ExchangeExecutionTest extends AbstractExecutionTest {
+    private static final String ROOT_NODE_NAME = "N1";
+    private static final String ANOTHER_NODE_NAME = "N2";
+    private static final List<String> NODE_NAMES = List.of(ROOT_NODE_NAME, 
ANOTHER_NODE_NAME);
+    private static final ClusterNode ROOT_NODE =
+            new ClusterNode(ROOT_NODE_NAME, ROOT_NODE_NAME, 
NetworkAddress.from("127.0.0.1:10001"));
+    private static final ClusterNode ANOTHER_NODE =
+            new ClusterNode(ANOTHER_NODE_NAME, ANOTHER_NODE_NAME, 
NetworkAddress.from("127.0.0.1:10002"));
+    private static final int SOURCE_FRAGMENT_ID = 0;
+    private static final int TARGET_FRAGMENT_ID = 1;
+    private static final Comparator<Object[]> COMPARATOR = 
Comparator.comparingInt(o -> (Integer) o[0]);
+
+    private final Map<String, MailboxRegistry> mailboxes = new HashMap<>();
+    private final Map<String, ExchangeService> exchangeServices = new 
HashMap<>();
+    private final ClusterServiceFactory serviceFactory = 
TestBuilders.clusterServiceFactory(List.of(ROOT_NODE_NAME, ANOTHER_NODE_NAME));
+
+    @ParameterizedTest(name = "rowCount={0}, prefetch={1}, ordered={2}")
+    @MethodSource("testArgs")
+    public void test(int rowCount, boolean prefetch, boolean ordered) {
+        UUID queryId = UUID.randomUUID();
+
+        List<Outbox<?>> sourceFragments = new ArrayList<>();
+
+        int idx = 0;
+        for (ClusterNode node : List.of(ROOT_NODE, ANOTHER_NODE)) {
+            Outbox<?> outbox = createSourceFragment(
+                    queryId,
+                    idx++,
+                    node,
+                    serviceFactory,
+                    rowCount
+            );
+
+            sourceFragments.add(outbox);
+        }
+
+        if (prefetch) {
+            for (Outbox<?> outbox : sourceFragments) {
+                await(outbox.context().submit(outbox::prefetch, 
outbox::onError));
+            }
+        }
+
+        AsyncRootNode<Object[], Object[]> root = createRootFragment(
+                queryId,
+                ROOT_NODE,
+                NODE_NAMES,
+                ordered,
+                serviceFactory
+        );
+
+        int expectedRowCount = NODE_NAMES.size() * rowCount;
+
+        BatchedResult<Object[]> res = 
await(root.requestNextAsync(expectedRowCount));
+
+        assertEquals(expectedRowCount, res.items().size());
+
+        if (ordered) {
+            List<Object[]> expected = new ArrayList<>(res.items());
+            expected.sort(COMPARATOR);
+
+            assertEquals(expected, res.items());
+        }
+    }
+
+    private static Stream<Arguments> testArgs() {
+        List<Integer> sizes = List.of(
+                // half of the batch size, but never less than a single row
+                Math.max(Commons.IO_BATCH_SIZE / 2, 1),
+
+                // full batch
+                Commons.IO_BATCH_SIZE,
+
+                // full batch + one extra row
+                Commons.IO_BATCH_SIZE + 1,
+
+                // several batches
+                2 * Commons.IO_BATCH_SIZE + 1,
+
+                // more than count of so called "in-flight" batches. In flight 
batches
+                // are batches that have been sent but not yet acknowledged
+                2 * Commons.IO_BATCH_SIZE * Commons.IO_BATCH_COUNT
+        );
+
+        List<Boolean> trueFalseList = List.of(true, false);
+
+        List<Arguments> args = new ArrayList<>(sizes.size() * trueFalseList.size() * trueFalseList.size());
+        for (int size : sizes) {
+            for (boolean prefetch : trueFalseList) {
+                for (boolean ordered : trueFalseList) {
+                    args.add(Arguments.of(size, prefetch, ordered));
+                }
+            }
+        }
+
+        return args.stream();
+    }
+
+    private AsyncRootNode<Object[], Object[]> createRootFragment(
+            UUID queryId,
+            ClusterNode localNode,
+            List<String> sourceNodeNames,
+            boolean ordered,
+            ClusterServiceFactory serviceFactory
+    ) {
+        ExecutionContext<Object[]> targetCtx = TestBuilders.executionContext()
+                .queryId(queryId)
+                .executor(taskExecutor)
+                .fragment(new FragmentDescription(TARGET_FRAGMENT_ID, null, 
null, Long2ObjectMaps.emptyMap()))
+                .localNode(localNode)
+                .build();
+
+        Comparator<Object[]> comparator = ordered ? COMPARATOR : null;
+
+        MailboxRegistry mailboxRegistry = 
mailboxes.computeIfAbsent(localNode.name(), name -> new MailboxRegistryImpl());
+        ExchangeService exchangeService = 
exchangeServices.computeIfAbsent(localNode.name(), name ->
+                
createExchangeService(serviceFactory.forNode(localNode.name()), 
mailboxRegistry));
+
+        Inbox<Object[]> inbox = new Inbox<>(
+                targetCtx, exchangeService, mailboxRegistry, sourceNodeNames, 
comparator, SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
+        );
+
+        mailboxRegistry.register(inbox);
+
+        AsyncRootNode<Object[], Object[]> root = new AsyncRootNode<>(
+                inbox, Function.identity()
+        );
+
+        inbox.onRegister(root);
+
+        return root;
+    }
+
+    private Outbox<?> createSourceFragment(
+            UUID queryId,
+            int rowValue,
+            ClusterNode localNode,
+            ClusterServiceFactory serviceFactory,
+            int rowCount
+    ) {
+        ExecutionContext<Object[]> sourceCtx = TestBuilders.executionContext()
+                .queryId(queryId)
+                .executor(taskExecutor)
+                .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, null, 
null, Long2ObjectMaps.emptyMap()))
+                .localNode(localNode)
+                .build();
+
+        MailboxRegistry mailboxRegistry = 
mailboxes.computeIfAbsent(localNode.name(), name -> new MailboxRegistryImpl());
+        ExchangeService exchangeService = 
exchangeServices.computeIfAbsent(localNode.name(), name ->
+                
createExchangeService(serviceFactory.forNode(localNode.name()), 
mailboxRegistry));
+
+        Outbox<Object[]> outbox = new Outbox<>(
+                sourceCtx, exchangeService, mailboxRegistry, 
SOURCE_FRAGMENT_ID,
+                TARGET_FRAGMENT_ID, new AllNodes<>(List.of(ROOT_NODE_NAME))
+        );
+        mailboxRegistry.register(outbox);
+
+        ScanNode<Object[]> source = new ScanNode<>(sourceCtx, 
DataProvider.fromRow(new Object[]{rowValue, rowValue}, rowCount));
+
+        outbox.register(source);
+
+        return outbox;
+    }
+
+    private ExchangeService createExchangeService(ClusterService 
clusterService, MailboxRegistry mailboxRegistry) {
+        MessageService messageService = new MessageServiceImpl(
+                clusterService.topologyService(),
+                clusterService.messagingService(),
+                taskExecutor,
+                new IgniteSpinBusyLock()
+        );
+
+        ExchangeService exchangeService = new ExchangeServiceImpl(
+                mailboxRegistry,
+                messageService
+        );
+
+        messageService.start();
+        exchangeService.start();
+
+        return exchangeService;
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
similarity index 95%
rename from 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
rename to 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index a3cf08296d..98198e2e65 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -43,7 +43,7 @@ import org.jetbrains.annotations.Nullable;
  * Auxiliary object to create the associated {@link MessagingService}
  * and {@link TopologyService} for each node in the cluster.
  */
-class TestClusterService {
+public class ClusterServiceFactory {
     private final List<String> allNodes;
 
     private final Map<String, LocalMessagingService> messagingServicesByNode = 
new ConcurrentHashMap<>();
@@ -54,11 +54,17 @@ class TestClusterService {
      *
      * @param allNodes A collection of nodes to create cluster service from.
      */
-    TestClusterService(List<String> allNodes) {
+    ClusterServiceFactory(List<String> allNodes) {
         this.allNodes = allNodes;
     }
 
-    ClusterService spawnForNode(String nodeName) {
+    /**
+     * Creates an instance of {@link ClusterService} for a given node.
+     *
+     * @param nodeName A name of the node to create cluster service for.
+     * @return An instance of cluster service.
+     */
+    public ClusterService forNode(String nodeName) {
         return new ClusterService() {
             /** {@inheritDoc} */
             @Override
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
index a143f4c6ae..94d7a61b2a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.sql.engine.framework;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 /**
  * Producer of the rows to use with {@link TestTable} in execution-related 
scenarios.
@@ -40,4 +42,44 @@ public interface DataProvider<T> extends Iterable<T> {
     static <T> DataProvider<T> fromCollection(Collection<T> collection) {
         return collection::iterator;
     }
+
+    /**
+     * Creates a data provider that repeats the given row the specified number of 
times.
+     *
+     * @param row A row to repeat.
+     * @param repeatTimes An amount of times to repeat the row.
+     * @param <T> A type of the produced elements.
+     * @return A data provider instance.
+     */
+    static <T> DataProvider<T> fromRow(T row, int repeatTimes) {
+        return new DataProvider<>() {
+            private final int times = repeatTimes;
+
+            /** {@inheritDoc} */
+            @Override
+            public Iterator<T> iterator() {
+                return new Iterator<>() {
+                    private int counter;
+
+                    /** {@inheritDoc} */
+                    @Override
+                    public boolean hasNext() {
+                        return counter < times;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override
+                    public T next() {
+                        if (!hasNext()) {
+                            throw new NoSuchElementException();
+                        }
+
+                        counter++;
+
+                        return row;
+                    }
+                };
+            }
+        };
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 6a6ae1bb52..7885aa5b5e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -18,24 +18,35 @@
 package org.apache.ignite.internal.sql.engine.framework;
 
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
+import static org.mockito.Mockito.mock;
 
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.schema.Table;
 import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
 
 /**
  * A collection of builders to create test objects.
@@ -51,6 +62,16 @@ public class TestBuilders {
         return new TableBuilderImpl();
     }
 
+    /** Returns a builder of the execution context. */
+    public static ExecutionContextBuilder executionContext() {
+        return new ExecutionContextBuilderImpl();
+    }
+
+    /** Factory method to create a cluster service factory for a cluster 
consisting of the provided nodes. */
+    public static ClusterServiceFactory clusterServiceFactory(List<String> 
nodes) {
+        return new ClusterServiceFactory(nodes);
+    }
+
     /**
      * A builder to create a test cluster object.
      *
@@ -111,6 +132,88 @@ public class TestBuilders {
         ClusterTableBuilder defaultDataProvider(DataProvider<?> dataProvider);
     }
 
+    /**
+     * A builder to create an execution context.
+     *
+     * @see ExecutionContext
+     */
+    public interface ExecutionContextBuilder {
+        /** Sets the identifier of the query. */
+        ExecutionContextBuilder queryId(UUID queryId);
+
+        /** Sets the description of the fragment this context will be created 
for. */
+        ExecutionContextBuilder fragment(FragmentDescription 
fragmentDescription);
+
+        /** Sets the query task executor. */
+        ExecutionContextBuilder executor(QueryTaskExecutor executor);
+
+        /** Sets the node this fragment will be executed on. */
+        ExecutionContextBuilder localNode(ClusterNode node);
+
+        /**
+         * Builds the context object.
+         *
+         * @return Created context object.
+         */
+        ExecutionContext<Object[]> build();
+    }
+
+    private static class ExecutionContextBuilderImpl implements 
ExecutionContextBuilder {
+        private FragmentDescription description = new FragmentDescription(0, 
null, null, Long2ObjectMaps.emptyMap());
+
+        private UUID queryId = null;
+        private QueryTaskExecutor executor = null;
+        private ClusterNode node = null;
+
+        /** {@inheritDoc} */
+        @Override
+        public ExecutionContextBuilder queryId(UUID queryId) {
+            this.queryId = Objects.requireNonNull(queryId, "queryId");
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ExecutionContextBuilder fragment(FragmentDescription 
fragmentDescription) {
+            this.description = Objects.requireNonNull(fragmentDescription, 
"fragmentDescription");
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ExecutionContextBuilder executor(QueryTaskExecutor executor) {
+            this.executor = Objects.requireNonNull(executor, "executor");
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ExecutionContextBuilder localNode(ClusterNode node) {
+            this.node = Objects.requireNonNull(node, "node");
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public ExecutionContext<Object[]> build() {
+            return new ExecutionContext<>(
+                    BaseQueryContext.builder().build(),
+                    Objects.requireNonNull(executor, "executor"),
+                    queryId,
+                    Objects.requireNonNull(node, "node"),
+                    node.name(),
+                    description,
+                    ArrayRowHandler.INSTANCE,
+                    Map.of(),
+                    mock(InternalTransaction.class)
+            );
+        }
+    }
+
     private static class ClusterBuilderImpl implements ClusterBuilder {
         private final List<ClusterTableBuilderImpl> tableBuilders = new 
ArrayList<>();
         private List<String> nodeNames;
@@ -135,7 +238,7 @@ public class TestBuilders {
         /** {@inheritDoc} */
         @Override
         public TestCluster build() {
-            var clusterService = new TestClusterService(nodeNames);
+            var clusterService = new ClusterServiceFactory(nodeNames);
 
             for (ClusterTableBuilderImpl tableBuilder : tableBuilders) {
                 validateTableBuilder(tableBuilder);
@@ -149,7 +252,7 @@ public class TestBuilders {
             var schemaManager = new PredefinedSchemaManager(new 
IgniteSchema("PUBLIC", tableMap, null));
 
             Map<String, TestNode> nodes = nodeNames.stream()
-                    .map(name -> new TestNode(name, 
clusterService.spawnForNode(name), schemaManager))
+                    .map(name -> new TestNode(name, 
clusterService.forNode(name), schemaManager))
                     .collect(Collectors.toMap(TestNode::name, 
Function.identity()));
 
             return new TestCluster(nodes);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index cba801716e..bf778cc357 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -104,7 +104,7 @@ public class TestNode implements LifecycleAware {
                 topologyService, messagingService, taskExecutor, new 
IgniteSpinBusyLock()
         ));
         ExchangeService exchangeService = registerService(new 
ExchangeServiceImpl(
-                topologyService.localMember(), taskExecutor, mailboxRegistry, 
messageService
+                mailboxRegistry, messageService
         ));
 
         executionService = registerService(new ExecutionServiceImpl<>(

Reply via email to