This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 9551879  IGNITE-16315 Execution workflow optimization - Fixes #9751.
9551879 is described below

commit 9551879df991a05a7b864d2b3ff304302bc7d14e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Feb 3 10:14:29 2022 +0300

    IGNITE-16315 Execution workflow optimization - Fixes #9751.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 modules/benchmarks/pom.xml                         |  12 +
 .../benchmarks/jmh/sql/JmhSqlBenchmark.java        | 285 +++++++++++++++++++++
 modules/calcite/pom.xml                            |   2 +-
 .../query/calcite/CalciteQueryProcessor.java       |   2 +-
 .../internal/processors/query/calcite/Query.java   |  66 ++++-
 .../processors/query/calcite/RootQuery.java        |  59 ++++-
 .../query/calcite/exec/ExchangeService.java        |  10 +
 .../query/calcite/exec/ExchangeServiceImpl.java    |  39 ++-
 .../query/calcite/exec/ExecutionContext.java       |   3 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |  48 +++-
 .../calcite/exec/exp/ExpressionFactoryImpl.java    |   9 +-
 .../processors/query/calcite/exec/rel/Inbox.java   |  13 +-
 .../processors/query/calcite/exec/rel/Outbox.java  |   9 +
 .../query/calcite/exec/rel/ScanNode.java           |  20 +-
 .../query/calcite/message/QueryStartRequest.java   |  47 +++-
 .../query/calcite/metadata/ColocationGroup.java    | 260 ++++++++++++++++---
 .../calcite/prepare/AbstractMultiStepPlan.java     |   2 +-
 .../query/calcite/prepare/BaseQueryContext.java    |  14 +-
 .../calcite/schema/CacheTableDescriptorImpl.java   |  18 +-
 .../schema/SystemViewTableDescriptorImpl.java      |  10 +-
 .../calcite/exec/rel/AbstractExecutionTest.java    |   2 +
 .../integration/AbstractBasicIntegrationTest.java  |  11 +-
 .../integration/RunningQueriesIntegrationTest.java | 138 ++++++----
 .../query/calcite/planner/PlannerTest.java         |   5 +
 .../processors/query/GridQueryProcessor.java       |   7 +-
 25 files changed, 934 insertions(+), 157 deletions(-)

diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml
index 4d55310..93b305d 100644
--- a/modules/benchmarks/pom.xml
+++ b/modules/benchmarks/pom.xml
@@ -52,6 +52,18 @@
         </dependency>
 
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>ignite-calcite</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.javassist</groupId>
+            <artifactId>javassist</artifactId>
+            <version>${javassist.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.openjdk.jol</groupId>
             <artifactId>jol-core</artifactId>
             <version>0.9</version>
diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
new file mode 100644
index 0000000..0a97a8c
--- /dev/null
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.sql;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark simple SQL queries.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Warmup(iterations = 3, time = 5)
+@Measurement(iterations = 3, time = 5)
+public class JmhSqlBenchmark {
+    /** Count of server nodes. */
+    private static final int SRV_NODES_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 100000;
+
+    /** Size of batch. */
+    private static final int BATCH_SIZE = 1000;
+
+    /** Partitions count. */
+    private static final int PARTS_CNT = 1024;
+
+    /** IP finder shared across nodes. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Query engine. */
+    @Param({"H2", "CALCITE"})
+    private String engine;
+
+    /** Ignite client. */
+    private Ignite client;
+
+    /** Servers. */
+    private final Ignite[] servers = new Ignite[SRV_NODES_CNT];
+
+    /** Cache. */
+    private IgniteCache<Integer, Item> cache;
+
+    /**
+     * Create Ignite configuration.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @return Configuration.
+     */
+    private IgniteConfiguration configuration(String igniteInstanceName) {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setIgniteInstanceName(igniteInstanceName);
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        return cfg;
+    }
+
+    /**
+     * Initiate Ignite and caches.
+     */
+    @Setup(Level.Trial)
+    public void setup() {
+        if ("CALCITE".equals(engine))
+            
System.setProperty(IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_ENGINE, 
"true");
+        else
+            
System.clearProperty(IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_ENGINE);
+
+        for (int i = 0; i < SRV_NODES_CNT; i++)
+            servers[i] = Ignition.start(configuration("server" + i));
+
+        client = Ignition.start(configuration("client").setClientMode(true));
+
+        cache = client.getOrCreateCache(new CacheConfiguration<Integer, 
Item>("CACHE")
+            .setIndexedTypes(Integer.class, Item.class)
+            .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+        );
+
+        try (IgniteDataStreamer<Integer, Item> ds = 
client.dataStreamer("CACHE")) {
+            for (int i = 0; i < KEYS_CNT; i++)
+                ds.addData(i, new Item(i));
+        }
+    }
+
+    /**
+     * Stop Ignite instance.
+     */
+    @TearDown
+    public void tearDown() {
+        client.close();
+
+        for (Ignite ignite : servers)
+            ignite.close();
+    }
+
+    /**
+     * Query unique value (full scan).
+     */
+    @Benchmark
+    public void querySimpleUnique() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fld=?", key);
+
+        assert res.size() == 1;
+    }
+
+    /**
+     * Query unique value (indexed).
+     */
+    @Benchmark
+    public void querySimpleUniqueIndexed() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fldIdx=?", key);
+
+        assert res.size() == 1;
+    }
+
+    /**
+     * Query batch (full scan).
+     */
+    @Benchmark
+    public void querySimpleBatch() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fldBatch=?", key 
/ BATCH_SIZE);
+
+        assert res.size() == BATCH_SIZE;
+    }
+
+    /**
+     * Query batch (indexed).
+     */
+    @Benchmark
+    public void querySimpleBatchIndexed() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name FROM Item WHERE fldIdxBatch=?", 
key / BATCH_SIZE);
+
+        assert res.size() == BATCH_SIZE;
+    }
+
+    /**
+     * Query with group by and aggregate.
+     */
+    @Benchmark
+    public void queryGroupBy() {
+        List<?> res = executeSql("SELECT fldBatch, AVG(fld) FROM Item GROUP BY 
fldBatch");
+
+        assert res.size() == KEYS_CNT / BATCH_SIZE;
+    }
+
+    /**
+     * Query with indexed field group by and aggregate.
+     */
+    @Benchmark
+    public void queryGroupByIndexed() {
+        List<?> res = executeSql("SELECT fldIdxBatch, AVG(fld) FROM Item GROUP 
BY fldIdxBatch");
+
+        assert res.size() == KEYS_CNT / BATCH_SIZE;
+    }
+
+    /**
+     * Query with sorting (full set).
+     */
+    @Benchmark
+    public void queryOrderByFull() {
+        List<?> res = executeSql("SELECT name, fld FROM Item ORDER BY fld 
DESC");
+
+        assert res.size() == KEYS_CNT;
+    }
+
+    /**
+     * Query with sorting (batch).
+     */
+    @Benchmark
+    public void queryOrderByBatch() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql("SELECT name, fld FROM Item WHERE 
fldIdxBatch=? ORDER BY fld DESC", key / BATCH_SIZE);
+
+        assert res.size() == BATCH_SIZE;
+    }
+
+    /** */
+    private List<?> executeSql(String sql, Object... args) {
+        List<List<?>> res = cache.query(new 
SqlFieldsQuery(sql).setArgs(args)).getAll();
+
+        return res.get(0);
+    }
+
+    /**
+     * Run benchmarks.
+     *
+     * @param args Args.
+     * @throws Exception Exception.
+     */
+    public static void main(String[] args) throws Exception {
+        final Options options = new OptionsBuilder()
+            .include(JmhSqlBenchmark.class.getSimpleName())
+            .build();
+
+        new Runner(options).run();
+    }
+
+    /** */
+    private static class Item {
+        /** */
+        @QuerySqlField
+        private final String name;
+
+        /** */
+        @QuerySqlField
+        private final int fld;
+
+        /** */
+        @QuerySqlField
+        private final int fldBatch;
+
+        /** */
+        @QuerySqlField(index = true)
+        private final int fldIdx;
+
+        /** */
+        @QuerySqlField(index = true)
+        private final int fldIdxBatch;
+
+        /** */
+        public Item(int val) {
+            name = "name" + val;
+            fld = val;
+            fldBatch = val / BATCH_SIZE;
+            fldIdx = val;
+            fldIdxBatch = val / BATCH_SIZE;
+        }
+    }
+}
diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index dba2262..19c6bd0 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -30,7 +30,7 @@
         <janino.version>3.1.6</janino.version>
         <avatica.version>1.19.0</avatica.version>
         <jsonpath.version>2.4.0</jsonpath.version>
-        <reflections.version>0.9.10</reflections.version>
+        <reflections.version>0.10.2</reflections.version>
         <javassist.version>3.28.0-GA</javassist.version>
         <esri.geometry.version>2.2.0</esri.geometry.version>
         <checker.version>3.10.0</checker.version>
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 24ba82a..26b35fe 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -361,7 +361,7 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
             try {
                 if (qryList.size() == 1) {
                     plan = queryPlanCache().queryPlan(
-                        new CacheKey(schemaName, qry.sql()),
+                        new CacheKey(schemaName, sql), // Use source SQL to 
avoid redundant parsing next time.
                         () -> prepareSvc.prepareSingle(sqlNode, 
qry.planningContext()));
                 }
                 else
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
index 1c48bd1..872b0c3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -19,14 +19,13 @@ package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -40,9 +39,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 
 /** */
 public class Query<RowT> implements RunningQuery {
-    /** Completable futures empty array. */
-    private static final CompletableFuture<?>[] 
COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];
-
     /** */
     private final UUID initNodeId;
 
@@ -67,6 +63,15 @@ public class Query<RowT> implements RunningQuery {
     /** */
     protected final ExchangeService exch;
 
+    /** */
+    protected final int totalFragmentsCnt;
+
+    /** */
+    protected final AtomicInteger finishedFragmentsCnt = new AtomicInteger();
+
+    /** */
+    protected final Set<Long> initNodeStartedExchanges = new HashSet<>();
+
     /** Logger. */
     protected final IgniteLogger log;
 
@@ -77,7 +82,8 @@ public class Query<RowT> implements RunningQuery {
         GridQueryCancel cancel,
         ExchangeService exch,
         Consumer<Query<RowT>> unregister,
-        IgniteLogger log
+        IgniteLogger log,
+        int totalFragmentsCnt
     ) {
         this.id = id;
         this.unregister = unregister;
@@ -88,6 +94,7 @@ public class Query<RowT> implements RunningQuery {
         this.cancel = cancel != null ? cancel : new GridQueryCancel();
 
         fragments = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        this.totalFragmentsCnt = totalFragmentsCnt;
     }
 
     /** */
@@ -181,6 +188,53 @@ public class Query<RowT> implements RunningQuery {
             cancel();
     }
 
+    /**
+     * Callback after the first batch of the query fragment from the node is 
received.
+     */
+    public void onInboundExchangeStarted(UUID nodeId, long exchangeId) {
+        // No-op.
+    }
+
+    /**
+     * Callback after the last batch of the query fragment from the node is 
processed.
+     */
+    public void onInboundExchangeFinished(UUID nodeId, long exchangeId) {
+        // No-op.
+    }
+
+    /**
+     * Callback after the first batch of the query fragment from the node is 
sent.
+     */
+    public void onOutboundExchangeStarted(UUID nodeId, long exchangeId) {
+        if (initNodeId.equals(nodeId))
+            initNodeStartedExchanges.add(exchangeId);
+    }
+
+    /**
+     * Callback after the last batch of the query fragment is sent to all 
nodes.
+     */
+    public void onOutboundExchangeFinished(long exchangeId) {
+        if (finishedFragmentsCnt.incrementAndGet() == totalFragmentsCnt) {
+            QueryState state0;
+
+            synchronized (mux) {
+                state0 = state;
+
+                if (state0 == QueryState.EXECUTING)
+                    state = QueryState.CLOSED;
+            }
+
+            if (state0 == QueryState.EXECUTING)
+                tryClose();
+        }
+    }
+
+    /** */
+    public boolean isExchangeWithInitNodeStarted(long fragmentId) {
+        // On remote node exchange ID is the same as fragment ID.
+        return initNodeStartedExchanges.contains(fragmentId);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(Query.class, this, "state", state, "fragments", 
fragments);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 449b050..7cca5e1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -17,14 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.calcite;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
@@ -65,10 +67,10 @@ public class RootQuery<RowT> extends Query<RowT> {
     /** Parameters. */
     private final Object[] params;
 
-    /** remote nodes */
-    private final Set<UUID> remotes;
+    /** Remote nodes unfinished fragments count. AtomicInteger used just as 
int holder, there is no concurrency here. */
+    private final Map<UUID, AtomicInteger> remoteFragments;
 
-    /** node to fragment */
+    /** Node to fragment. */
     private final Set<RemoteFragmentKey> waiting;
 
     /** */
@@ -96,13 +98,14 @@ public class RootQuery<RowT> extends Query<RowT> {
             qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null,
             exch,
             unregister,
-            log
+            log,
+            0 // Total fragments count not used for RootQuery.
         );
 
         this.sql = sql;
         this.params = params;
 
-        remotes = new HashSet<>();
+        remoteFragments = new HashMap<>();
         waiting = new HashSet<>();
 
         Context parent = Commons.convert(qryCtx);
@@ -184,7 +187,15 @@ public class RootQuery<RowT> extends Query<RowT> {
                 Fragment fragment = plan.fragments().get(i);
                 List<UUID> nodes = plan.mapping(fragment).nodeIds();
 
-                remotes.addAll(nodes);
+                nodes.forEach(n -> remoteFragments.compute(n, (id, cnt) -> {
+                    if (cnt == null)
+                        return new AtomicInteger(1);
+                    else {
+                        cnt.incrementAndGet();
+
+                        return cnt;
+                    }
+                }));
 
                 for (UUID node : nodes)
                     waiting.add(new RemoteFragmentKey(node, 
fragment.fragmentId()));
@@ -225,14 +236,16 @@ public class RootQuery<RowT> extends Query<RowT> {
             try {
                 IgniteException wrpEx = null;
 
-                for (UUID nodeId : remotes) {
+                for (Map.Entry<UUID, AtomicInteger> entry : 
remoteFragments.entrySet()) {
                     try {
-                        if (!nodeId.equals(root.context().localNodeId()))
-                            exch.closeQuery(nodeId, id());
+                        // Don't send close message if all remote fragments 
are finished (query is self-closed on the
+                        // remote node in this case).
+                        if 
(!entry.getKey().equals(root.context().localNodeId()) && entry.getValue().get() 
> 0)
+                            exch.closeQuery(entry.getKey(), id());
                     }
                     catch (IgniteCheckedException e) {
                         if (wrpEx == null)
-                            wrpEx = new IgniteException("Failed to send cancel 
message. [nodeId=" + nodeId + ']', e);
+                            wrpEx = new IgniteException("Failed to send cancel 
message. [nodeId=" + entry.getKey() + ']', e);
                         else
                             wrpEx.addSuppressed(e);
                     }
@@ -341,6 +354,30 @@ public class RootQuery<RowT> extends Query<RowT> {
         tryClose();
     }
 
+    /** {@inheritDoc} */
+    @Override public void onInboundExchangeStarted(UUID nodeId, long 
exchangeId) {
+        onResponse(nodeId, exchangeId, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onInboundExchangeFinished(UUID nodeId, long 
exchangeId) {
+        AtomicInteger cnt = remoteFragments.get(nodeId);
+
+        assert cnt != null : nodeId;
+
+        cnt.decrementAndGet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onOutboundExchangeStarted(UUID nodeId, long 
exchangeId) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onOutboundExchangeFinished(long exchangeId) {
+        // No-op.
+    }
+
     /** */
     @Override public String toString() {
         return S.toString(RootQuery.class, this);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index ffbf5bc..5c618d2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -80,4 +80,14 @@ public interface ExchangeService extends Service {
      * @return {@code true} if node is alive, {@code false} otherwise.
      */
     boolean alive(UUID nodeId);
+
+    /**
+     * Callback after the last batch of the query fragment is sent.
+     */
+    void onOutboundExchangeFinished(UUID qryId, long exchangeId);
+
+    /**
+     * Callback after the last batch of the query fragment from the node is 
processed.
+     */
+    void onInboundExchangeFinished(UUID nodeId, UUID qryId, long exchangeId);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index abb45d2..2a2a246 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.RunningQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.Query;
 import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
@@ -123,6 +124,13 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
     @Override public <Row> void sendBatch(UUID nodeId, UUID qryId, long 
fragmentId, long exchangeId, int batchId,
         boolean last, List<Row> rows) throws IgniteCheckedException {
         messageService().send(nodeId, new QueryBatchMessage(qryId, fragmentId, 
exchangeId, batchId, last, Commons.cast(rows)));
+
+        if (batchId == 0) {
+            Query<?> qry = (Query<?>)qryRegistry.query(qryId);
+
+            if (qry != null)
+                qry.onOutboundExchangeStarted(nodeId, exchangeId);
+        }
     }
 
     /** {@inheritDoc} */
@@ -172,6 +180,22 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
         return messageService().alive(nodeId);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onOutboundExchangeFinished(UUID qryId, long 
exchangeId) {
+        Query<?> qry = (Query<?>)qryRegistry.query(qryId);
+
+        if (qry != null)
+            qry.onOutboundExchangeFinished(exchangeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onInboundExchangeFinished(UUID nodeId, UUID qryId, 
long exchangeId) {
+        Query<?> qry = (Query<?>)qryRegistry.query(qryId);
+
+        if (qry != null)
+            qry.onInboundExchangeFinished(nodeId, exchangeId);
+    }
+
     /** */
     protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
         Collection<Inbox<?>> inboxes = 
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
@@ -196,9 +220,11 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
         if (qry != null)
             qry.cancel();
         else {
-            log.warning("Stale query close message received: [" +
-                "nodeId=" + nodeId +
-                ", queryId=" + msg.queryId() + "]");
+            if (log.isDebugEnabled()) {
+                log.debug("Stale query close message received: [" +
+                    "nodeId=" + nodeId +
+                    ", queryId=" + msg.queryId() + "]");
+            }
         }
     }
 
@@ -241,6 +267,13 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
 
         if (inbox != null) {
             try {
+                if (msg.batchId() == 0) {
+                    Query<?> qry = (Query<?>)qryRegistry.query(msg.queryId());
+
+                    if (qry != null)
+                        qry.onInboundExchangeStarted(nodeId, msg.exchangeId());
+                }
+
                 inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(), 
Commons.cast(msg.rows()));
             }
             catch (Throwable e) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 73a0472..dad750c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -119,7 +119,8 @@ public class ExecutionContext<Row> extends 
AbstractQueryContext implements DataC
         expressionFactory = new ExpressionFactoryImpl<>(
             this,
             qctx.typeFactory(),
-            qctx.config().getParserConfig().conformance()
+            qctx.config().getParserConfig().conformance(),
+            qctx.rexBuilder()
         );
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 2f429a5..1909f47 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -18,8 +18,11 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
@@ -446,7 +449,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
                 );
 
             case EXPLAIN:
-                return executeExplain((ExplainPlan)plan);
+                return executeExplain(qry, (ExplainPlan)plan);
 
             case DDL:
                 return executeDdl(qry, (DdlPlan)plan);
@@ -534,7 +537,12 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
         qry.run(ectx, plan, node);
 
-        // start remote execution
+        Map<UUID, Long> fragmentsPerNode = fragments.stream()
+            .skip(1)
+            .flatMap(f -> f.mapping().nodeIds().stream())
+            .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+
+        // Start remote execution.
         for (int i = 1; i < fragments.size(); i++) {
             fragment = fragments.get(i);
             fragmentDesc = new FragmentDescription(
@@ -544,6 +552,8 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
                 plan.remotes(fragment));
 
             Throwable ex = null;
+            byte[] parametersMarshalled = null;
+
             for (UUID nodeId : fragmentDesc.nodeIds()) {
                 if (ex != null)
                     qry.onResponse(nodeId, fragment.fragmentId(), ex);
@@ -555,9 +565,16 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
                             fragment.serialized(),
                             ectx.topologyVersion(),
                             fragmentDesc,
-                            qry.parameters());
+                            fragmentsPerNode.get(nodeId).intValue(),
+                            qry.parameters(),
+                            parametersMarshalled
+                        );
 
                         messageService().send(nodeId, req);
+
+                        // Avoid marshaling of the same parameters for other 
nodes.
+                        if (parametersMarshalled == null)
+                            parametersMarshalled = req.parametersMarshalled();
                     }
                     catch (Throwable e) {
                         qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
@@ -570,10 +587,12 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     }
 
     /** */
-    private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan) {
+    private FieldsQueryCursor<List<?>> executeExplain(RootQuery<Row> qry, 
ExplainPlan plan) {
         QueryCursorImpl<List<?>> cur = new 
QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
         
cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));
 
+        qryReg.unregister(qry.id());
+
         return cur;
     }
 
@@ -592,16 +611,18 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
         qry.addFragment(new RunningFragment<>(plan.root(), node, ectx));
 
-        try {
-            messageService().send(origNodeId, new QueryStartResponse(qry.id(), 
ectx.fragmentId()));
-        }
-        catch (IgniteCheckedException e) {
-            IgniteException wrpEx = new IgniteException("Failed to send reply. 
[nodeId=" + origNodeId + ']', e);
+        node.init();
 
-            throw wrpEx;
-        }
+        if (!qry.isExchangeWithInitNodeStarted(ectx.fragmentId())) {
+            try {
+                messageService().send(origNodeId, new 
QueryStartResponse(qry.id(), ectx.fragmentId()));
+            }
+            catch (IgniteCheckedException e) {
+                IgniteException wrpEx = new IgniteException("Failed to send 
reply. [nodeId=" + origNodeId + ']', e);
 
-        node.init();
+                throw wrpEx;
+            }
+        }
     }
 
     /** */
@@ -624,7 +645,8 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
                     null,
                     exchangeSvc,
                     (q) -> qryReg.unregister(q.id()),
-                    log
+                    log,
+                    msg.totalFragmentsCount()
                 )
             );
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
index 7d8e8df..31c9fd5 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java
@@ -95,12 +95,17 @@ public class ExpressionFactoryImpl<Row> implements 
ExpressionFactory<Row> {
     private final ExecutionContext<Row> ctx;
 
     /** */
-    public ExpressionFactoryImpl(ExecutionContext<Row> ctx, IgniteTypeFactory 
typeFactory, SqlConformance conformance) {
+    public ExpressionFactoryImpl(
+        ExecutionContext<Row> ctx,
+        IgniteTypeFactory typeFactory,
+        SqlConformance conformance,
+        RexBuilder rexBuilder
+    ) {
         this.ctx = ctx;
         this.typeFactory = typeFactory;
         this.conformance = conformance;
+        this.rexBuilder = rexBuilder;
 
-        rexBuilder = new RexBuilder(this.typeFactory);
         emptyType = new RelDataTypeFactory.Builder(this.typeFactory).build();
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index 7e12665..5d90422 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -213,6 +213,9 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
                     break;
                 case END:
                     it.remove();
+
+                    exchange.onInboundExchangeFinished(buf.nodeId, queryId(), 
exchangeId);
+
                     break;
                 case WAITING:
                     return false;
@@ -252,6 +255,8 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
                     case END:
                         buffers.remove(buf);
 
+                        exchange.onInboundExchangeFinished(buf.nodeId, 
queryId(), exchangeId);
+
                         break;
                     case READY:
                         heap.offer(Pair.of(buf.peek(), buf));
@@ -290,6 +295,8 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
                     case END:
                         buffers.remove(idx--);
 
+                        exchange.onInboundExchangeFinished(buf.nodeId, 
queryId(), exchangeId);
+
                         break;
                     case READY:
                         noProgress = 0;
@@ -491,11 +498,11 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
 
             Row row = curr.rows.set(curr.idx++, null);
 
-            if (curr.idx == curr.rows.size()) {
+            if (curr.idx == curr.rows.size() && !curr.last) {
+                // Don't send acknowledge for the last batch, since outbox 
already should be closed after the last batch.
                 acknowledge(nodeId, curr.batchId);
 
-                if (!isEnd())
-                    curr = pollBatch();
+                curr = pollBatch();
             }
 
             return row;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index 6b7ea84..0d4082f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -64,6 +64,9 @@ public class Outbox<Row> extends AbstractNode<Row> implements 
Mailbox<Row>, Sing
     /** */
     private int waiting;
 
+    /** */
+    private boolean exchangeFinished;
+
     /**
      * @param ctx Execution context.
      * @param exchange Exchange service.
@@ -254,6 +257,12 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Sing
         else if (waiting == -1) {
             for (UUID node : dest.targets())
                 getOrCreateBuffer(node).end();
+
+            if (!exchangeFinished) {
+                exchange.onOutboundExchangeFinished(queryId(), exchangeId);
+
+                exchangeFinished = true;
+            }
         }
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index c6ed6d5..ef761a7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.Iterator;
 import java.util.List;
-
 import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -40,6 +39,9 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
     /** */
     private boolean inLoop;
 
+    /** */
+    private boolean firstReq = true;
+
     /**
      * @param ctx Execution context.
      * @param src Source.
@@ -58,8 +60,20 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
 
         requested = rowsCnt;
 
-        if (!inLoop)
-            context().execute(this::push, this::onError);
+        if (!inLoop) {
+            if (firstReq) {
+                try {
+                    push(); // Make first request sync to reduce latency in 
simple cases.
+                }
+                catch (Throwable e) {
+                    onError(e);
+                }
+
+                firstReq = false;
+            }
+            else
+                context().execute(this::push, this::onError);
+        }
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index afc5126..e71b1d0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -45,6 +46,9 @@ public class QueryStartRequest implements MarshalableMessage, 
ExecutionContextAw
     /** */
     private String root;
 
+    /** Total count of fragments in query for this node. */
+    private int totalFragmentsCnt;
+
     /** */
     @GridDirectTransient
     private Object[] params;
@@ -54,14 +58,24 @@ public class QueryStartRequest implements 
MarshalableMessage, ExecutionContextAw
 
     /** */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-    public QueryStartRequest(UUID qryId, String schema, String root, 
AffinityTopologyVersion ver,
-        FragmentDescription fragmentDesc, Object[] params) {
+    public QueryStartRequest(
+        UUID qryId,
+        String schema,
+        String root,
+        AffinityTopologyVersion ver,
+        FragmentDescription fragmentDesc,
+        int totalFragmentsCnt,
+        Object[] params,
+        @Nullable byte[] paramsBytes
+    ) {
         this.qryId = qryId;
         this.schema = schema;
         this.root = root;
         this.ver = ver;
         this.fragmentDesc = fragmentDesc;
+        this.totalFragmentsCnt = totalFragmentsCnt;
         this.params = params;
+        this.paramsBytes = paramsBytes; // If we already have marshalled 
params, use it.
     }
 
     /** */
@@ -106,6 +120,13 @@ public class QueryStartRequest implements 
MarshalableMessage, ExecutionContextAw
     }
 
     /**
+     * @return Total count of fragments in query for this node.
+     */
+    public int totalFragmentsCount() {
+        return totalFragmentsCnt;
+    }
+
+    /**
      * @return Query parameters.
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@@ -113,6 +134,14 @@ public class QueryStartRequest implements 
MarshalableMessage, ExecutionContextAw
         return params;
     }
 
+    /**
+     * @return Query parameters marshalled.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public byte[] parametersMarshalled() {
+        return paramsBytes;
+    }
+
     /** {@inheritDoc} */
     @Override public void prepareMarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
         if (paramsBytes == null && params != null)
@@ -177,6 +206,11 @@ public class QueryStartRequest implements 
MarshalableMessage, ExecutionContextAw
 
                 writer.incrementState();
 
+            case 6:
+                if (!writer.writeInt("totalFragmentsCnt", totalFragmentsCnt))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -238,6 +272,13 @@ public class QueryStartRequest implements 
MarshalableMessage, ExecutionContextAw
 
                 reader.incrementState();
 
+            case 6:
+                totalFragmentsCnt = reader.readInt("totalFragmentsCnt");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(QueryStartRequest.class);
@@ -250,6 +291,6 @@ public class QueryStartRequest implements 
MarshalableMessage, ExecutionContextAw
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index ab9aed1..64a6d10 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -19,13 +19,16 @@ package 
org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.IgniteSystemProperties;
+import java.util.stream.LongStream;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
@@ -34,11 +37,10 @@ import 
org.apache.ignite.internal.processors.query.calcite.message.MarshallingCo
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridIntList;
-import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -47,12 +49,7 @@ import org.jetbrains.annotations.NotNull;
 /** */
 public class ColocationGroup implements MarshalableMessage {
     /** */
-    private static final int SYNTHETIC_PARTITIONS_COUNT =
-        
IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT", 
512);
-
-    /** */
-    @GridDirectCollection(Long.class)
-    private List<Long> sourceIds;
+    private long[] sourceIds;
 
     /** */
     @GridDirectCollection(UUID.class)
@@ -62,9 +59,8 @@ public class ColocationGroup implements MarshalableMessage {
     @GridDirectTransient
     private List<List<UUID>> assignments;
 
-    /** */
-    @GridDirectCollection(Message.class)
-    private List<UUIDCollectionMessage> assignments0;
+    /** Marshalled assignments. */
+    private int[] marshalledAssignments;
 
     /** */
     public static ColocationGroup forNodes(List<UUID> nodeIds) {
@@ -78,7 +74,7 @@ public class ColocationGroup implements MarshalableMessage {
 
     /** */
     public static ColocationGroup forSourceId(long sourceId) {
-        return new ColocationGroup(Collections.singletonList(sourceId), null, 
null);
+        return new ColocationGroup(new long[] {sourceId}, null, null);
     }
 
     /** */
@@ -86,20 +82,13 @@ public class ColocationGroup implements MarshalableMessage {
     }
 
     /** */
-    private ColocationGroup(List<Long> sourceIds, List<UUID> nodeIds, 
List<List<UUID>> assignments) {
+    private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, 
List<List<UUID>> assignments) {
         this.sourceIds = sourceIds;
         this.nodeIds = nodeIds;
         this.assignments = assignments;
     }
 
     /**
-     * @return Lists of colocation group sources.
-     */
-    public List<Long> sourceIds() {
-        return sourceIds == null ? Collections.emptyList() : sourceIds;
-    }
-
-    /**
      * @return Lists of nodes capable to execute a query fragment for what the 
mapping is calculated.
      */
     public List<UUID> nodeIds() {
@@ -126,7 +115,15 @@ public class ColocationGroup implements MarshalableMessage 
{
 
     /** */
     public boolean belongs(long sourceId) {
-        return sourceIds != null && sourceIds.contains(sourceId);
+        if (sourceIds == null)
+            return false;
+
+        for (long i : sourceIds) {
+            if (i == sourceId)
+                return true;
+        }
+
+        return false;
     }
 
     /**
@@ -137,11 +134,11 @@ public class ColocationGroup implements 
MarshalableMessage {
      * being calculated fragment.
      */
     public ColocationGroup colocate(ColocationGroup other) throws 
ColocationMappingException {
-        List<Long> sourceIds;
+        long[] sourceIds;
         if (this.sourceIds == null || other.sourceIds == null)
             sourceIds = U.firstNotNull(this.sourceIds, other.sourceIds);
         else
-            sourceIds = Commons.combine(this.sourceIds, other.sourceIds);
+            sourceIds = LongStream.concat(Arrays.stream(this.sourceIds), 
Arrays.stream(other.sourceIds)).distinct().toArray();
 
         List<UUID> nodeIds;
         if (this.nodeIds == null || other.nodeIds == null)
@@ -224,9 +221,11 @@ public class ColocationGroup implements MarshalableMessage 
{
 
     /** */
     @NotNull private ColocationGroup forNodes0(List<UUID> nodeIds) {
-        List<List<UUID>> assignments = new 
ArrayList<>(SYNTHETIC_PARTITIONS_COUNT);
-        for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++)
-            assignments.add(F.asList(nodeIds.get(i % nodeIds.size())));
+        List<List<UUID>> assignments = new ArrayList<>(nodeIds.size());
+
+        for (UUID nodeId : nodeIds)
+            assignments.add(Collections.singletonList(nodeId));
+
         return new ColocationGroup(sourceIds, nodeIds, assignments);
     }
 
@@ -266,7 +265,7 @@ public class ColocationGroup implements MarshalableMessage {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeCollection("assignments0", assignments0, 
MessageCollectionItemType.MSG))
+                if (!writer.writeIntArray("marshalledAssignments", 
marshalledAssignments))
                     return false;
 
                 writer.incrementState();
@@ -278,7 +277,7 @@ public class ColocationGroup implements MarshalableMessage {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeCollection("sourceIds", sourceIds, 
MessageCollectionItemType.LONG))
+                if (!writer.writeLongArray("sourceIds", sourceIds))
                     return false;
 
                 writer.incrementState();
@@ -297,7 +296,7 @@ public class ColocationGroup implements MarshalableMessage {
 
         switch (reader.state()) {
             case 0:
-                assignments0 = reader.readCollection("assignments0", 
MessageCollectionItemType.MSG);
+                marshalledAssignments = 
reader.readIntArray("marshalledAssignments");
 
                 if (!reader.isLastRead())
                     return false;
@@ -313,7 +312,7 @@ public class ColocationGroup implements MarshalableMessage {
                 reader.incrementState();
 
             case 2:
-                sourceIds = reader.readCollection("sourceIds", 
MessageCollectionItemType.LONG);
+                sourceIds = reader.readLongArray("sourceIds");
 
                 if (!reader.isLastRead())
                     return false;
@@ -332,23 +331,202 @@ public class ColocationGroup implements 
MarshalableMessage {
 
     /** {@inheritDoc} */
     @Override public void prepareMarshal(MarshallingContext ctx) {
-        if (assignments != null && assignments0 == null)
-            assignments0 = Commons.transform(assignments, this::transform);
+        if (assignments != null && marshalledAssignments == null) {
+            Map<UUID, Integer> nodeIdxs = new HashMap<>();
+
+            for (int i = 0; i < nodeIds.size(); i++)
+                nodeIdxs.put(nodeIds.get(i), i);
+
+            int bitsPerPart = Integer.SIZE - 
Integer.numberOfLeadingZeros(nodeIds.size());
+
+            CompactedIntArray.Builder builder = 
CompactedIntArray.builder(bitsPerPart, assignments.size());
+
+            for (List<UUID> assignment : assignments) {
+                assert F.isEmpty(assignment) || assignment.size() == 1;
+
+                if (F.isEmpty(assignment))
+                    builder.add(nodeIds.size());
+                else {
+                    Integer nodeIdx = nodeIdxs.get(assignment.get(0));
+
+                    builder.add(nodeIdx);
+                }
+            }
+
+            marshalledAssignments = builder.build().buffer();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void prepareUnmarshal(MarshallingContext ctx) {
-        if (assignments0 != null && assignments == null)
-            assignments = Commons.transform(assignments0, this::transform);
-    }
+        if (marshalledAssignments != null && assignments == null) {
+            int bitsPerPart = Integer.SIZE - 
Integer.numberOfLeadingZeros(nodeIds.size());
 
-    /** */
-    private List<UUID> transform(UUIDCollectionMessage message) {
-        return message.uuids() instanceof List ? (List<UUID>)message.uuids() : 
new ArrayList<>(message.uuids());
+            CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart, 
marshalledAssignments);
+
+            assignments = new ArrayList<>(compactedArr.size());
+
+            for (GridIntIterator iter = compactedArr.iterator(); 
iter.hasNext(); ) {
+                int nodeIdx = iter.next();
+
+                assignments.add(nodeIdx >= nodeIds.size() ? 
Collections.emptyList() :
+                    Collections.singletonList(nodeIds.get(nodeIdx)));
+            }
+        }
     }
 
     /** */
-    private UUIDCollectionMessage transform(List<UUID> uuids) {
-        return new UUIDCollectionMessage(uuids);
+    private static class CompactedIntArray {
+        /** */
+        protected static final int BUF_POS_MASK = Integer.SIZE - 1;
+
+        /** */
+        protected static final int BUF_POS_LOG2 = Integer.SIZE - 
Integer.numberOfLeadingZeros(Integer.SIZE - 1);
+
+        /** */
+        protected static final int[] BIT_MASKS = new int[Integer.SIZE];
+
+        static {
+            for (int i = 0; i < Integer.SIZE; i++)
+                BIT_MASKS[i] = ~(-1 << i);
+        }
+
+        /** Buffer. */
+        private final int[] buf;
+
+        /** Bits count per each item. */
+        private final int bitsPerItem;
+
+        /** Ctor. */
+        private CompactedIntArray(int bitsPerItem, int[] buf) {
+            this.bitsPerItem = bitsPerItem;
+            this.buf = buf;
+        }
+
+        /** */
+        public int[] buffer() {
+            return buf;
+        }
+
+        /** */
+        public int size() {
+            return buf[0];
+        }
+
+        /** */
+        public GridIntIterator iterator() {
+            return new Iterator();
+        }
+
+        /** */
+        public static CompactedIntArray of(int bitsPerItem, int[] buf) {
+            return new CompactedIntArray(bitsPerItem, buf);
+        }
+
+        /** */
+        public static Builder builder(int bitsPerItem, int capacity) {
+            return new Builder(bitsPerItem, capacity);
+        }
+
+        /** */
+        private static class Builder {
+            /** Current bit position. */
+            private int bitPos = Integer.SIZE; // Skip first element.
+
+            /** Current size. */
+            private int size;
+
+            /** Buffer. */
+            protected final int[] buf;
+
+            /** Bits count per each item. */
+            protected final int bitsPerItem;
+
+            /** Ctor. */
+            public Builder(int bitsPerItem, int capacity) {
+                this.bitsPerItem = bitsPerItem;
+                buf = new int[(capacity * bitsPerItem + Integer.SIZE - 1) / 
Integer.SIZE + 1];
+                buf[0] = capacity;
+            }
+
+            /** Add the next item. */
+            public void add(int val) {
+                assert size < buf[0];
+
+                int bitsToWrite = bitsPerItem;
+                int bitPos = this.bitPos;
+
+                do {
+                    int bitsToWriteCurBuf = Math.min(bitsToWrite, Integer.SIZE 
- (bitPos & BUF_POS_MASK));
+
+                    int writeVal = (val & BIT_MASKS[bitsToWriteCurBuf]) << 
(bitPos & BUF_POS_MASK);
+
+                    val >>= bitsToWriteCurBuf;
+
+                    buf[bitPos >> BUF_POS_LOG2] |= writeVal;
+
+                    bitPos += bitsToWriteCurBuf;
+
+                    bitsToWrite -= bitsToWriteCurBuf;
+                }
+                while (bitsToWrite > 0);
+
+                this.bitPos = bitPos;
+
+                size++;
+            }
+
+            /** */
+            public CompactedIntArray build() {
+                buf[0] = size;
+
+                return new CompactedIntArray(bitsPerItem, buf);
+            }
+        }
+
+        /** */
+        private class Iterator implements GridIntIterator {
+            /** Current bit position. */
+            private int bitPos = Integer.SIZE; // Skip first element.
+
+            /** Current item position. */
+            private int pos;
+
+            /** Array size. */
+            private final int size = buf[0];
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return pos < size;
+            }
+
+            /** {@inheritDoc} */
+            @Override public int next() {
+                assert pos < size;
+
+                int bitPos = this.bitPos;
+
+                int bitsFirstBuf = Math.min(bitsPerItem, Integer.SIZE - 
(bitPos & BUF_POS_MASK));
+
+                int val = ((buf[bitPos >> BUF_POS_LOG2] >> (bitPos & 
BUF_POS_MASK)) & BIT_MASKS[bitsFirstBuf]);
+
+                bitPos += bitsFirstBuf;
+
+                if (bitsFirstBuf < bitsPerItem) {
+                    int bitsSecondBuf = bitsPerItem - bitsFirstBuf;
+
+                    val |= ((buf[bitPos >> BUF_POS_LOG2] >> (bitPos & 
BUF_POS_MASK)) & BIT_MASKS[bitsSecondBuf])
+                        << bitsFirstBuf;
+
+                    bitPos += bitsSecondBuf;
+                }
+
+                this.bitPos = bitPos;
+
+                pos++;
+
+                return val;
+            }
+        }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 76318e9..b86f226 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -61,7 +61,7 @@ public abstract class AbstractMultiStepPlan implements 
MultiStepPlan {
 
     /** {@inheritDoc} */
     @Override public FragmentMapping mapping(Fragment fragment) {
-        return mapping(fragment.fragmentId());
+        return fragment.mapping();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index a920c95..076701f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -71,7 +71,7 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
     private static final VolcanoPlanner EMPTY_PLANNER;
 
     /** */
-    private static final RexBuilder DFLT_REX_BUILDER;
+    private static final RexBuilder REX_BUILDER;
 
     /** */
     public static final RelOptCluster CLUSTER;
@@ -104,9 +104,9 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         RelDataTypeSystem typeSys = 
CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, 
FRAMEWORK_CONFIG.getTypeSystem());
         TYPE_FACTORY = new IgniteTypeFactory(typeSys);
 
-        DFLT_REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+        REX_BUILDER = new RexBuilder(TYPE_FACTORY);
 
-        CLUSTER = RelOptCluster.create(EMPTY_PLANNER, DFLT_REX_BUILDER);
+        CLUSTER = RelOptCluster.create(EMPTY_PLANNER, REX_BUILDER);
 
         // Forbid using the empty cluster in any planning or mapping 
procedures to prevent memory leaks.
         String cantBeUsedMsg = "Empty cluster can't be used for planning or 
mapping";
@@ -169,13 +169,11 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
         this.log = log;
 
-        RelDataTypeSystem typeSys = 
CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, 
cfg.getTypeSystem());
-
-        typeFactory = new IgniteTypeFactory(typeSys);
-
         qryCancel = unwrap(GridQueryCancel.class);
 
-        rexBuilder = new RexBuilder(typeFactory);
+        typeFactory = TYPE_FACTORY;
+
+        rexBuilder = REX_BUILDER;
     }
 
     /**
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
index ef7f8af..fc4868f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java
@@ -550,7 +550,7 @@ public class CacheTableDescriptorImpl extends 
NullInitializerExpressionFactory
                 b.add(descriptors[i].name(), 
descriptors[i].logicalType(factory));
         }
 
-        return TypeUtils.sqlType(factory, b.build());
+        return b.build();
     }
 
     /** {@inheritDoc} */
@@ -644,6 +644,9 @@ public class CacheTableDescriptorImpl extends 
NullInitializerExpressionFactory
         private final Class<?> storageType;
 
         /** */
+        private volatile RelDataType logicalType;
+
+        /** */
         private KeyValDescriptor(String name, Class<?> type, boolean isKey, 
int fieldIdx) {
             this.name = name;
             this.isKey = isKey;
@@ -684,7 +687,10 @@ public class CacheTableDescriptorImpl extends 
NullInitializerExpressionFactory
 
         /** {@inheritDoc} */
         @Override public RelDataType logicalType(IgniteTypeFactory f) {
-            return f.toSql(f.createJavaType(storageType));
+            if (logicalType == null)
+                logicalType = TypeUtils.sqlType(f, 
f.createJavaType(storageType));
+
+            return logicalType;
         }
 
         /** {@inheritDoc} */
@@ -718,6 +724,9 @@ public class CacheTableDescriptorImpl extends 
NullInitializerExpressionFactory
         private final Class<?> storageType;
 
         /** */
+        private volatile RelDataType logicalType;
+
+        /** */
         private FieldDescriptor(GridQueryProperty desc, int fieldIdx) {
             this.desc = desc;
             this.fieldIdx = fieldIdx;
@@ -758,7 +767,10 @@ public class CacheTableDescriptorImpl extends 
NullInitializerExpressionFactory
 
         /** {@inheritDoc} */
         @Override public RelDataType logicalType(IgniteTypeFactory f) {
-            return f.toSql(f.createJavaType(storageType));
+            if (logicalType == null)
+                logicalType = TypeUtils.sqlType(f, 
f.createJavaType(storageType));
+
+            return logicalType;
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
index f9d32fc..753dfce 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java
@@ -179,7 +179,7 @@ public class SystemViewTableDescriptorImpl<ViewRow> extends 
NullInitializerExpre
                 b.add(descriptors[i].name(), 
descriptors[i].logicalType(factory));
         }
 
-        return TypeUtils.sqlType(factory, b.build());
+        return b.build();
     }
 
     /** {@inheritDoc} */
@@ -220,6 +220,9 @@ public class SystemViewTableDescriptorImpl<ViewRow> extends 
NullInitializerExpre
         private final boolean isFiltrable;
 
         /** */
+        private volatile RelDataType logicalType;
+
+        /** */
         private SystemViewColumnDescriptorImpl(String name, Class<?> type, int 
fieldIdx, boolean isFiltrable) {
             originalName = name;
             sqlName = toSqlName(name);
@@ -255,7 +258,10 @@ public class SystemViewTableDescriptorImpl<ViewRow> 
extends NullInitializerExpre
 
         /** {@inheritDoc} */
         @Override public RelDataType logicalType(IgniteTypeFactory f) {
-            return f.toSql(f.createJavaType(type));
+            if (logicalType == null)
+                logicalType = TypeUtils.sqlType(f, f.createJavaType(type));
+
+            return logicalType;
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index aa0fc11..6647016 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
@@ -181,6 +182,7 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
             exchangeSvc.taskExecutor(taskExecutor);
             exchangeSvc.messageService(msgSvc);
             exchangeSvc.mailboxRegistry(mailboxRegistry);
+            exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
             exchangeSvc.init();
 
             exchangeServices.put(uuid, exchangeSvc);
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index b733b6f..0da2463 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.List;
-
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
@@ -37,6 +36,7 @@ import 
org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  *
@@ -55,9 +55,18 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        // Wait for pending queries before destroying caches. If some error 
occurs during query execution, client code
+        // can get control earlier than query leave the running queries 
registry (need some time for async message
+        // exchange), but eventually, all queries should be closed.
+        assertTrue("Not finished queries found on client", waitForCondition(
+            () -> 
queryProcessor(client).queryRegistry().runningQueries().isEmpty(), 1_000L));
+
         for (Ignite ign : G.allGrids()) {
             for (String cacheName : ign.cacheNames())
                 ign.destroyCache(cacheName);
+
+            assertEquals("Not finished queries found [ignite=" + ign.name() + 
']',
+                0, 
queryProcessor((IgniteEx)ign).queryRegistry().runningQueries().size());
         }
 
         awaitPartitionMapExchange();
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index 380a1c2..9dfe622 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -19,9 +19,16 @@
 package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.IntStream;
-
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -29,8 +36,15 @@ import 
org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryEngine;
 import org.apache.ignite.internal.processors.query.QueryState;
 import org.apache.ignite.internal.processors.query.RunningQuery;
+import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -94,62 +108,82 @@ public class RunningQueriesIntegrationTest extends 
AbstractBasicIntegrationTest
     }
 
     /**
-     * Execute query with a lot of JOINs to produce very long excution phase.
+     * Execute query with a latch on excution phase.
      * Cancel query on execution phase and check query registry is empty on 
the all nodes of the cluster.
      */
     @Test
-    public void testCancelAtExecutionPhase() throws IgniteCheckedException {
-        QueryEngine cliEngine = queryProcessor(client);
-        QueryEngine srvEngine = queryProcessor(srv);
-        int cnt = 6;
+    public void testCancelAtExecutionPhase() throws Exception {
+        CalciteQueryProcessor cliEngine = queryProcessor(client);
+        CalciteQueryProcessor srvEngine = queryProcessor(srv);
 
         sql("CREATE TABLE person (id int, val varchar)");
-
-        String data = IntStream.range(0, 1000).mapToObj((i) -> "(" + i + "," + 
i + ")").collect(joining(", "));
-        String insertSql = "INSERT INTO person (id, val) VALUES " + data;
-
-        sql(insertSql);
-
-        String bigJoin = IntStream.range(0, cnt).mapToObj((i) -> "person p" + 
i).collect(joining(", "));
-        String sql = "SELECT * FROM " + bigJoin;
-
-        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> 
sql(sql));
-
-        // The query is executing on client.
-        Assert.assertTrue(GridTestUtils.waitForCondition(
-            () -> {
-                Collection<? extends RunningQuery> queries = 
cliEngine.runningQueries();
-
-                return !queries.isEmpty() && F.first(queries).state() == 
QueryState.EXECUTING;
-            },
-            TIMEOUT_IN_MS));
-
-        // The query is executing on sever.
-        Assert.assertTrue(GridTestUtils.waitForCondition(
-            () -> {
-                Collection<? extends RunningQuery> queries = 
srvEngine.runningQueries();
-
-                return !queries.isEmpty() && F.first(queries).state() == 
QueryState.EXECUTING;
-            },
-            TIMEOUT_IN_MS));
-
-        Collection<? extends RunningQuery> running = 
cliEngine.runningQueries();
-
-        assertEquals(1, running.size());
-
-        RunningQuery qry = F.first(running);
-
-        assertSame(qry, cliEngine.runningQuery(qry.id()));
-
-        qry.cancel();
-
-        Assert.assertTrue(GridTestUtils.waitForCondition(
-            () -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
-
-        Assert.assertTrue(GridTestUtils.waitForCondition(
-            () -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
-
-        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), 
IgniteSQLException.class, "The query was cancelled while executing.");
+        sql("INSERT INTO person (id, val) VALUES (?, ?)", 0, "val0");
+
+        IgniteCacheTable oldTbl = 
(IgniteCacheTable)srvEngine.schemaHolder().schema("PUBLIC").getTable("PERSON");
+
+        CountDownLatch scanLatch = new CountDownLatch(1);
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteCacheTable newTbl = new CacheTableImpl(srv.context(), 
oldTbl.descriptor()) {
+            @Override public <Row> Iterable<Row> scan(
+                ExecutionContext<Row> execCtx,
+                ColocationGroup grp,
+                Predicate<Row> filter,
+                Function<Row, Row> rowTransformer,
+                @Nullable ImmutableBitSet usedColumns
+            ) {
+                return new Iterable<Row>() {
+                    @NotNull @Override public Iterator<Row> iterator() {
+                        scanLatch.countDown();
+
+                        return new Iterator<Row>() {
+                            @Override public boolean hasNext() {
+                                // Produce rows until stopped.
+                                return !stop.get();
+                            }
+
+                            @Override public Row next() {
+                                if (stop.get())
+                                    throw new NoSuchElementException();
+
+                                return execCtx.rowHandler().factory().create();
+                            }
+                        };
+                    }
+                };
+            }
+        };
+
+        srvEngine.schemaHolder().schema("PUBLIC").add("PERSON", newTbl);
+
+        IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> 
sql("SELECT * FROM person"));
+
+        try {
+            scanLatch.await(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
+
+            // Check state on server.
+            assertEquals(1, srvEngine.runningQueries().size());
+            assertEquals(QueryState.EXECUTING, 
F.first(srvEngine.runningQueries()).state());
+
+            // Check state on client.
+            assertEquals(1, cliEngine.runningQueries().size());
+            RunningQuery qry = F.first(cliEngine.runningQueries());
+            assertEquals(QueryState.EXECUTING, qry.state());
+
+            qry.cancel();
+
+            Assert.assertTrue(GridTestUtils.waitForCondition(
+                () -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+
+            Assert.assertTrue(GridTestUtils.waitForCondition(
+                () -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+        }
+        finally {
+            stop.set(true);
+        }
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            () -> fut.get(100), IgniteSQLException.class, "The query was 
cancelled while executing.");
     }
 
     /**
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 7710ba1..3f79059 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -37,6 +37,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -328,6 +329,7 @@ public class PlannerTest extends AbstractPlannerTest {
         exchangeSvc.taskExecutor(taskExecutor);
         exchangeSvc.messageService(msgSvc);
         exchangeSvc.mailboxRegistry(mailboxRegistry);
+        exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
         exchangeSvc.init();
 
         ectx = new ExecutionContext<>(
@@ -385,6 +387,7 @@ public class PlannerTest extends AbstractPlannerTest {
         exchangeSvc.taskExecutor(taskExecutor);
         exchangeSvc.messageService(msgSvc);
         exchangeSvc.mailboxRegistry(mailboxRegistry);
+        exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
         exchangeSvc.init();
 
         ectx = new ExecutionContext<>(
@@ -553,6 +556,7 @@ public class PlannerTest extends AbstractPlannerTest {
         exchangeSvc.taskExecutor(taskExecutor);
         exchangeSvc.messageService(msgSvc);
         exchangeSvc.mailboxRegistry(mailboxRegistry);
+        exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
         exchangeSvc.init();
 
         ectx = new ExecutionContext<>(
@@ -609,6 +613,7 @@ public class PlannerTest extends AbstractPlannerTest {
         exchangeSvc.taskExecutor(taskExecutor);
         exchangeSvc.messageService(msgSvc);
         exchangeSvc.mailboxRegistry(mailboxRegistry);
+        exchangeSvc.queryRegistry(new QueryRegistryImpl(log));
         exchangeSvc.init();
 
         ectx = new ExecutionContext<>(
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index de8d507..2e74824 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2896,8 +2896,11 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             throw new CacheException("Execution of local SqlFieldsQuery on 
client node disallowed.");
 
         if (experimentalQueryEngine != null && useExperimentalSqlEngine) {
-            return experimentalQueryEngine.query(QueryContext.of(qry, cliCtx), 
qry.getSchema(), qry.getSql(),
-                X.EMPTY_OBJECT_ARRAY);
+            return experimentalQueryEngine.query(
+                QueryContext.of(qry, cliCtx),
+                qry.getSchema() == null ? schemaName(cctx) : qry.getSchema(),
+                qry.getSql(),
+                qry.getArgs() == null ? X.EMPTY_OBJECT_ARRAY : qry.getArgs());
         }
 
         return executeQuerySafe(cctx, () -> {

Reply via email to