This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new bff7ad8  IGNITE-13198: Calcite integration. Rework error / cancel 
logic at execution. This closes #7981
bff7ad8 is described below

commit bff7ad8d6aa36e1381b9e83be456c47a65d992e1
Author: Taras Ledkov <tled...@gridgain.com>
AuthorDate: Tue Aug 25 22:00:27 2020 +0300

    IGNITE-13198: Calcite integration. Rework error / cancel logic at 
execution. This closes #7981
---
 .../query/calcite/exec/ExchangeService.java        |  27 +-
 .../query/calcite/exec/ExchangeServiceImpl.java    |  63 ++++-
 .../query/calcite/exec/ExecutionContext.java       |  18 --
 .../query/calcite/exec/ExecutionServiceImpl.java   | 289 +++++++--------------
 .../processors/query/calcite/exec/IndexScan.java   | 256 +++++++++---------
 .../query/calcite/exec/LogicalRelImplementor.java  |   4 +-
 .../query/calcite/exec/MailboxRegistry.java        |  12 +-
 .../query/calcite/exec/MailboxRegistryImpl.java    |  91 +++++--
 .../processors/query/calcite/exec/TableScan.java   | 251 ------------------
 .../query/calcite/exec/rel/AbstractJoinNode.java   |  15 ++
 .../query/calcite/exec/rel/AbstractNode.java       |  23 +-
 .../query/calcite/exec/rel/AggregateNode.java      |   9 +
 .../query/calcite/exec/rel/FilterNode.java         |   9 +
 .../processors/query/calcite/exec/rel/Inbox.java   | 122 ++++++---
 .../MessageService.java => exec/rel/Mailbox.java}  |  32 ++-
 .../query/calcite/exec/rel/ModifyNode.java         |   9 +
 .../processors/query/calcite/exec/rel/Node.java    |   7 +-
 .../processors/query/calcite/exec/rel/Outbox.java  |  93 ++++---
 .../query/calcite/exec/rel/ProjectNode.java        |   9 +
 .../query/calcite/exec/rel/RootNode.java           | 117 +++++----
 .../query/calcite/exec/rel/ScanNode.java           |  45 +++-
 .../query/calcite/exec/rel/SortNode.java           |  43 ++-
 .../{InboxCancelMessage.java => ErrorMessage.java} |  86 +++---
 ...oxCancelMessage.java => InboxCloseMessage.java} |  57 ++--
 .../query/calcite/message/MessageService.java      |  19 ++
 .../query/calcite/message/MessageServiceImpl.java  |  23 +-
 .../query/calcite/message/MessageType.java         |   8 +-
 ...yCancelRequest.java => OutboxCloseMessage.java} |  64 ++++-
 .../query/calcite/message/QueryStartRequest.java   |  13 +-
 .../RemoteException.java}                          |  52 +++-
 .../processors/query/calcite/prepare/Cloner.java   |   1 +
 .../processors/query/calcite/util/Commons.java     |  14 -
 .../processors/query/calcite/CancelTest.java       | 277 ++++++++++++++++++++
 .../processors/query/calcite/PlannerTest.java      |   9 +-
 .../calcite/exec/rel/AbstractExecutionTest.java    |   5 +
 .../calcite/exec/rel/ContinuousExecutionTest.java  |   2 +-
 .../query/calcite/exec/rel/ExecutionTest.java      |  36 +--
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |   4 +-
 38 files changed, 1224 insertions(+), 990 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 731023b..ac1f58f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -56,7 +56,30 @@ public interface ExchangeService extends Service {
      * @param qryId Query ID.
      * @param fragmentId Target fragment ID.
      * @param exchangeId Exchange ID.
-     * @param batchId Batch ID.
      */
-    void cancel(UUID nodeId, UUID qryId, long fragmentId, long exchangeId, int 
batchId) throws IgniteCheckedException;
+    void closeInbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) 
throws IgniteCheckedException;
+
+    /**
+     * Sends cancel request.
+     * @param nodeId Target node ID.
+     * @param qryId Query ID.
+     * @param fragmentId Target fragment ID.
+     * @param exchangeId Exchange ID.
+     */
+    void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long 
exchangeId) throws IgniteCheckedException;
+
+    /**
+     * @param nodeId Target node ID.
+     * @param qryId Query ID.
+     * @param fragmentId Source fragment ID.
+     * @param err Exception to send.
+     * @throws IgniteCheckedException On error marshaling or send ErrorMessage.
+     */
+    void sendError(UUID nodeId, UUID qryId, long fragmentId, Throwable err) 
throws IgniteCheckedException;
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code true} if node is alive, {@code false} otherwise.
+     */
+    boolean alive(UUID nodeId);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 4ab6cb6..fc2879b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
@@ -27,15 +28,18 @@ import org.apache.ignite.internal.GridKernalContext;
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
-import 
org.apache.ignite.internal.processors.query.calcite.message.InboxCancelMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
+import 
org.apache.ignite.internal.processors.query.calcite.message.OutboxCloseMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import 
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
@@ -111,8 +115,18 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
     }
 
     /** {@inheritDoc} */
-    @Override public void cancel(UUID nodeId, UUID qryId, long fragmentId, 
long exchangeId, int batchId) throws IgniteCheckedException {
-        messageService().send(nodeId, new InboxCancelMessage(qryId, 
fragmentId, exchangeId, batchId));
+    @Override public void closeOutbox(UUID nodeId, UUID qryId, long 
fragmentId, long exchangeId) throws IgniteCheckedException {
+        messageService().send(nodeId, new OutboxCloseMessage(qryId, 
fragmentId, exchangeId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId, 
long exchangeId) throws IgniteCheckedException {
+        messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId, 
exchangeId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendError(UUID nodeId, UUID qryId, long fragmentId, 
Throwable err) throws IgniteCheckedException {
+        messageService().send(nodeId, new ErrorMessage(qryId, fragmentId, 
err));
     }
 
     /** {@inheritDoc} */
@@ -129,24 +143,46 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
 
     /** {@inheritDoc} */
     @Override public void init() {
-        messageService().register((n, m) -> onMessage(n, (InboxCancelMessage) 
m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
+        messageService().register((n, m) -> onMessage(n, (InboxCloseMessage) 
m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
+        messageService().register((n, m) -> onMessage(n, (OutboxCloseMessage) 
m), MessageType.QUERY_OUTBOX_CANCEL_MESSAGE);
         messageService().register((n, m) -> onMessage(n, 
(QueryBatchAcknowledgeMessage) m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
         messageService().register((n, m) -> onMessage(n, (QueryBatchMessage) 
m), MessageType.QUERY_BATCH_MESSAGE);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean alive(UUID nodeId) {
+        return messageService().alive(nodeId);
+    }
+
     /** */
-    protected void onMessage(UUID nodeId, InboxCancelMessage msg) {
-        Inbox<?> inbox = mailboxRegistry().inbox(msg.queryId(), 
msg.exchangeId());
+    protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
+        Collection<Inbox<?>> inboxes = 
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+        if (!F.isEmpty(inboxes)) {
+            for (Inbox<?> inbox : inboxes)
+                inbox.context().execute(inbox::close);
+        }
+        else if (log.isDebugEnabled()) {
+            log.debug("Stale inbox cancel message received: [" +
+                "nodeId=" + nodeId + ", " +
+                "queryId=" + msg.queryId() + ", " +
+                "fragmentId=" + msg.fragmentId() + ", " +
+                "exchangeId=" + msg.exchangeId() + "]");
+        }
+    }
 
-        if (inbox != null)
-            inbox.cancel();
+    /** */
+    protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
+        Collection<Outbox<?>> outboxes = 
mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
+        if (!F.isEmpty(outboxes)) {
+            for (Outbox<?> outbox : outboxes)
+                outbox.context().execute(outbox::close);
+        }
         else if (log.isDebugEnabled()) {
-            log.debug("Stale cancel message received: [" +
+            log.debug("Stale oubox cancel message received: [" +
                 "nodeId=" + nodeId + ", " +
                 "queryId=" + msg.queryId() + ", " +
                 "fragmentId=" + msg.fragmentId() + ", " +
-                "exchangeId=" + msg.exchangeId() + ", " +
-                "batchId=" + msg.batchId() + "]");
+                "exchangeId=" + msg.exchangeId() + "]");
         }
     }
 
@@ -173,7 +209,7 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
         if (inbox == null && msg.batchId() == 0) {
             // first message sent before a fragment is built
             // note that an inbox source fragment id is also used as an 
exchange id
-            Inbox<?> newInbox = new Inbox<>(baseInboxContext(msg.queryId(), 
msg.fragmentId()),
+            Inbox<?> newInbox = new Inbox<>(baseInboxContext(nodeId, 
msg.queryId(), msg.fragmentId()),
                 this, mailboxRegistry(), msg.exchangeId(), msg.exchangeId());
 
             inbox = mailboxRegistry().register(newInbox);
@@ -194,10 +230,11 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
     /**
      * @return Minimal execution context to meet Inbox needs.
      */
-    private ExecutionContext<?> baseInboxContext(UUID qryId, long fragmentId) {
+    private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long 
fragmentId) {
         return new ExecutionContext<>(
             taskExecutor(),
             PlanningContext.builder()
+                .originatingNodeId(nodeId)
                 .logger(log)
                 .build(),
             qryId,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 70d52b0..4e7cdd5 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -57,9 +57,6 @@ public class ExecutionContext<Row> implements DataContext {
     /** */
     private final ExpressionFactory<Row> expressionFactory;
 
-    /** */
-    private volatile boolean cancelled;
-
     /**
      * @param ctx Parent context.
      * @param qryId Query ID.
@@ -150,13 +147,6 @@ public class ExecutionContext<Row> implements DataContext {
     }
 
     /**
-     * @return Cancelled flag.
-     */
-    public boolean cancelled() {
-        return cancelled;
-    }
-
-    /**
      * @return Handler to access row fields.
      */
     public RowHandler<Row> rowHandler() {
@@ -198,14 +188,6 @@ public class ExecutionContext<Row> implements DataContext {
     }
 
     /**
-     * Sets cancelled flag.
-     */
-    public void markCancelled() {
-        if (!cancelled)
-            cancelled = true;
-    }
-
-    /**
      * Executes a query task.
      *
      * @param task Query task.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 4826dbe..aea46c7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -27,7 +27,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -49,7 +48,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.events.EventType;
@@ -68,17 +66,19 @@ import 
org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryCancellable;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
+import 
org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
-import 
org.apache.ignite.internal.processors.query.calcite.message.QueryCancelRequest;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
 import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
@@ -399,13 +399,10 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
     /** {@inheritDoc} */
     @Override public void cancelQuery(UUID qryId) {
-        mailboxRegistry().outboxes(qryId).forEach(this::executeCancel);
-        mailboxRegistry().inboxes(qryId).forEach(this::executeCancel);
-
         QueryInfo info = running.get(qryId);
 
         if (info != null)
-            info.cancel();
+            info.doCancel();
     }
 
     /** {@inheritDoc} */
@@ -435,7 +432,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     @Override public void init() {
         messageService().register((n,m) -> onMessage(n, (QueryStartRequest) 
m), MessageType.QUERY_START_REQUEST);
         messageService().register((n,m) -> onMessage(n, (QueryStartResponse) 
m), MessageType.QUERY_START_RESPONSE);
-        messageService().register((n,m) -> onMessage(n, (QueryCancelRequest) 
m), MessageType.QUERY_CANCEL_REQUEST);
+        messageService().register((n,m) -> onMessage(n, (ErrorMessage) m), 
MessageType.QUERY_ERROR_MESSAGE);
 
         eventManager().addDiscoveryEventListener(discoLsnr, 
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
@@ -722,47 +719,38 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         register(info);
 
         // start remote execution
-        if (fragments.size() > 1) {
-            for (int i = 1; i < fragments.size(); i++) {
-                Fragment fragment0 = fragments.get(i);
-                NodesMapping mapping0 = plan.fragmentMapping(fragment0);
-
-                boolean error = false;
-
-                for (UUID nodeId : mapping0.nodes()) {
-                    if (error)
-                        info.onResponse(nodeId, fragment0.fragmentId(), new 
QueryCancelledException());
-                    else {
-                        try {
-                            FragmentDescription fragmentDesc0 = new 
FragmentDescription(
-                                fragment0.fragmentId(),
-                                mapping0.partitions(nodeId),
-                                mapping0.assignments().size(),
-                                plan.targetMapping(fragment0),
-                                plan.remoteSources(fragment0)
-                            );
-
-                            QueryStartRequest req = new QueryStartRequest(
-                                qryId,
-                                pctx.schemaName(),
-                                toJson(fragment0.root()),
-                                pctx.topologyVersion(),
-                                fragmentDesc0,
-                                pctx.parameters());
-
-                            messageService().send(nodeId, req);
-                        }
-                        catch (Exception e) {
-                            info.onResponse(nodeId, fragment0.fragmentId(), e);
-                            error = true;
-                        }
+        for (int i = 1; i < fragments.size(); i++) {
+            Fragment fragment0 = fragments.get(i);
+            NodesMapping mapping0 = plan.fragmentMapping(fragment0);
+
+            boolean error = false;
+            for (UUID nodeId : mapping0.nodes()) {
+                if (error)
+                    info.onResponse(nodeId, fragment0.fragmentId(), new 
QueryCancelledException());
+                else {
+                    try {
+                        FragmentDescription fragmentDesc0 = new 
FragmentDescription(
+                            fragment0.fragmentId(),
+                            mapping0.partitions(nodeId),
+                            mapping0.assignments().size(),
+                            plan.targetMapping(fragment0),
+                            plan.remoteSources(fragment0)
+                        );
+
+                        QueryStartRequest req = new QueryStartRequest(
+                            qryId,
+                            pctx.schemaName(),
+                            toJson(fragment0.root()),
+                            pctx.topologyVersion(),
+                            fragmentDesc0,
+                            pctx.parameters());
+
+                        messageService().send(nodeId, req);
+                    }
+                    catch (Exception e) {
+                        info.onResponse(nodeId, fragment0.fragmentId(), e);
+                        error = true;
                     }
-                }
-
-                if (error) {
-                    info.awaitAllReplies();
-
-                    throw new AssertionError(); // Previous call must throw an 
exception
                 }
             }
         }
@@ -844,58 +832,49 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         assert nodeId != null && msg != null;
 
         PlanningContext ctx = createContext(msg.schema(), nodeId, 
msg.topologyVersion());
+        ExecutionContext<Row> execCtx = new ExecutionContext<>(taskExecutor(), 
ctx, msg.queryId(),
+            msg.fragmentDescription(), handler, 
Commons.parametersMap(msg.parameters()));
 
+        Outbox<Row> node;
         try {
-            ExecutionContext<Row> execCtx = new ExecutionContext<>(
-                taskExecutor(),
-                ctx,
-                msg.queryId(),
-                msg.fragmentDescription(),
-                handler,
-                Commons.parametersMap(msg.parameters())
-            );
-
-            Node<Row> node = new LogicalRelImplementor<>(
+            node = new LogicalRelImplementor<>(
                 execCtx,
                 partitionService(),
                 mailboxRegistry(),
                 exchangeService(),
                 failureProcessor())
                 .go(fromJson(ctx, msg.root()));
-
-            assert node instanceof Outbox : node;
-
-            node.context().execute(((Outbox<Row>) node)::init);
-
-            messageService().send(nodeId, new 
QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId()));
         }
-        catch (Throwable ex) { // TODO don't catch errors!
-            cancelQuery(msg.queryId());
-
-            if (ex instanceof ClusterTopologyCheckedException)
-                return;
+        catch (Exception ex) {
+            U.error(log, "Failed to build execution tree. ", ex);
 
-            U.warn(log, "Failed to start query. [nodeId=" + nodeId + ']', ex);
+            mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1)
+                .forEach(Outbox::close);
+            mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1)
+                .forEach(Inbox::close);
 
             try {
-                messageService().send(nodeId, new 
QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId(), ex));
+                messageService().send(nodeId, new 
QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
             }
             catch (IgniteCheckedException e) {
-                e.addSuppressed(ex);
-
                 U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', 
e);
             }
 
-            if (ex instanceof Error)
-                throw (Error)ex;
+            return;
         }
-    }
 
-    /** */
-    private void onMessage(UUID nodeId, QueryCancelRequest msg) {
-        assert nodeId != null && msg != null;
+        try {
+            messageService().send(nodeId, new 
QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId()));
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e);
+
+            node.onNodeLeft(nodeId);
+
+            return;
+        }
 
-        cancelQuery(msg.queryId());
+        node.init();
     }
 
     /** */
@@ -909,37 +888,18 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     }
 
     /** */
-    private void onCursorClose(RootNode<?> rootNode) {
-        switch (rootNode.state()) {
-            case CANCELLED:
-                cancelQuery(rootNode.queryId());
+    private void onMessage(UUID nodeId, ErrorMessage msg) {
+        assert nodeId != null && msg != null;
 
-                break;
-            case END:
-                running.remove(rootNode.queryId());
+        QueryInfo info = running.get(msg.queryId());
 
-                break;
-            default:
-                throw new AssertionError();
-        }
+        if (info != null)
+            info.onError(new RemoteException(nodeId, msg.queryId(), 
msg.fragmentId(), msg.error()));
     }
 
     /** */
     private void onNodeLeft(UUID nodeId) {
         running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(nodeId));
-
-        final Predicate<Node<?>> p = new OriginatingFilter(nodeId);
-
-        mailboxRegistry().outboxes(null).stream()
-            .filter(p).forEach(this::executeCancel);
-
-        mailboxRegistry().inboxes(null).stream()
-            .filter(p).forEach(this::executeCancel);
-    }
-
-    /** */
-    private void executeCancel(Node<?> node) {
-        node.context().execute(node::cancel);
     }
 
     /** */
@@ -948,10 +908,10 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         RUNNING,
 
         /** */
-        CANCELLING,
+        CLOSING,
 
         /** */
-        CANCELLED
+        CLOSED
     }
 
     /** */
@@ -1009,13 +969,10 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         private QueryState state;
 
         /** */
-        private Throwable error;
-
-        /** */
         private QueryInfo(ExecutionContext<Row> ctx, MultiStepPlan plan, 
Node<Row> root) {
             this.ctx = ctx;
 
-            RootNode<Row> rootNode = new RootNode<>(ctx, 
ExecutionServiceImpl.this::onCursorClose);
+            RootNode<Row> rootNode = new RootNode<>(ctx, this::tryClose);
             rootNode.register(root);
 
             this.root = rootNode;
@@ -1043,71 +1000,43 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
         /** {@inheritDoc} */
         @Override public void doCancel() {
-            cancel();
-        }
-
-        /** */
-        private void awaitAllReplies() {
-            Throwable error;
-
-            try {
-                synchronized (this) {
-                    while (!waiting.isEmpty())
-                        wait();
-
-                    error = this.error;
-                }
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedException(e);
-            }
-
-            if (error != null)
-                throw new IgniteSQLException("Failed to execute query.", 
error);
+            root.close();
         }
 
-        /** */
-        private void cancel() {
-            boolean cancelLoc = false;
-            boolean cancelRemote = false;
+        /**
+         * Can be called multiple times after receive each error at {@link 
#onResponse(RemoteFragmentKey, Throwable)}.
+         */
+        private void tryClose() {
             QueryState state0 = null;
 
             synchronized (this) {
-                if (state == QueryState.CANCELLED)
+                if (state == QueryState.CLOSED)
                     return;
 
-                if (state == QueryState.RUNNING) {
-                    cancelLoc = true;
-                    state0 = state = QueryState.CANCELLING;
-                }
+                if (state == QueryState.RUNNING)
+                    state0 = state = QueryState.CLOSING;
 
-                if (state == QueryState.CANCELLING && waiting.isEmpty()) {
-                    cancelRemote = true;
-                    state0 = state = QueryState.CANCELLED;
-                }
+                if (state == QueryState.CLOSING && waiting.isEmpty())
+                    state0 = state = QueryState.CLOSED;
             }
 
-            if (cancelLoc)
-                root.cancel();
+            if (state0 == QueryState.CLOSED) {
+                // 1) unregister runing query
+                running.remove(ctx.queryId());
 
-            if (cancelRemote) {
-                QueryCancelRequest msg = new QueryCancelRequest(ctx.queryId());
+                // 2) close local fragment
+                root.proceedClose();
 
-                for (UUID remote : remotes) {
+                // 3) close remote fragments
+                for (UUID nodeId : remotes) {
                     try {
-                        messageService().send(remote, msg);
-                    }
-                    catch (ClusterTopologyCheckedException e) {
-                        U.warn(log, e.getMessage(), e);
+                        exchangeService().closeOutbox(nodeId, ctx.queryId(), 
-1, -1);
                     }
                     catch (IgniteCheckedException e) {
-                        throw U.convertException(e);
+                        U.warn(log, "Failed to send cancel message. [nodeId=" 
+ nodeId + ']', e);
                     }
                 }
             }
-
-            if (state0 == QueryState.CANCELLED)
-                running.remove(ctx.queryId());
         }
 
         /** */
@@ -1127,7 +1056,8 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
             }
 
             if (!F.isEmpty(fragments)) {
-                ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException("Failed to start query, node left. nodeId=" + 
nodeId);
+                ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException(
+                    "Failed to start query, node left. nodeId=" + nodeId);
 
                 for (RemoteFragmentKey fragment : fragments)
                     onResponse(fragment, ex);
@@ -1141,46 +1071,23 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
         /** */
         private void onResponse(RemoteFragmentKey fragment, Throwable error) {
-            boolean cancel;
-
+            QueryState state;
             synchronized (this) {
-                if (!waiting.remove(fragment))
-                    return;
-
-                if (error != null) {
-                    if (this.error != null)
-                        this.error.addSuppressed(error);
-                    else
-                        this.error = error;
-                }
-
-                boolean empty = waiting.isEmpty();
-
-                cancel = empty && this.error != null;
-
-                if (empty)
-                    notifyAll();
+                waiting.remove(fragment);
+                state = this.state;
             }
 
-            if (cancel)
-                cancel();
+            if (error != null)
+                onError(error);
+            else if (state == QueryState.CLOSING)
+                tryClose();
         }
-    }
-
-    /** */
-    private static final class OriginatingFilter implements Predicate<Node<?>> 
{
-        /** */
-        private final UUID nodeId;
 
         /** */
-        private OriginatingFilter(UUID nodeId) {
-            this.nodeId = nodeId;
-        }
+        private void onError(Throwable error) {
+            root.onError(error);
 
-        /** {@inheritDoc} */
-        @Override public boolean test(Node node) {
-            // Uninitialized inbox doesn't know originating node ID.
-            return Objects.equals(node.context().originatingNodeId(), nodeId);
+            tryClose();
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index a6d720f..e15a8b6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -43,30 +43,32 @@ import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import 
org.apache.ignite.internal.processors.query.h2.database.H2TreeFilterClosure;
+import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRow;
 import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.h2.value.DataType;
 import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Scan on index.
  */
-public class IndexScan<Row> implements Iterable<Row> {
+public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
     /** */
-    private final ExecutionContext<Row> ectx;
+    private final GridKernalContext kctx;
 
     /** */
-    private final CacheObjectContext coCtx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
-    private final GridKernalContext ctx;
+    private final ExecutionContext<Row> ectx;
 
     /** */
-    private final GridCacheContext<?, ?> cacheCtx;
+    private final CacheObjectContext coCtx;
 
     /** */
     private final TableDescriptor desc;
@@ -96,154 +98,172 @@ public class IndexScan<Row> implements Iterable<Row> {
     private final MvccSnapshot mvccSnapshot;
 
     /** */
-    private List<GridDhtLocalPartition> partsToReserve;
+    private List<GridDhtLocalPartition> reserved;
 
     /**
-     * @param ctx Cache context.
+     * @param ectx Execution context.
      * @param igniteIdx Index tree.
      * @param filters Additional filters.
      * @param lowerBound Lower index scan bound.
      * @param upperBound Upper index scan bound.
      */
     public IndexScan(
-        ExecutionContext<Row> ctx,
+        ExecutionContext<Row> ectx,
         IgniteIndex igniteIdx,
         Predicate<Row> filters,
         Row lowerBound,
         Row upperBound
     ) {
-        ectx = ctx;
+        this.ectx = ectx;
         desc = igniteIdx.table().descriptor();
+        cctx = desc.cacheContext();
+        kctx = cctx.kernalContext();
+        coCtx = cctx.cacheObjectContext();
+
+        RelDataType rowType = desc.selectRowType(this.ectx.getTypeFactory());
+
+        factory = this.ectx.rowHandler().factory(this.ectx.getTypeFactory(), 
rowType);
         idx = igniteIdx.index();
+        topVer = ectx.planningContext().topologyVersion();
         this.filters = filters;
         this.lowerBound = lowerBound;
         this.upperBound = upperBound;
-        cacheCtx = igniteIdx.table().descriptor().cacheContext();
-        coCtx = cacheCtx.cacheObjectContext();
-        this.ctx = coCtx.kernalContext();
-        topVer = ctx.planningContext().topologyVersion();
-        partsArr = ctx.partitions();
-        mvccSnapshot = ctx.mvccSnapshot();
-
-        RelDataType rowType = desc.selectRowType(ectx.getTypeFactory());
-        factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
+        partsArr = ectx.partitions();
+        mvccSnapshot = ectx.mvccSnapshot();
     }
 
     /** {@inheritDoc} */
-    @Override public Iterator<Row> iterator() {
-        H2TreeFilterClosure filterC = filterClosure();
+    @Override public synchronized Iterator<Row> iterator() {
+        reserve();
+        try {
+            H2Row lower = lowerBound == null ? null : new 
H2PlainRow(values(coCtx, ectx, lowerBound));
+            H2Row upper = upperBound == null ? null : new 
H2PlainRow(values(coCtx, ectx, upperBound));
 
-        H2Row lower = lowerBound == null ? null : new CalciteH2Row<>(coCtx, 
ectx, lowerBound);
-        H2Row upper = upperBound == null ? null : new CalciteH2Row<>(coCtx, 
ectx, upperBound);
-
-        reservePartitions();
-
-        GridCursor<H2Row> cur = idx.find(lower, upper, filterC);
+            return new IteratorImpl(idx.find(lower, upper, filterClosure()));
+        }
+        catch (Exception e) {
+            release();
 
-        return new CursorIteratorWrapper(cur);
+            throw e;
+        }
     }
 
     /** */
-    public H2TreeFilterClosure filterClosure() {
-        IndexingQueryFilter filter = new IndexingQueryFilterImpl(ctx, topVer, 
partsArr);
-        IndexingQueryCacheFilter f = filter.forCache(cacheCtx.name());
-        H2TreeFilterClosure filterC = null;
-
-        if (f != null || mvccSnapshot != null )
-            filterC = new H2TreeFilterClosure(f, mvccSnapshot, cacheCtx, 
ectx.planningContext().logger());
-
-        return filterC;
+    @Override public void close() {
+        release();
     }
 
     /** */
-    private void reservePartitions() {
-        assert partsToReserve == null : partsToReserve;
+    private synchronized void reserve() {
+        if (reserved != null)
+            return;
 
-        partsToReserve = gatherPartitions(cacheCtx, partsArr);
+        List<GridDhtLocalPartition> toReserve;
 
-        for (GridDhtLocalPartition part : partsToReserve) {
-            if (part == null || !part.reserve())
-                throw reservationException();
-            else if (part.state() != GridDhtPartitionState.OWNING) {
-                part.release();
+        if (cctx.isReplicated()) {
+            int partsCnt = cctx.affinity().partitions();
+            toReserve = new ArrayList<>(partsCnt);
+            GridDhtPartitionTopology top = cctx.topology();
+            for (int i = 0; i < partsCnt; i++)
+                toReserve.add(top.localPartition(i));
+        }
+        else if (cctx.isPartitioned()) {
+            assert partsArr != null;
 
-                throw reservationException();
-            }
+            toReserve = new ArrayList<>(partsArr.length);
+            GridDhtPartitionTopology top = cctx.topology();
+            for (int i = 0; i < partsArr.length; i++)
+                toReserve.add(top.localPartition(partsArr[i]));
         }
-    }
+        else {
+            assert cctx.isLocal();
 
-    /** */
-    private List<GridDhtLocalPartition> gatherPartitions(GridCacheContext<?, 
?> ctx, int[] arr ) {
-        if (ctx.isReplicated()) {
-            int partsCnt = ctx.affinity().partitions();
-            GridDhtPartitionTopology top = ctx.topology();
-            List<GridDhtLocalPartition> parts = new ArrayList<>(partsCnt);
+            toReserve = Collections.emptyList();
+        }
 
-            for (int i = 0; i < partsCnt; i++)
-                parts.add(top.localPartition(i));
+        reserved = new ArrayList<>(toReserve.size());
 
-            return parts;
-        }
-        else if (ctx.isPartitioned()) {
-            assert arr != null;
-            List<GridDhtLocalPartition> parts = new ArrayList<>(arr.length);
-            GridDhtPartitionTopology top = ctx.topology();
+        try {
+            for (GridDhtLocalPartition part : toReserve) {
+                if (part == null || !part.reserve())
+                    throw new IgniteSQLException("Failed to reserve partition 
for query execution. Retry on stable topology.");
+                else if (part.state() != GridDhtPartitionState.OWNING) {
+                    part.release();
 
-            for (int i = 0; i < arr.length; i++)
-                parts.add(top.localPartition(arr[i]));
+                    throw new IgniteSQLException("Failed to reserve partition 
for query execution. Retry on stable topology.");
+                }
 
-            return parts;
+                reserved.add(part);
+            }
         }
-        else {
-            assert ctx.isLocal();
+        catch (Exception e) {
+            release();
 
-            return Collections.emptyList();
+            throw e;
         }
     }
 
     /** */
-    private void releasePartitions() {
-        assert partsToReserve != null;
+    private synchronized void release() {
+        if (reserved == null)
+            return;
 
-        for (GridDhtLocalPartition part : partsToReserve)
+        for (GridDhtLocalPartition part : reserved)
             part.release();
+
+        reserved = null;
     }
 
     /** */
-    private IgniteSQLException reservationException() {
-        return new IgniteSQLException("Failed to reserve partition for query 
execution. Retry on stable topology.");
+    private H2TreeFilterClosure filterClosure() {
+        IndexingQueryFilter filter = new IndexingQueryFilterImpl(kctx, topVer, 
partsArr);
+        IndexingQueryCacheFilter f = filter.forCache(cctx.name());
+        H2TreeFilterClosure filterC = null;
+
+        if (f != null || mvccSnapshot != null )
+            filterC = new H2TreeFilterClosure(f, mvccSnapshot, cctx, 
ectx.planningContext().logger());
+
+        return filterC;
     }
 
     /** */
-    private class CursorIteratorWrapper extends 
GridCloseableIteratorAdapter<Row> {
+    private Value[] values(CacheObjectValueContext cctx, ExecutionContext<Row> 
ectx, Row row) {
+        try {
+            RowHandler<Row> rowHnd = ectx.rowHandler();
+            int rowLen = rowHnd.columnCount(row);
+
+            Value[] values = new Value[rowLen];
+            for (int i = 0; i < rowLen; i++) {
+                Object o = rowHnd.get(i, row);
+
+                if (o != null)
+                    values[i] = H2Utils.wrap(cctx, o, 
DataType.getTypeFromClass(o.getClass()));
+            }
+
+            return values;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to wrap object into H2 Value.", 
e);
+        }
+    }
+
+    /** */
+    private class IteratorImpl extends GridIteratorAdapter<Row> {
         /** */
         private final GridCursor<H2Row> cursor;
 
         /** Next element. */
         private Row next;
 
-        /**
-         * @param cursor Cursor.
-         */
-        CursorIteratorWrapper(GridCursor<H2Row> cursor) {
-            assert cursor != null;
+        /** */
+        public IteratorImpl(@NotNull GridCursor<H2Row> cursor) {
             this.cursor = cursor;
         }
 
         /** {@inheritDoc} */
-        @Override protected Row onNext() {
-            if (next == null)
-                throw new NoSuchElementException();
-
-            Row res = next;
-
-            next = null;
-
-            return res;
-        }
+        @Override public boolean hasNextX() throws IgniteCheckedException {
+            assert cursor != null;
 
-        /** {@inheritDoc} */
-        @Override protected boolean onHasNext() throws IgniteCheckedException {
             if (next != null)
                 return true;
 
@@ -259,58 +279,22 @@ public class IndexScan<Row> implements Iterable<Row> {
         }
 
         /** {@inheritDoc} */
-        @Override protected void onClose() {
-            releasePartitions();
-        }
-    }
-
-    /** */
-    private static class CalciteH2Row<Row> extends H2Row {
-        /** */
-        private final Value[] values;
-
-        /** */
-        CalciteH2Row(CacheObjectValueContext coCtx, ExecutionContext<Row> ctx, 
Row row) {
-            try {
-                RowHandler<Row> rowHnd = ctx.rowHandler();
-
-                int colCnt = rowHnd.columnCount(row);
-
-                values = new Value[colCnt];
-
-                for (int i = 0; i < colCnt; i++) {
-                    Object o = rowHnd.get(i, row);
-
-                    if (o != null) {
-                        Value v = H2Utils.wrap(coCtx, o, 
DataType.getTypeFromClass(o.getClass()));
+        @Override public Row nextX() {
+            assert cursor != null;
 
-                        values[i] = v;
-                    }
-                }
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to wrap object into H2 
Value.", e);
-            }
-        }
+            if (next == null)
+                throw new NoSuchElementException();
 
-        /** {@inheritDoc} */
-        @Override public boolean indexSearchRow() {
-            return true;
-        }
+            Row res = next;
 
-        /** {@inheritDoc} */
-        @Override public int getColumnCount() {
-            return values.length;
-        }
+            next = null;
 
-        /** {@inheritDoc} */
-        @Override public Value getValue(int idx) {
-            return values[idx];
+            return res;
         }
 
         /** {@inheritDoc} */
-        @Override public void setValue(int idx, Value v) {
-            throw new AssertionError("Not supported.");
+        @Override public void removeX() {
+            throw new UnsupportedOperationException("Remove is not 
supported.");
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index bb4297a..8b9ae84 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -400,7 +400,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
     }
 
     /** */
-    public Node<Row> go(IgniteRel rel) {
-        return visit(rel);
+    public <T extends Node<Row>> T go(IgniteRel rel) {
+        return (T)visit(rel);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index d49009d..8c7c2d4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -81,16 +81,20 @@ public interface MailboxRegistry extends Service {
     /**
      * Returns all registered inboxes for provided query ID.
      *
-     * @param qryId Query ID. {@code null} means return all registered inboxes.
+     * @param qryId Query ID. {@code null} means return inboxes with any query 
id.
+     * @param fragmentId Fragment Id. {@code -1} means return inboxes with any 
fragment id.
+     * @param exchangeId Exchange Id. {@code -1} means return inboxes with any 
exchange id.
      * @return Registered inboxes.
      */
-    Collection<Inbox<?>> inboxes(@Nullable UUID qryId);
+    Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long fragmentId, long 
exchangeId);
 
     /**
      * Returns all registered outboxes for provided query ID.
      *
-     * @param qryId Query ID. {@code null} means return all registered 
outboxes.
+     * @param qryId Query ID. {@code null} means return outboxes with any 
query id.
+     * @param fragmentId Fragment Id. {@code -1} means return outboxes with 
any fragment id.
+     * @param exchangeId Exchange Id. {@code -1} means return outboxes with 
any exchange id.
      * @return Registered outboxes.
      */
-    Collection<Outbox<?>> outboxes(@Nullable UUID qryId);
+    Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long 
exchangeId);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index 77e1101..39ace60 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -19,12 +19,18 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Mailbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import 
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.jetbrains.annotations.Nullable;
@@ -34,11 +40,20 @@ import org.jetbrains.annotations.Nullable;
  */
 public class MailboxRegistryImpl extends AbstractService implements 
MailboxRegistry {
     /** */
+    private static final Predicate<Mailbox<?>> ALWAYS_TRUE = o -> true;
+
+    /** */
     private final Map<MailboxKey, Outbox<?>> locals;
 
     /** */
     private final Map<MailboxKey, Inbox<?>> remotes;
 
+    /** */
+    private final DiscoveryEventListener discoLsnr;
+
+    /** */
+    private GridEventStorageManager evtMgr;
+
     /**
      * @param ctx Kernal.
      */
@@ -47,6 +62,25 @@ public class MailboxRegistryImpl extends AbstractService 
implements MailboxRegis
 
         locals = new ConcurrentHashMap<>();
         remotes = new ConcurrentHashMap<>();
+
+        discoLsnr = (e, c) -> onNodeLeft(e.eventNode().id());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onStart(GridKernalContext ctx) {
+        eventManager(ctx.event());
+
+        init();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void init() {
+        eventManager().addDiscoveryEventListener(discoLsnr, 
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void tearDown() {
+        eventManager().removeDiscoveryEventListener(discoLsnr, 
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
     }
 
     /** {@inheritDoc} */
@@ -58,7 +92,7 @@ public class MailboxRegistryImpl extends AbstractService 
implements MailboxRegis
 
     /** {@inheritDoc} */
     @Override public void unregister(Inbox<?> inbox) {
-        remotes.remove(new MailboxKey(inbox.queryId(), inbox.exchangeId()));
+        remotes.remove(new MailboxKey(inbox.queryId(), inbox.exchangeId()), 
inbox);
     }
 
     /** {@inheritDoc} */
@@ -70,7 +104,7 @@ public class MailboxRegistryImpl extends AbstractService 
implements MailboxRegis
 
     /** {@inheritDoc} */
     @Override public void unregister(Outbox<?> outbox) {
-        locals.remove(new MailboxKey(outbox.queryId(), outbox.exchangeId()));
+        locals.remove(new MailboxKey(outbox.queryId(), outbox.exchangeId()), 
outbox);
     }
 
     /** {@inheritDoc} */
@@ -84,27 +118,52 @@ public class MailboxRegistryImpl extends AbstractService 
implements MailboxRegis
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Inbox<?>> inboxes(@Nullable UUID qryId) {
-        if (qryId == null)
-            return remotes.values();
-
-        return remotes.entrySet().stream()
-            .filter(e -> e.getKey().qryId.equals(qryId))
-            .map(Map.Entry::getValue)
+    @Override public Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long 
fragmentId, long exchangeId) {
+        return remotes.values().stream()
+            .filter(makeFilter(qryId, fragmentId, exchangeId))
             .collect(Collectors.toList());
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<Outbox<?>> outboxes(@Nullable UUID qryId) {
-        if (qryId == null)
-            return locals.values();
-
-        return locals.entrySet().stream()
-            .filter(e -> e.getKey().qryId.equals(qryId))
-            .map(Map.Entry::getValue)
+    @Override public Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long 
fragmentId, long exchangeId) {
+        return locals.values().stream()
+            .filter(makeFilter(qryId, fragmentId, exchangeId))
             .collect(Collectors.toList());
     }
 
+    /**
+     * @param evtMgr Event manager.
+     */
+    public void eventManager(GridEventStorageManager evtMgr) {
+        this.evtMgr = evtMgr;
+    }
+
+    /**
+     * @return Event manager.
+     */
+    public GridEventStorageManager eventManager() {
+        return evtMgr;
+    }
+
+    /** */
+    private void onNodeLeft(UUID nodeId) {
+        locals.values().forEach(n -> n.onNodeLeft(nodeId));
+        remotes.values().forEach(n -> n.onNodeLeft(nodeId));
+    }
+
+    /** */
+    private static Predicate<Mailbox<?>> makeFilter(@Nullable UUID qryId, long 
fragmentId, long exchangeId) {
+        Predicate<Mailbox<?>> filter = ALWAYS_TRUE;
+        if (qryId != null)
+            filter = filter.and(mailbox -> Objects.equals(mailbox.queryId(), 
qryId));
+        if (fragmentId != -1)
+            filter = filter.and(mailbox -> mailbox.fragmentId() == fragmentId);
+        if (exchangeId != -1)
+            filter = filter.and(mailbox -> mailbox.exchangeId() == exchangeId);
+
+        return filter;
+    }
+
     /** */
     private static class MailboxKey {
         /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
deleted file mode 100644
index 090abc7..0000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.exec;
-
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheStoppedException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
-import 
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
-import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/** */
-public class TableScan<Row> implements Iterable<Row> {
-    /** */
-    private final ExecutionContext<Row> ectx;
-
-    /** */
-    private final TableDescriptor desc;
-
-    /** */
-    private final RowFactory<Row> factory;
-
-    /** */
-    public TableScan(ExecutionContext<Row> ectx, TableDescriptor desc) {
-        this.ectx = ectx;
-        this.desc = desc;
-
-        RelDataType rowType = desc.selectRowType(ectx.getTypeFactory());
-        factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<Row> iterator() {
-        try {
-            return new IteratorImpl().init();
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-    }
-
-    /**
-     * Table scan iterator.
-     */
-    private class IteratorImpl extends GridCloseableIteratorAdapter<Row> {
-        /** */
-        private int cacheId;
-
-        /** */
-        private Queue<GridDhtLocalPartition> parts;
-
-        /** */
-        private GridDhtLocalPartition part;
-
-        /** */
-        private GridCursor<? extends CacheDataRow> cur;
-
-        /** */
-        private Row next;
-
-        /** {@inheritDoc} */
-        @Override protected Row onNext() {
-            if (next == null)
-                throw new NoSuchElementException();
-
-            Row next = this.next;
-
-            this.next = null;
-
-            return next;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected boolean onHasNext() throws IgniteCheckedException {
-            assert parts != null;
-
-            if (next != null)
-                return true;
-
-            while (true) {
-                if (cur == null) {
-                    if ((part = parts.poll()) == null)
-                        break;
-
-                    cur = part.dataStore().cursor(cacheId, 
ectx.mvccSnapshot());
-                }
-
-                if (cur.next()) {
-                    CacheDataRow row = cur.get();
-
-                    if (!desc.match(row))
-                        continue;
-
-                    next = desc.toRow(ectx, row, factory);
-
-                    break;
-                } else {
-                    cur = null;
-
-                    part.release();
-                    part = null;
-                }
-            }
-
-            return next != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void onClose() {
-            if (part != null)
-                part.release();
-
-            part = null;
-
-            while (!F.isEmpty(parts))
-                parts.poll().release();
-
-            parts = null;
-        }
-
-        /** */
-        public Iterator<Row> init() throws IgniteCheckedException {
-            if (isClosed())
-                return Collections.emptyIterator();
-
-            GridCacheContext<?, ?> cctx = desc.cacheContext();
-
-            if (!cctx.gate().enterIfNotStopped()) {
-                close();
-
-                throw new CacheStoppedException(cctx.name());
-            }
-
-            try {
-                GridDhtPartitionTopology top = cctx.topology();
-
-                top.readLock();
-                try {
-                    GridDhtTopologyFuture fut = top.topologyVersionFuture();
-                    AffinityTopologyVersion topVer = 
ectx.planningContext().topologyVersion();
-
-                    if (!fut.isDone() || 
fut.topologyVersion().compareTo(topVer) != 0)
-                        throw new ClusterTopologyCheckedException("Failed to 
execute query. Retry on stable topology.");
-
-                    if (cctx.isPartitioned())
-                        reservePartitioned(top);
-                    else
-                        reserveReplicated(top);
-                }
-                finally {
-                    top.readUnlock();
-                }
-
-                cacheId = cctx.cacheId();
-
-                return this;
-            }
-            catch (Exception e) {
-                Commons.closeQuiet(this, e);
-
-                throw e;
-            }
-            finally {
-                cctx.gate().leave();
-            }
-        }
-
-        /** */
-        private void reserveReplicated(GridDhtPartitionTopology top) {
-            List<GridDhtLocalPartition> locParts = top.localPartitions();
-
-            parts = new ArrayDeque<>(locParts.size());
-
-            for (GridDhtLocalPartition local : locParts) {
-                if (!local.reserve())
-                    throw reservationException();
-                else if (local.state() != GridDhtPartitionState.OWNING) {
-                    local.release();
-
-                    throw reservationException();
-                }
-
-                parts.offer(local);
-            }
-        }
-
-        /** */
-        private void reservePartitioned(GridDhtPartitionTopology top) {
-            AffinityTopologyVersion topVer = 
ectx.planningContext().topologyVersion();
-            int[] partitions = ectx.partitions();
-
-            assert topVer != null && !F.isEmpty(partitions);
-
-            parts = new ArrayDeque<>(partitions.length);
-
-            for (int p : partitions) {
-                GridDhtLocalPartition loc = top.localPartition(p, topVer, 
false);
-
-                if (loc == null || !loc.reserve())
-                    throw reservationException();
-                else if (loc.state() != GridDhtPartitionState.OWNING) {
-                    loc.release();
-
-                    throw reservationException();
-                }
-
-                parts.offer(loc);
-            }
-        }
-
-        /** */
-        private IgniteSQLException reservationException() {
-            return new IgniteSQLException("Failed to reserve partition for 
query execution. Retry on stable topology.");
-        }
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
index 5332485..a64e48b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinNode.java
@@ -71,6 +71,9 @@ public abstract class AbstractJoinNode<Row> extends 
AbstractNode<Row> {
     @Override public void request(int rowsCnt) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert !F.isEmpty(sources) && sources.size() == 2;
         assert rowsCnt > 0 && requested == 0;
 
@@ -124,6 +127,9 @@ public abstract class AbstractJoinNode<Row> extends 
AbstractNode<Row> {
     private void pushLeft(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waitingLeft > 0;
 
@@ -138,6 +144,9 @@ public abstract class AbstractJoinNode<Row> extends 
AbstractNode<Row> {
     private void pushRight(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waitingRight > 0;
 
@@ -153,6 +162,9 @@ public abstract class AbstractJoinNode<Row> extends 
AbstractNode<Row> {
     private void endLeft() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waitingLeft > 0;
 
@@ -165,6 +177,9 @@ public abstract class AbstractJoinNode<Row> extends 
AbstractNode<Row> {
     private void endRight() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waitingRight > 0;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index 67a2718..bd1ad5e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -37,10 +37,10 @@ public abstract class AbstractNode<Row> implements 
Node<Row> {
     protected static final int MODIFY_BATCH_SIZE = 
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
 
     /** */
-    protected static final int IO_BATCH_SIZE = 
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 200);
+    protected static final int IO_BATCH_SIZE = 
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 256);
 
     /** */
-    protected static final int IO_BATCH_CNT = 
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 50);
+    protected static final int IO_BATCH_CNT = 
IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 4);
 
     /** for debug purpose */
     private volatile Thread thread;
@@ -58,6 +58,9 @@ public abstract class AbstractNode<Row> implements Node<Row> {
     /** */
     protected List<Node<Row>> sources;
 
+    /** */
+    protected boolean closed;
+
     /**
      * @param ctx Execution context.
      */
@@ -84,13 +87,23 @@ public abstract class AbstractNode<Row> implements 
Node<Row> {
     }
 
     /** {@inheritDoc} */
-    @Override public void cancel() {
+    @Override public void close() {
         checkThread();
 
-        context().markCancelled();
+        if (closed)
+            return;
+
+        closed = true;
 
         if (!F.isEmpty(sources))
-            sources.forEach(Node::cancel);
+            sources.forEach(U::closeQuiet);
+    }
+
+    /**
+     * @return {@code true} if the subtree is canceled.
+     */
+    protected boolean isClosed() {
+        return closed;
     }
 
     /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
index 347f533..f496b56 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
@@ -97,6 +97,9 @@ public class AggregateNode<Row> extends AbstractNode<Row> 
implements SingleNode<
     @Override public void request(int rowsCnt) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert !F.isEmpty(sources) && sources.size() == 1;
         assert rowsCnt > 0 && requested == 0;
 
@@ -114,6 +117,9 @@ public class AggregateNode<Row> extends AbstractNode<Row> 
implements SingleNode<
     @Override public void push(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
 
@@ -135,6 +141,9 @@ public class AggregateNode<Row> extends AbstractNode<Row> 
implements SingleNode<
     @Override public void end() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
index e7202fd..19c87d3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
@@ -57,6 +57,9 @@ public class FilterNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     @Override public void request(int rowsCnt) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert !F.isEmpty(sources) && sources.size() == 1;
         assert rowsCnt > 0 && requested == 0;
 
@@ -70,6 +73,9 @@ public class FilterNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     @Override public void push(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
 
@@ -90,6 +96,9 @@ public class FilterNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     @Override public void end() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index ce82e36..5e7e281 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -17,27 +17,30 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * A part of exchange.
  */
-public class Inbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, 
AutoCloseable {
+public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, 
SingleNode<Row> {
     /** */
     private final ExchangeService exchange;
 
@@ -54,7 +57,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Au
     private final Map<UUID, Buffer> perNodeBuffers;
 
     /** */
-    private Collection<UUID> nodes;
+    private volatile Collection<UUID> srcNodeIds;
 
     /** */
     private Comparator<Row> comp;
@@ -92,17 +95,8 @@ public class Inbox<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Au
         perNodeBuffers = new HashMap<>();
     }
 
-    /**
-     * @return Query ID.
-     */
-    public UUID queryId() {
-        return context().queryId();
-    }
-
-    /**
-     * @return Exchange ID.
-     */
-    public long exchangeId() {
+    /** {@inheritDoc} */
+    @Override public long exchangeId() {
         return exchangeId;
     }
 
@@ -110,48 +104,44 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
      * Inits this Inbox.
      *
      * @param ctx Execution context.
-     * @param nodes Source nodes.
+     * @param srcNodeIds Source node IDs.
      * @param comp Optional comparator for merge exchange.
      */
-    public void init(ExecutionContext<Row> ctx, Collection<UUID> nodes, 
Comparator<Row> comp) {
+    public void init(ExecutionContext<Row> ctx, Collection<UUID> srcNodeIds, 
@Nullable Comparator<Row> comp) {
         // It's important to set proper context here because
         // because the one, that is created on a first message
-        // recived doesn't have all context variables in place.
+        // received doesn't have all context variables in place.
         this.ctx = ctx;
-        this.nodes = nodes;
         this.comp = comp;
+
+        // memory barier
+        this.srcNodeIds = new HashSet<>(srcNodeIds);
     }
 
     /** {@inheritDoc} */
     @Override public void request(int rowsCnt) {
         checkThread();
 
-        assert nodes != null;
+        if (isClosed())
+            return;
+
+        assert srcNodeIds != null;
         assert rowsCnt > 0 && requested == 0;
 
         requested = rowsCnt;
 
-        if (buffers == null) {
-            nodes.forEach(this::getOrCreateBuffer);
-            buffers = new ArrayList<>(perNodeBuffers.values());
-
-            assert buffers.size() == nodes.size();
-        }
-
         if (!inLoop)
             context().execute(this::pushInternal);
     }
 
     /** {@inheritDoc} */
-    @Override public void cancel() {
-        checkThread();
-        context().markCancelled();
-        close();
-    }
-
-    /** {@inheritDoc} */
     @Override public void close() {
+        if (isClosed())
+            return;
+
         registry.unregister(this);
+
+        super.close();
     }
 
     /** {@inheritDoc} */
@@ -175,6 +165,9 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
     public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> 
rows) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         Buffer buf = getOrCreateBuffer(src);
 
         boolean waitingBefore = buf.check() == State.WAITING;
@@ -185,20 +178,47 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
             pushInternal();
     }
 
+    /**
+     * @param e Error.
+     */
+    private void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+
+        close();
+    }
+
     /** */
     private void pushInternal() {
+        if (isClosed())
+            return;
+
         assert downstream != null;
 
         inLoop = true;
+
         try {
+            if (buffers == null) {
+                for (UUID node : srcNodeIds)
+                    checkNode(node);
+
+                buffers = srcNodeIds.stream()
+                    .map(this::getOrCreateBuffer)
+                    .collect(Collectors.toList());
+
+                assert buffers.size() == perNodeBuffers.size();
+            }
+
             if (comp != null)
                 pushOrdered();
             else
                 pushUnordered();
         }
-        catch (Exception e) {
-            downstream.onError(e);
-            close();
+        catch (Throwable e) {
+            onError(e);
         }
         finally {
             inLoop = false;
@@ -231,7 +251,7 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
         }
 
         while (requested > 0 && !heap.isEmpty()) {
-            if (context().cancelled())
+            if (isClosed())
                 return;
 
             Buffer buf = heap.poll().right;
@@ -259,8 +279,6 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
 
             downstream.end();
             requested = 0;
-
-            close();
         }
     }
 
@@ -269,7 +287,7 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
         int idx = 0, noProgress = 0;
 
         while (requested > 0 && !buffers.isEmpty()) {
-            if (context().cancelled())
+            if (isClosed())
                 return;
 
             Buffer buf = buffers.get(idx);
@@ -299,8 +317,6 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
         if (requested > 0 && buffers.isEmpty()) {
             downstream.end();
             requested = 0;
-
-            close();
         }
     }
 
@@ -320,6 +336,28 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, Au
     }
 
     /** */
+    public void onNodeLeft(UUID nodeId) {
+        if (ctx.originatingNodeId().equals(nodeId) && srcNodeIds == null)
+            ctx.execute(this::close);
+        else if (srcNodeIds != null && srcNodeIds.contains(nodeId))
+            ctx.execute(() -> onNodeLeft0(nodeId));
+    }
+
+    /** */
+    private void onNodeLeft0(UUID nodeId) {
+        checkThread();
+
+        if (getOrCreateBuffer(nodeId).check() != State.END)
+            onError(new ClusterTopologyCheckedException("Node left [nodeId=" + 
nodeId + ']'));
+    }
+
+    /** */
+    private void checkNode(UUID nodeId) throws ClusterTopologyCheckedException 
{
+        if (!exchange.alive(nodeId))
+            throw new ClusterTopologyCheckedException("Node left [nodeId=" + 
nodeId + ']');
+    }
+
+    /** */
     private static final class Batch<Row> implements Comparable<Batch<Row>> {
         /** */
         private final int batchId;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java
similarity index 58%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java
index 3444347..0dc7077 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Mailbox.java
@@ -15,30 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.message;
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.UUID;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
+/** */
+public interface Mailbox<T> extends Node<T> {
+    /**
+     * @return Query ID.
+     */
+    default UUID queryId() {
+        return context().queryId();
+    }
 
-/**
- *
- */
-public interface MessageService extends Service {
     /**
-     * Sends a message to given node.
-     *
-     * @param nodeId Node ID.
-     * @param msg Message.
+     * @return Fragment ID.
      */
-    void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
+    default long fragmentId() {
+        return context().fragmentId();
+    }
 
     /**
-     * Registers a listener for messages of a given type.
-     *
-     * @param lsnr Listener.
-     * @param type Message type.
+     * @return Exchange ID.
      */
-    void register(MessageListener lsnr, MessageType type);
+    long exchangeId();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index d291413..773a173 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -91,6 +91,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     @Override public void request(int rowsCnt) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert !F.isEmpty(sources) && sources.size() == 1;
         assert rowsCnt > 0 && requested == 0;
 
@@ -104,6 +107,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     @Override public void push(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
         assert state == State.UPDATING;
@@ -134,6 +140,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     @Override public void end() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index 33f14ba..6c99f4d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -27,7 +27,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
  * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), 
{@link Node#request(int)}, {@link Node#cancel()},
  * {@link Downstream#push(Object)} and {@link Downstream#end()} methods should 
be used from one single thread.
  */
-public interface Node<Row> {
+public interface Node<Row> extends AutoCloseable {
     /**
      * Returns runtime context allowing access to the tables in a database.
      *
@@ -53,9 +53,4 @@ public interface Node<Row> {
      * Requests next bunch of rows.
      */
     void request(int rowsCnt);
-
-    /**
-     * Cancels execution.
-     */
-    void cancel();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index b81223f..199f528 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  * A part of exchange.
  */
-public class Outbox<Row> extends AbstractNode<Row> implements SingleNode<Row>, 
Downstream<Row>, AutoCloseable {
+public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, 
SingleNode<Row>, Downstream<Row> {
     /** */
     private final ExchangeService exchange;
 
@@ -61,9 +61,6 @@ public class Outbox<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, D
     private final Map<UUID, Buffer> nodeBuffers = new HashMap<>();
 
     /** */
-    private boolean cancelled;
-
-    /** */
     private int waiting;
 
     /**
@@ -78,8 +75,8 @@ public class Outbox<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, D
         ExecutionContext<Row> ctx,
         ExchangeService exchange,
         MailboxRegistry registry,
-        long exchangeId, long
-        targetFragmentId,
+        long exchangeId,
+        long targetFragmentId,
         Destination dest
     ) {
         super(ctx);
@@ -90,17 +87,8 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
         this.dest = dest;
     }
 
-    /**
-     * @return Query ID.
-     */
-    public UUID queryId() {
-        return context().queryId();
-    }
-
-    /**
-     * @return Exchange ID.
-     */
-    public long exchangeId() {
+    /** {@inheritDoc} */
+    @Override public long exchangeId() {
         return exchangeId;
     }
 
@@ -111,7 +99,11 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
      * @param batchId Batch ID.
      */
     public void onAcknowledge(UUID nodeId, int batchId) {
-        nodeBuffers.get(nodeId).onAcknowledge(batchId);
+        Buffer buffer = nodeBuffers.get(nodeId);
+
+        assert buffer != null;
+
+        buffer.onAcknowledge(batchId);
     }
 
     /** */
@@ -125,6 +117,9 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
     @Override public void push(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert waiting > 0;
 
         waiting--;
@@ -138,6 +133,9 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
     @Override public void end() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert waiting > 0;
 
         waiting = -1;
@@ -145,8 +143,6 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
         try {
             for (UUID node : dest.targets())
                 getOrCreateBuffer(node).end();
-
-            close();
         }
         catch (Exception e) {
             onError(e);
@@ -158,30 +154,29 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
         U.error(context().planningContext().logger(),
             "Error occurred during execution: " + X.getFullStackTrace(e));
 
-        cancel(); // TODO send cause to originator.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        checkThread();
-
-        context().markCancelled();
-
-        if (cancelled)
-            return;
-
-        cancelled = true;
-
-        nodeBuffers.values().forEach(Buffer::cancel);
+        try {
+            sendError(e);
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(context().planningContext().logger(),
+                "Error occurred during send error message: " + 
X.getFullStackTrace(e));
+        }
 
         close();
-
-        super.cancel();
     }
 
     /** {@inheritDoc} */
     @Override public void close() {
+        if (isClosed())
+            return;
+
         registry.unregister(this);
+
+        // Send cancel message for the Inbox to close Inboxes created by batch 
message race.
+        for (UUID node : dest.targets())
+            getOrCreateBuffer(node).close();
+
+        super.close();
     }
 
     /** {@inheritDoc} */
@@ -208,9 +203,14 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
     }
 
     /** */
-    private void sendCancel(UUID nodeId, int batchId) {
+    private void sendError(Throwable err) throws IgniteCheckedException {
+        exchange.sendError(ctx.originatingNodeId(), queryId(), fragmentId(), 
err);
+    }
+
+    /** */
+    private void sendInboxClose(UUID nodeId) {
         try {
-            exchange.cancel(nodeId, queryId(), targetFragmentId, exchangeId, 
batchId);
+            exchange.closeInbox(nodeId, queryId(), targetFragmentId, 
exchangeId);
         }
         catch (IgniteCheckedException e) {
             U.warn(context().planningContext().logger(), "Failed to send 
cancel message.", e);
@@ -264,6 +264,12 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
     }
 
     /** */
+    public void onNodeLeft(UUID nodeId) {
+        if (nodeId.equals(ctx.originatingNodeId()))
+            ctx.execute(this::close);
+    }
+
+    /** */
     private final class Buffer {
         /** */
         private final UUID nodeId;
@@ -318,15 +324,18 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements SingleNode<Row>, D
         }
 
         /** */
-        public void cancel() {
+        public void close() {
+            int currBatchId = hwm;
+
             if (hwm == Integer.MAX_VALUE)
                 return;
 
-            int batchId = hwm + 1;
             hwm = Integer.MAX_VALUE;
 
             curr = null;
-            sendCancel(nodeId, batchId);
+
+            if (currBatchId >= 0)
+                sendInboxClose(nodeId);
         }
 
         /**
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
index 0c2c2ab..afe7fc0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
@@ -43,6 +43,9 @@ public class ProjectNode<Row> extends AbstractNode<Row> 
implements SingleNode<Ro
     @Override public void request(int rowsCnt) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert !F.isEmpty(sources) && sources.size() == 1;
         assert rowsCnt > 0;
 
@@ -53,6 +56,9 @@ public class ProjectNode<Row> extends AbstractNode<Row> 
implements SingleNode<Ro
     @Override public void push(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
 
         try {
@@ -67,6 +73,9 @@ public class ProjectNode<Row> extends AbstractNode<Row> 
implements SingleNode<Ro
     @Override public void end() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
 
         downstream.end();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index ca4c370..68da4fd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -22,9 +22,9 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
 
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -35,8 +35,7 @@ import org.apache.ignite.internal.util.typedef.F;
 /**
  * Client iterator.
  */
-public class RootNode<Row> extends AbstractNode<Row>
-    implements SingleNode<Row>, Downstream<Row>, Iterator<Row>, AutoCloseable {
+public class RootNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Downstream<Row>, Iterator<Row> {
     /** */
     private final ReentrantLock lock;
 
@@ -47,13 +46,13 @@ public class RootNode<Row> extends AbstractNode<Row>
     private final Deque<Row> buff;
 
     /** */
-    private final Consumer<RootNode<Row>> onClose;
+    private final Runnable onClose;
 
     /** */
     private volatile State state = State.RUNNING;
 
     /** */
-    private volatile IgniteSQLException ex;
+    private final AtomicReference<Throwable> ex = new AtomicReference<>();
 
     /** */
     private Row row;
@@ -64,15 +63,27 @@ public class RootNode<Row> extends AbstractNode<Row>
     /**
      * @param ctx Execution context.
      */
-    public RootNode(ExecutionContext<Row> ctx, Consumer<RootNode<Row>> 
onClose) {
+    public RootNode(ExecutionContext<Row> ctx) {
         super(ctx);
 
-        this.onClose = onClose;
+        buff = new ArrayDeque<>(IN_BUFFER_SIZE);
+        lock = new ReentrantLock();
+        cond = lock.newCondition();
 
-        // extra space for possible END marker
-        buff = new ArrayDeque<>(IN_BUFFER_SIZE + 1);
+        onClose = this::proceedClose;
+    }
+
+    /**
+     * @param ctx Execution context.
+     */
+    public RootNode(ExecutionContext<Row> ctx, Runnable onClose) {
+        super(ctx);
+
+        buff = new ArrayDeque<>(IN_BUFFER_SIZE);
         lock = new ReentrantLock();
         cond = lock.newCondition();
+
+        this.onClose = onClose;
     }
 
     /** */
@@ -80,39 +91,38 @@ public class RootNode<Row> extends AbstractNode<Row>
         return context().queryId();
     }
 
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        if (state != State.RUNNING)
-            return;
+    /** */
+    public void proceedClose() {
+        context().execute(() -> {
+            checkThread();
 
+            if (isClosed())
+                return;
+
+            buff.clear();
+
+            super.close();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
         lock.lock();
         try {
-            if (state != State.RUNNING)
+            if (state == State.RUNNING)
+                state = State.CANCELLED;
+            else if (state == State.END)
+                state = State.CLOSED;
+            else
                 return;
 
-            context().markCancelled();
-            state = State.CANCELLED;
-            buff.clear();
             cond.signalAll();
         }
         finally {
             lock.unlock();
         }
-        
-        context().execute(F.first(sources)::cancel);
-        onClose.accept(this);
-    }
 
-    /**
-     * @return Execution state.
-     */
-    public State state() {
-        return state;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        cancel();
+        onClose.run();
     }
 
     /** {@inheritDoc} */
@@ -123,7 +133,7 @@ public class RootNode<Row> extends AbstractNode<Row>
 
         lock.lock();
         try {
-            assert waiting > 0;
+            assert waiting > 0 : "waiting=" + waiting;
 
             waiting--;
 
@@ -149,7 +159,7 @@ public class RootNode<Row> extends AbstractNode<Row>
     @Override public void end() {
         lock.lock();
         try {
-            assert waiting > 0;
+            assert waiting > 0 : "waiting=" + waiting;
 
             waiting = -1;
 
@@ -165,18 +175,17 @@ public class RootNode<Row> extends AbstractNode<Row>
 
     /** {@inheritDoc} */
     @Override public void onError(Throwable e) {
-        checkThread();
+        if (!ex.compareAndSet(null, e))
+            ex.get().addSuppressed(e);
 
-        ex = new IgniteSQLException("An error occurred while query 
executing.", IgniteQueryErrorCode.UNKNOWN, e);
-
-        cancel();
+        close();
     }
 
     /** {@inheritDoc} */
     @Override public boolean hasNext() {
         if (row != null)
             return true;
-        else if (state == State.END)
+        else if (state == State.END || state == State.CLOSED)
             return false;
         else
             return (row = take()) != null;
@@ -217,10 +226,10 @@ public class RootNode<Row> extends AbstractNode<Row>
 
         lock.lock();
         try {
-            checkCancelled();
-            assert state == State.RUNNING;
-
             while (true) {
+                checkCancelled();
+                assert state == State.RUNNING;
+
                 if (!buff.isEmpty())
                     return buff.poll();
                 else if (waiting == -1)
@@ -231,9 +240,6 @@ public class RootNode<Row> extends AbstractNode<Row>
                 }
 
                 cond.await();
-
-                checkCancelled();
-                assert state == State.RUNNING;
             }
 
             state = State.END;
@@ -247,7 +253,7 @@ public class RootNode<Row> extends AbstractNode<Row>
 
         assert state == State.END;
 
-        onClose.accept(this);
+        close();
         
         return null;
     }
@@ -255,22 +261,21 @@ public class RootNode<Row> extends AbstractNode<Row>
     /** */
     private void checkCancelled() {
         if (state == State.CANCELLED) {
-            if (ex != null)
-                throw ex;
+            ex.compareAndSet(null, new IgniteSQLException("The query was 
cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED));
 
-            throw new IgniteSQLException("The query was cancelled while 
executing.", IgniteQueryErrorCode.QUERY_CANCELED);
+            throw sqlException(ex.get());
         }
     }
 
     /** */
-    public enum State {
-        /** */
-        RUNNING,
-
-        /** */
-        CANCELLED,
+    private IgniteSQLException sqlException(Throwable e) {
+        return e instanceof IgniteSQLException
+            ? (IgniteSQLException)e
+            : new IgniteSQLException("An error occurred while query 
executing.", IgniteQueryErrorCode.UNKNOWN, e);
+    }
 
-        /** */
-        END
+    /** */
+    private enum State {
+        RUNNING, CANCELLED, END, CLOSED
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 72df1e4..fae9530 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -27,7 +27,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.Commons;
 /**
  * Scan node.
  */
-public class ScanNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, AutoCloseable {
+public class ScanNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row> {
     /** */
     private final Iterable<Row> src;
 
@@ -54,6 +54,9 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     @Override public void request(int rowsCnt) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert rowsCnt > 0 && requested == 0;
 
         requested = rowsCnt;
@@ -63,15 +66,17 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     }
 
     /** {@inheritDoc} */
-    @Override public void cancel() {
-        checkThread();
-        context().markCancelled();
-        close();
-    }
-
-    /** {@inheritDoc} */
     @Override public void close() {
+        if (isClosed())
+            return;
+
         Commons.closeQuiet(it);
+
+        it = null;
+
+        Commons.closeQuiet(src);
+
+        super.close();
     }
 
     /** {@inheritDoc} */
@@ -86,6 +91,9 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
 
     /** */
     private void pushInternal() {
+        if (isClosed())
+            return;
+
         inLoop = true;
         try {
             if (it == null)
@@ -96,7 +104,7 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
             Thread thread = Thread.currentThread();
 
             while (requested > 0 && it.hasNext()) {
-                if (context().cancelled())
+                if (isClosed())
                     return;
 
                 if (thread.isInterrupted())
@@ -117,16 +125,27 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
                 downstream.end();
                 requested = 0;
 
-                close();
+                Commons.closeQuiet(it);
+
+                it = null;
             }
         }
         catch (Throwable e) {
-            close();
-
-            downstream.onError(e);
+            onError(e);
         }
         finally {
             inLoop = false;
         }
     }
+
+    /** */
+    private void onError(Throwable e) {
+        checkThread();
+
+        assert downstream != null;
+
+        downstream.onError(e);
+
+        close();
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 79943b3..64c236d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -60,23 +60,28 @@ public class SortNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     @Override public void request(int rowsCnt) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert !F.isEmpty(sources) && sources.size() == 1;
         assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
 
         requested = rowsCnt;
 
-        if (waiting == -1 && !inLoop)
-            context().execute(this::flushFromBuffer);
-        else if (waiting == 0)
+        if (waiting == 0)
             F.first(sources).request(waiting = IN_BUFFER_SIZE);
-        else
-            throw new AssertionError();
+        else if (!inLoop)
+            context().execute(this::flushFromBuffer);
     }
 
     /** {@inheritDoc} */
     @Override public void push(Row row) {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
 
@@ -97,6 +102,9 @@ public class SortNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     @Override public void end() {
         checkThread();
 
+        if (isClosed())
+            return;
+
         assert downstream != null;
         assert waiting > 0;
 
@@ -123,28 +131,16 @@ public class SortNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     private void flushFromBuffer() {
         assert waiting == -1;
 
-        inLoop = true;
+        int processed = 0;
 
+        inLoop = true;
         try {
-            int processed = 0;
+            while (requested > 0 && !rows.isEmpty()) {
+                requested--;
 
-            while (requested > 0) {
-                int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
+                downstream.push(rows.poll());
 
-                for (int i = 0; i < toSnd; i++) {
-                    requested--;
-
-                    if (rows.isEmpty())
-                        break;
-
-                    Row row = rows.poll();
-
-                    downstream.push(row);
-
-                    processed++;
-                }
-
-                if (processed >= IN_BUFFER_SIZE && requested > 0) {
+                if (++processed >= IN_BUFFER_SIZE && requested > 0) {
                     // allow others to do their job
                     context().execute(this::flushFromBuffer);
 
@@ -156,7 +152,6 @@ public class SortNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
                 downstream.end();
                 requested = 0;
             }
-
         }
         finally {
             inLoop = false;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
similarity index 70%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
index 933d656..de45c18 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
@@ -20,13 +20,16 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class InboxCancelMessage implements ExecutionContextAware {
+public class ErrorMessage implements MarshalableMessage {
     /** */
     private UUID queryId;
 
@@ -34,44 +37,47 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
     private long fragmentId;
 
     /** */
-    private long exchangeId;
+    private byte[] errBytes;
 
     /** */
-    private int batchId;
+    @GridDirectTransient
+    private Throwable err;
 
     /** */
-    public InboxCancelMessage(){}
+    public ErrorMessage() {
+        // No-op.
+    }
 
     /** */
-    public InboxCancelMessage(UUID queryId, long fragmentId, long exchangeId, 
int batchId) {
+    public ErrorMessage(UUID queryId, long fragmentId, Throwable err) {
+        assert err != null;
+
         this.queryId = queryId;
         this.fragmentId = fragmentId;
-        this.exchangeId = exchangeId;
-        this.batchId = batchId;
+        this.err = err;
     }
 
-    /** {@inheritDoc} */
-    @Override public UUID queryId() {
+    /**
+     * @return Query ID.
+     */
+    public UUID queryId() {
         return queryId;
     }
 
-    /** {@inheritDoc} */
-    @Override public long fragmentId() {
-        return fragmentId;
-    }
-
     /**
-     * @return Exchange ID.
+     * @return Fragment ID.
      */
-    public long exchangeId() {
-        return exchangeId;
+    public long fragmentId() {
+        return fragmentId;
     }
 
     /**
-     * @return Batch ID.
+     * @return Marshaled Throwable.
      */
-    public int batchId() {
-        return batchId;
+    public Throwable error() {
+        assert err != null;
+
+        return err;
     }
 
     /** {@inheritDoc} */
@@ -87,24 +93,18 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeInt("batchId", batchId))
+                if (!writer.writeByteArray("errBytes", errBytes))
                     return false;
 
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeLong("exchangeId", exchangeId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
                 if (!writer.writeLong("fragmentId", fragmentId))
                     return false;
 
                 writer.incrementState();
 
-            case 3:
+            case 2:
                 if (!writer.writeUuid("queryId", queryId))
                     return false;
 
@@ -123,8 +123,8 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
             return false;
 
         switch (reader.state()) {
-            case 0:
-                batchId = reader.readInt("batchId");
+             case 0:
+                errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -132,14 +132,6 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
                 reader.incrementState();
 
             case 1:
-                exchangeId = reader.readLong("exchangeId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
                 fragmentId = reader.readLong("fragmentId");
 
                 if (!reader.isLastRead())
@@ -147,7 +139,7 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
                 reader.incrementState();
 
-            case 3:
+            case 2:
                 queryId = reader.readUuid("queryId");
 
                 if (!reader.isLastRead())
@@ -157,16 +149,26 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
         }
 
-        return reader.afterMessageRead(InboxCancelMessage.class);
+        return reader.afterMessageRead(ErrorMessage.class);
     }
 
     /** {@inheritDoc} */
     @Override public MessageType type() {
-        return MessageType.QUERY_INBOX_CANCEL_MESSAGE;
+        return MessageType.QUERY_ERROR_MESSAGE;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(Marshaller marshaller) throws 
IgniteCheckedException {
+        errBytes = marshaller.marshal(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
loader) throws IgniteCheckedException {
+        err = marshaller.unmarshal(errBytes, loader);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
similarity index 79%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
index 933d656..4398306 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCancelMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
@@ -26,7 +26,7 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class InboxCancelMessage implements ExecutionContextAware {
+public class InboxCloseMessage implements CalciteMessage {
     /** */
     private UUID queryId;
 
@@ -37,26 +37,28 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
     private long exchangeId;
 
     /** */
-    private int batchId;
-
-    /** */
-    public InboxCancelMessage(){}
+    public InboxCloseMessage() {
+        // No-op.
+    }
 
     /** */
-    public InboxCancelMessage(UUID queryId, long fragmentId, long exchangeId, 
int batchId) {
+    public InboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) {
         this.queryId = queryId;
         this.fragmentId = fragmentId;
         this.exchangeId = exchangeId;
-        this.batchId = batchId;
     }
 
-    /** {@inheritDoc} */
-    @Override public UUID queryId() {
+    /**
+     * @return Query ID.
+     */
+    public UUID queryId() {
         return queryId;
     }
 
-    /** {@inheritDoc} */
-    @Override public long fragmentId() {
+    /**
+     * @return Fragment ID.
+     */
+    public long fragmentId() {
         return fragmentId;
     }
 
@@ -67,13 +69,6 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
         return exchangeId;
     }
 
-    /**
-     * @return Batch ID.
-     */
-    public int batchId() {
-        return batchId;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -87,24 +82,18 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeInt("batchId", batchId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
                 if (!writer.writeLong("exchangeId", exchangeId))
                     return false;
 
                 writer.incrementState();
 
-            case 2:
+            case 1:
                 if (!writer.writeLong("fragmentId", fragmentId))
                     return false;
 
                 writer.incrementState();
 
-            case 3:
+            case 2:
                 if (!writer.writeUuid("queryId", queryId))
                     return false;
 
@@ -124,14 +113,6 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
         switch (reader.state()) {
             case 0:
-                batchId = reader.readInt("batchId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
                 exchangeId = reader.readLong("exchangeId");
 
                 if (!reader.isLastRead())
@@ -139,7 +120,7 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
                 reader.incrementState();
 
-            case 2:
+            case 1:
                 fragmentId = reader.readLong("fragmentId");
 
                 if (!reader.isLastRead())
@@ -147,7 +128,7 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
                 reader.incrementState();
 
-            case 3:
+            case 2:
                 queryId = reader.readUuid("queryId");
 
                 if (!reader.isLastRead())
@@ -157,7 +138,7 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
         }
 
-        return reader.afterMessageRead(InboxCancelMessage.class);
+        return reader.afterMessageRead(InboxCloseMessage.class);
     }
 
     /** {@inheritDoc} */
@@ -167,6 +148,6 @@ public class InboxCancelMessage implements 
ExecutionContextAware {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 3;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index 3444347..1376c00 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -21,6 +21,7 @@ import java.util.UUID;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.marshaller.Marshaller;
 
 /**
  *
@@ -35,10 +36,28 @@ public interface MessageService extends Service {
     void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
 
     /**
+     * Checks whether a node with given ID is alive.
+     *
+     * @param nodeId Node ID.
+     * @return {@code True} if node is alive.
+     */
+    boolean alive(UUID nodeId);
+
+    /**
      * Registers a listener for messages of a given type.
      *
      * @param lsnr Listener.
      * @param type Message type.
      */
     void register(MessageListener lsnr, MessageType type);
+
+    /**
+     * @return Message marshaller.
+     */
+    Marshaller marshaller();
+
+    /**
+     * @return Message class loader.
+     */
+    ClassLoader classLoader();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index e8c3caa..1061c6e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -105,10 +106,8 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
         this.classLoader = classLoader;
     }
 
-    /**
-     * @return Class loader.
-     */
-    public ClassLoader classLoader() {
+    /** {@inheritDoc} */
+    @Override public ClassLoader classLoader() {
         return classLoader;
     }
 
@@ -133,10 +132,8 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
         this.marsh = marsh;
     }
 
-    /**
-     * @return Marshaller.
-     */
-    public Marshaller marshaller() {
+    /** {@inheritDoc} */
+    @Override public Marshaller marshaller() {
         return marsh;
     }
 
@@ -211,6 +208,16 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
         assert old == null : old;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean alive(UUID nodeId) {
+        try {
+            return !ioManager().checkNodeLeft(nodeId, null, false);
+        }
+        catch (IgniteClientDisconnectedCheckedException e) {
+            throw new AssertionError(e);
+        }
+    }
+
     /** */
     protected void prepareMarshal(Message msg) throws IgniteCheckedException {
         try {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index 9c1446c..6979b06 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -28,11 +28,13 @@ import 
org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescr
 public enum MessageType {
     QUERY_START_REQUEST(300, QueryStartRequest::new),
     QUERY_START_RESPONSE(301, QueryStartResponse::new),
-    QUERY_CANCEL_REQUEST(302, QueryCancelRequest::new),
+    QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
     QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new),
     QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new),
-    QUERY_INBOX_CANCEL_MESSAGE(305, InboxCancelMessage::new),
-    GENERIC_ROW_MESSAGE(306, GenericRowMessage::new),
+    QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new),
+    QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new),
+    GENERIC_ROW_MESSAGE(307, GenericRowMessage::new),
+
     NODES_MAPPING(350, NodesMapping::new),
     FRAGMENT_DESCRIPTION(351, FragmentDescription::new);
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
similarity index 62%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
index 0a3766e..ba6021c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCancelRequest.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/OutboxCloseMessage.java
@@ -26,16 +26,26 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class QueryCancelRequest implements CalciteMessage {
+public class OutboxCloseMessage implements CalciteMessage {
     /** */
     private UUID queryId;
 
     /** */
-    QueryCancelRequest(){}
+    private long fragmentId;
 
     /** */
-    public QueryCancelRequest(UUID queryId) {
+    private long exchangeId;
+
+    /** */
+    public OutboxCloseMessage() {
+        // No-op.
+    }
+
+    /** */
+    public OutboxCloseMessage(UUID queryId, long fragmentId, long exchangeId) {
         this.queryId = queryId;
+        this.fragmentId = fragmentId;
+        this.exchangeId = exchangeId;
     }
 
     /**
@@ -45,6 +55,20 @@ public class QueryCancelRequest implements CalciteMessage {
         return queryId;
     }
 
+    /**
+     * @return Fragment ID.
+     */
+    public long fragmentId() {
+        return fragmentId;
+    }
+
+    /**
+     * @return Exchange ID.
+     */
+    public long exchangeId() {
+        return exchangeId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -58,6 +82,18 @@ public class QueryCancelRequest implements CalciteMessage {
 
         switch (writer.state()) {
             case 0:
+                if (!writer.writeLong("exchangeId", exchangeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("fragmentId", fragmentId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
                 if (!writer.writeUuid("queryId", queryId))
                     return false;
 
@@ -77,6 +113,22 @@ public class QueryCancelRequest implements CalciteMessage {
 
         switch (reader.state()) {
             case 0:
+                exchangeId = reader.readLong("exchangeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                fragmentId = reader.readLong("fragmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
                 queryId = reader.readUuid("queryId");
 
                 if (!reader.isLastRead())
@@ -86,16 +138,16 @@ public class QueryCancelRequest implements CalciteMessage {
 
         }
 
-        return reader.afterMessageRead(QueryCancelRequest.class);
+        return reader.afterMessageRead(OutboxCloseMessage.class);
     }
 
     /** {@inheritDoc} */
     @Override public MessageType type() {
-        return MessageType.QUERY_CANCEL_REQUEST;
+        return MessageType.QUERY_OUTBOX_CANCEL_MESSAGE;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 1;
+        return 3;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index 2d3c6b2..c5aacc8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -31,7 +31,7 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class QueryStartRequest implements MarshalableMessage {
+public class QueryStartRequest implements MarshalableMessage, 
ExecutionContextAware {
     /** */
     private String schema;
 
@@ -76,13 +76,16 @@ public class QueryStartRequest implements 
MarshalableMessage {
         return schema;
     }
 
-    /**
-     * @return Query ID.
-     */
-    public UUID queryId() {
+    /** {@inheritDoc} */
+    @Override public UUID queryId() {
         return qryId;
     }
 
+    /** {@inheritDoc} */
+    @Override public long fragmentId() {
+        return fragmentDesc.fragmentId();
+    }
+
     /**
      * @return Fragment description.
      */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java
similarity index 50%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java
index 3444347..9327034 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java
@@ -15,30 +15,54 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.message;
+package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import java.util.UUID;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
-
 /**
  *
  */
-public interface MessageService extends Service {
+public class RemoteException extends RuntimeException {
+    /** */
+    private final UUID nodeId;
+
+    /** */
+    private final UUID queryId;
+
+    /** */
+    private final long fragmentId;
+
     /**
-     * Sends a message to given node.
-     *
+     * @param cause Cause.
      * @param nodeId Node ID.
-     * @param msg Message.
+     * @param queryId Query ID.
+     * @param fragmentId Fragment ID.
+     */
+    public RemoteException(UUID nodeId, UUID queryId, long fragmentId, 
Throwable cause) {
+        super("Remote query execution", cause);
+        this.nodeId = nodeId;
+        this.queryId = queryId;
+        this.fragmentId = fragmentId;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Query ID.
      */
-    void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
+    public UUID queryId() {
+        return queryId;
+    }
 
     /**
-     * Registers a listener for messages of a given type.
-     *
-     * @param lsnr Listener.
-     * @param type Message type.
+     * @return Fragment ID.
      */
-    void register(MessageListener lsnr, MessageType type);
+    public long fragmentId() {
+        return fragmentId;
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 28ce580..551a816 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -207,6 +207,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
             this.root = root;
         }
 
+        /** */
         Fragment build() {
             return new Fragment(id, root, remotes.build());
         }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index a53b357..214dcc9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -216,20 +216,6 @@ public final class Commons {
             U.closeQuiet((AutoCloseable) o);
     }
 
-    /**
-     * @param o Object to close.
-     * @param e Exception, what causes close.
-     */
-    public static void closeQuiet(Object o, @Nullable Exception e) {
-        if (!(o instanceof AutoCloseable))
-            return;
-
-        if (e != null)
-            U.closeWithSuppressingException((AutoCloseable) o, e);
-        else
-            U.closeQuiet((AutoCloseable) o);
-    }
-
     /** */
     public static RelDataType combinedRowType(IgniteTypeFactory typeFactory, 
RelDataType... types) {
         RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(typeFactory);
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
new file mode 100644
index 0000000..fd83b3b
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Cancel query test.
+ */
+public class CancelTest extends GridCommonAbstractTest {
+    /** Partition release timeout. */
+    private static final long PART_RELEASE_TIMEOUT = 5000L;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(2);
+
+        IgniteCache<Integer, String> c = grid(0).cache("TEST");
+
+        fillCache(c, 5000);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        QueryEntity ePart = new QueryEntity()
+            .setTableName("TEST")
+            .setKeyType(Integer.class.getName())
+            .setValueType(String.class.getName())
+            .setKeyFieldName("id")
+            .setValueFieldName("val")
+            .addQueryField("id", Integer.class.getName(), null)
+            .addQueryField("val", String.class.getName(), null);;
+
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(
+                new CacheConfiguration<>(ePart.getTableName())
+                    .setAffinity(new RendezvousAffinityFunction(false, 8))
+                    .setCacheMode(CacheMode.PARTITIONED)
+                    .setQueryEntities(singletonList(ePart))
+                    .setSqlSchema("PUBLIC"));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testCancel() throws Exception {
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), 
QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> cursors =
+            engine.query(null, "PUBLIC",
+                "SELECT * FROM TEST",
+                X.EMPTY_OBJECT_ARRAY);
+
+        Iterator<List<?>> it = cursors.get(0).iterator();
+
+        it.next();
+
+        cursors.forEach(QueryCursor::close);
+
+        GridTestUtils.assertThrows(log, () -> {
+                it.next();
+
+                return null;
+            },
+            IgniteSQLException.class, "The query was cancelled while executing"
+        );
+
+        awaitReservationsRelease("TEST");
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testNotOriginatorNodeStop() throws Exception {
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), 
QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> cursors =
+            engine.query(null, "PUBLIC",
+                "SELECT * FROM TEST",
+                X.EMPTY_OBJECT_ARRAY);
+
+        Iterator<List<?>> it = cursors.get(0).iterator();
+
+        it.next();
+
+        stopGrid(1);
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> {
+                while (it.hasNext())
+                    it.next();
+
+                return null;
+            },
+            ClusterTopologyCheckedException.class, "Node left"
+        );
+
+        awaitReservationsRelease(grid(0), "TEST");
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testOriginatorNodeStop() throws Exception {
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), 
QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> cursors =
+            engine.query(null, "PUBLIC",
+                "SELECT * FROM TEST",
+                X.EMPTY_OBJECT_ARRAY);
+
+        Iterator<List<?>> it = cursors.get(0).iterator();
+
+        it.next();
+
+        stopGrid(0);
+
+        awaitReservationsRelease(grid(1), "TEST");
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testReadToEnd() throws Exception {
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), 
QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> cursors =
+            engine.query(null, "PUBLIC",
+                "SELECT * FROM TEST WHERE ID < 1",
+                X.EMPTY_OBJECT_ARRAY);
+
+        Iterator<List<?>> it = cursors.get(0).iterator();
+
+        it.next();
+
+        GridTestUtils.assertThrows(log, () -> {
+                it.next();
+
+                return null;
+            },
+            NoSuchElementException.class, null
+        );
+
+        GridTestUtils.assertThrows(log, () -> {
+                it.next();
+
+                return null;
+            },
+            NoSuchElementException.class, null
+        );
+
+        // Checks that all partition are unreserved.
+        awaitReservationsRelease("TEST");
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testFullReadToEnd() throws Exception {
+        QueryEngine engine = Commons.lookupComponent(grid(0).context(), 
QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> cursors =
+            engine.query(null, "PUBLIC",
+                "SELECT * FROM TEST WHERE ID < 1",
+                X.EMPTY_OBJECT_ARRAY);
+
+        cursors.get(0).getAll();
+
+        // Checks that all partition are unreserved.
+        awaitReservationsRelease("TEST");
+    }
+
+    /**
+     * @param c Cache.
+     * @param rows Rows count.
+     */
+    private void fillCache(IgniteCache c, int rows) throws 
InterruptedException {
+        c.clear();
+
+        for (int i = 0; i < rows; ++i)
+            c.put(i, "val_" + i);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     *
+     */
+    private void startNewNode() throws Exception {
+        startGrid(2);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @param cacheName Cache to check
+     * @throws IgniteInterruptedCheckedException
+     */
+    void awaitReservationsRelease(String cacheName) throws 
IgniteInterruptedCheckedException {
+        for (Ignite ign : G.allGrids())
+            awaitReservationsRelease((IgniteEx)ign, "TEST");
+    }
+
+    /**
+     * @param node Node to check reservation.
+     * @param cacheName Cache to check reservations.
+     */
+    void awaitReservationsRelease(IgniteEx node, String cacheName) throws 
IgniteInterruptedCheckedException {
+        GridDhtAtomicCache c = 
GridTestUtils.getFieldValue(node.cachex(cacheName), "delegate");
+
+        List<GridDhtLocalPartition> parts = c.topology().localPartitions();
+
+        GridTestUtils.waitForCondition(() -> {
+            for (GridDhtLocalPartition p : parts) {
+                if (p.reservations() > 0)
+                    return false;
+            }
+
+            return true;
+        }, PART_RELEASE_TIMEOUT);
+
+        for (GridDhtLocalPartition p : parts)
+            assertEquals("Partition is reserved: " + p, 0, p.reservations());
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index 7968f0a..aed1f7c 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -1323,7 +1323,7 @@ public class PlannerTest extends GridCommonAbstractTest {
             exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0, 
mailboxRegistry, exchangeSvc,
                 new TestFailureProcessor(kernal)).go(fragment.root());
 
-            RootNode<Object[]> consumer = new RootNode<>(ectx, r -> {});
+            RootNode<Object[]> consumer = new RootNode<>(ectx);
             consumer.register(exec);
 
             //// Remote part
@@ -1568,7 +1568,7 @@ public class PlannerTest extends GridCommonAbstractTest {
             exec = new LogicalRelImplementor<>(ectx, c1 -> r1 -> 0, 
mailboxRegistry, exchangeSvc,
                 new TestFailureProcessor(kernal)).go(fragment.root());
 
-            RootNode<Object[]> consumer = new RootNode<>(ectx, r -> {});
+            RootNode<Object[]> consumer = new RootNode<>(ectx);
             consumer.register(exec);
 
             //// Remote part
@@ -2780,6 +2780,11 @@ public class PlannerTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean alive(UUID nodeId) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override protected void prepareMarshal(Message msg) {
             // No-op;
         }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index c3c7f9a..c2c625c 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -180,6 +180,11 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean alive(UUID nodeId) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override protected void prepareMarshal(Message msg) {
             // No-op;
         }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
index cf4f8c5..ae07e61 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
@@ -142,7 +142,7 @@ public class ContinuousExecutionTest extends 
AbstractExecutionTest {
 
         inbox.init(ectx, nodes.subList(1, nodes.size()), null);
 
-        RootNode<Object[]> node = new RootNode<>(ectx, r -> {});
+        RootNode<Object[]> node = new RootNode<>(ectx);
 
         node.register(inbox);
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 21bd420..086225a 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -96,7 +96,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         FilterNode<Object[]> filter = new FilterNode<>(ctx, r -> (Integer) 
r[0] >= 2);
         filter.register(project);
 
-        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        RootNode<Object[]> node = new RootNode<>(ctx);
         node.register(filter);
 
         assert node.hasNext();
@@ -140,7 +140,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         UnionAllNode<Object[]> union = new UnionAllNode<>(ctx);
         union.register(F.asList(scan1, scan2, scan3));
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(union);
 
         assertTrue(root.hasNext());
@@ -193,7 +193,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[0], r[1], r[4]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        RootNode<Object[]> node = new RootNode<>(ctx);
         node.register(project);
 
         assert node.hasNext();
@@ -252,7 +252,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[2], r[3], r[1]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        RootNode<Object[]> node = new RootNode<>(ctx);
         node.register(project);
 
         assert node.hasNext();
@@ -320,7 +320,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[0], r[1], r[4]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        RootNode<Object[]> node = new RootNode<>(ctx);
         node.register(project);
 
         assert node.hasNext();
@@ -371,7 +371,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[1]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        RootNode<Object[]> node = new RootNode<>(ctx);
         node.register(project);
 
         assert node.hasNext();
@@ -419,7 +419,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         ProjectNode<Object[]> project = new ProjectNode<>(ctx, r -> new 
Object[]{r[1]});
         project.register(join);
 
-        RootNode<Object[]> node = new RootNode<>(ctx, r -> {});
+        RootNode<Object[]> node = new RootNode<>(ctx);
         node.register(project);
 
         assert node.hasNext();
@@ -471,7 +471,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -517,7 +517,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -562,7 +562,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -607,7 +607,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -652,7 +652,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> reduce = new AggregateNode<>(ctx, REDUCE, 
grpSets, accFactory(ctx, call, REDUCE, rowType), rowFactory());
         reduce.register(map);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(reduce);
 
         assertTrue(root.hasNext());
@@ -694,7 +694,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -738,7 +738,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -780,7 +780,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -822,7 +822,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -864,7 +864,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(agg);
 
         assertTrue(root.hasNext());
@@ -907,7 +907,7 @@ public class ExecutionTest extends AbstractExecutionTest {
         AggregateNode<Object[]> agg = new AggregateNode<>(ctx, SINGLE, 
grpSets, accFactory(ctx, call, SINGLE, rowType), rowFactory());
         agg.register(scan);
 
-        RootNode<Object[]> root = new RootNode<>(ctx, c -> {});
+        RootNode<Object[]> root = new RootNode<>(ctx);
         root.register(agg);
 
         assertTrue(root.hasNext());
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 26b66c9..912d8dc 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
+import org.apache.ignite.internal.processors.query.calcite.CancelTest;
 import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
@@ -40,7 +41,8 @@ import org.junit.runners.Suite;
     ContinuousExecutionTest.class,
     CalciteQueryProcessorTest.class,
     JdbcQueryTest.class,
-    CalciteBasicSecondaryIndexIntegrationTest.class
+    CalciteBasicSecondaryIndexIntegrationTest.class,
+    CancelTest.class
 })
 public class IgniteCalciteTestSuite {
 }

Reply via email to