This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6455bc3a7f9 IGNITE-19998 SQL Calcite: Add support of setting partitions in SqlFieldsQuery (#10870)
6455bc3a7f9 is described below

commit 6455bc3a7f9d992eea4c85453561db7f307f8978
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Sat Aug 5 01:59:40 2023 +0300

    IGNITE-19998 SQL Calcite: Add support of setting partitions in SqlFieldsQuery (#10870)
---
 .../query/calcite/CalciteQueryProcessor.java       |   1 +
 .../processors/query/calcite/RootQuery.java        |   4 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |  16 +-
 .../query/calcite/metadata/ColocationGroup.java    |  59 +++--
 .../query/calcite/metadata/FragmentMapping.java    |  16 +-
 .../query/calcite/prepare/BaseQueryContext.java    |  33 ++-
 .../processors/query/calcite/prepare/Fragment.java |   7 +-
 .../QueryWithPartitionsIntegrationTest.java        | 280 +++++++++++++++++++++
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 9 files changed, 391 insertions(+), 27 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 1ffdb85a7d1..3a675e8882c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -564,6 +564,7 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
             params,
             qryCtx,
             fldsQry != null && fldsQry.isLocal(),
+            fldsQry != null ? fldsQry.getPartitions() : null,
             exchangeSvc,
             (q, ex) -> qryReg.unregister(q.id(), ex),
             log,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 7bf9df35469..cd6290e9495 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -107,6 +107,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
         Object[] params,
         QueryContext qryCtx,
         boolean isLocal,
+        int[] parts,
         ExchangeService exch,
         BiConsumer<Query<RowT>, Throwable> unregister,
         IgniteLogger log,
@@ -144,6 +145,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
                     .build()
             )
             .local(isLocal)
+            .partitions(parts)
             .logger(log)
             .build();
     }
@@ -157,7 +159,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
      * @param schema new schema.
      */
     public RootQuery<RowT> childQuery(SchemaPlus schema) {
-        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), 
ctx.isLocal(), exch, unregister, log,
+        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), 
ctx.isLocal(), ctx.partitions(), exch, unregister, log,
             plannerTimeout, totalTimeout);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 0b5e4c48935..5472042b80a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -75,8 +75,10 @@ import 
org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
@@ -564,6 +566,17 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
         List<Fragment> fragments = plan.fragments();
 
+        if (!F.isEmpty(qry.context().partitions())) {
+            fragments = Commons.transform(fragments, f -> {
+                try {
+                    return f.filterByPartitions(qry.context().partitions());
+                }
+                catch (ColocationMappingException e) {
+                    throw new FragmentMappingException("Failed to calculate 
physical distribution", f, f.root(), e);
+                }
+            });
+        }
+
         // Local execution
         Fragment fragment = F.first(fragments);
 
@@ -576,7 +589,8 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
             List<UUID> nodes = mapping.nodeIds();
 
-            assert nodes != null && nodes.size() == 1 && 
F.first(nodes).equals(localNodeId());
+            assert nodes != null && nodes.size() == 1 && 
F.first(nodes).equals(localNodeId())
+                    : "nodes=" + nodes + ", localNode=" + localNodeId();
         }
 
         long timeout = qry.remainingTime();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index ffe91b17cb3..962f6a793f3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
 
 /** */
 public class ColocationGroup implements MarshalableMessage {
@@ -114,7 +113,13 @@ public class ColocationGroup implements MarshalableMessage 
{
      * {@link GridDhtPartitionState#OWNING} state, calculated for distributed 
tables, involved in query execution.
      */
     public List<List<UUID>> assignments() {
-        return assignments == null ? Collections.emptyList() : assignments;
+        if (assignments != null)
+            return assignments;
+
+        if (!F.isEmpty(nodeIds))
+            return 
nodeIds.stream().map(Collections::singletonList).collect(Collectors.toList());
+
+        return Collections.emptyList();
     }
 
     /**
@@ -208,39 +213,50 @@ public class ColocationGroup implements 
MarshalableMessage {
     }
 
     /** */
-    public ColocationGroup finalaze() {
-        if (assignments == null && nodeIds == null)
+    public ColocationGroup finalizeMapping() {
+        if (assignments == null)
             return this;
 
-        if (assignments != null) {
+        List<List<UUID>> assignments = new 
ArrayList<>(this.assignments.size());
+        Set<UUID> nodes = new HashSet<>();
+
+        for (List<UUID> assignment : this.assignments) {
+            UUID first = F.first(assignment);
+            if (first != null)
+                nodes.add(first);
+            assignments.add(first != null ? Collections.singletonList(first) : 
Collections.emptyList());
+        }
+
+        return new ColocationGroup(sourceIds, new ArrayList<>(nodes), 
assignments);
+    }
+
+    /** */
+    public ColocationGroup filterByPartitions(int[] parts) {
+        if (!F.isEmpty(assignments)) {
             List<List<UUID>> assignments = new 
ArrayList<>(this.assignments.size());
             Set<UUID> nodes = new HashSet<>();
-            for (List<UUID> assignment : this.assignments) {
-                UUID first = F.first(assignment);
+
+            if (F.isEmpty(parts))
+                return this;
+
+            for (int i = 0; i < this.assignments.size(); ++i) {
+                UUID first = Arrays.binarySearch(parts, i) >= 0 ? 
F.first(this.assignments.get(i)) : null;
+
                 if (first != null)
                     nodes.add(first);
-                assignments.add(first != null ? 
Collections.singletonList(first) : Collections.emptyList());
+
+                assignments.add(first != null ? this.assignments.get(i) : 
Collections.emptyList());
             }
 
             return new ColocationGroup(sourceIds, new ArrayList<>(nodes), 
assignments);
         }
 
-        return forNodes0(nodeIds);
+        return this;
     }
 
     /** */
     public ColocationGroup mapToNodes(List<UUID> nodeIds) {
-        return !F.isEmpty(this.nodeIds) ? this : forNodes0(nodeIds);
-    }
-
-    /** */
-    @NotNull private ColocationGroup forNodes0(List<UUID> nodeIds) {
-        List<List<UUID>> assignments = new ArrayList<>(nodeIds.size());
-
-        for (UUID nodeId : nodeIds)
-            assignments.add(Collections.singletonList(nodeId));
-
-        return new ColocationGroup(sourceIds, nodeIds, assignments);
+        return !F.isEmpty(this.nodeIds) ? this : new 
ColocationGroup(sourceIds, nodeIds, null);
     }
 
     /**
@@ -250,6 +266,9 @@ public class ColocationGroup implements MarshalableMessage {
      * @return List of partitions to scan on the given node.
      */
     public int[] partitions(UUID nodeId) {
+        if (F.isEmpty(assignments))
+            return null;
+
         GridIntList parts = new GridIntList(assignments.size());
 
         for (int i = 0; i < assignments.size(); i++) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index b82896bc93b..7d023558c64 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -132,19 +132,31 @@ public class FragmentMapping implements 
MarshalableMessage {
     }
 
     /** */
-    public FragmentMapping finalize(Supplier<List<UUID>> nodesSource) {
+    public FragmentMapping finalizeMapping(Supplier<List<UUID>> nodesSource) {
         if (colocationGroups.isEmpty())
             return this;
 
         List<ColocationGroup> colocationGroups = this.colocationGroups;
 
-        colocationGroups = Commons.transform(colocationGroups, 
ColocationGroup::finalaze);
+        colocationGroups = Commons.transform(colocationGroups, 
ColocationGroup::finalizeMapping);
         List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ? 
nodesSource.get() : nodes;
         colocationGroups = Commons.transform(colocationGroups, g -> 
g.mapToNodes(nodes0));
 
         return new FragmentMapping(colocationGroups);
     }
 
+    /** */
+    public FragmentMapping filterByPartitions(int[] parts) throws 
ColocationMappingException {
+        List<ColocationGroup> colocationGroups = this.colocationGroups;
+
+        if (!F.isEmpty(parts) && colocationGroups.size() > 1)
+            throw new ColocationMappingException("Execution of non-collocated 
query with partition parameter is not possible");
+
+        colocationGroups = Commons.transform(colocationGroups, g -> 
g.filterByPartitions(parts));
+
+        return new FragmentMapping(colocationGroups);
+    }
+
     /** */
     public @NotNull ColocationGroup findGroup(long sourceId) {
         List<ColocationGroup> groups = colocationGroups.stream()
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index c58926e70e6..e5f81d91d08 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import com.google.common.collect.Multimap;
@@ -159,6 +160,9 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
     /** */
     private final boolean isLocal;
 
+    /** */
+    private final int[] parts;
+
     /**
      * Private constructor, used by a builder.
      */
@@ -166,7 +170,8 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         FrameworkConfig cfg,
         Context parentCtx,
         IgniteLogger log,
-        boolean isLocal
+        boolean isLocal,
+        int[] parts
     ) {
         super(Contexts.chain(parentCtx, cfg.getContext()));
 
@@ -177,6 +182,8 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
         this.isLocal = isLocal;
 
+        this.parts = parts;
+
         qryCancel = unwrap(GridQueryCancel.class);
 
         typeFactory = TYPE_FACTORY;
@@ -276,6 +283,14 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         return isLocal;
     }
 
+    /** */
+    public int[] partitions() {
+        if (parts != null)
+            return Arrays.copyOf(parts, parts.length);
+
+        return null;
+    }
+
     /**
      * Query context builder.
      */
@@ -299,6 +314,9 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         /** */
         private boolean isLocal = false;
 
+        /** */
+        private int[] parts = null;
+
         /**
          * @param frameworkCfg Framework config.
          * @return Builder for chaining.
@@ -335,13 +353,24 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
             return this;
         }
 
+        /**
+         * @param parts Array of partitions' numbers.
+         * @return Builder for chaining.
+         */
+        public Builder partitions(int[] parts) {
+            if (parts != null)
+                this.parts = Arrays.copyOf(parts, parts.length);
+
+            return this;
+        }
+
         /**
          * Builds planner context.
          *
          * @return Planner context.
          */
         public BaseQueryContext build() {
-            return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal);
+            return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal, 
parts);
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 6aeb944e0a1..153a2a96472 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -125,6 +125,11 @@ public class Fragment {
         return root.getCluster() == cluster ? this : new 
Cloner(cluster).go(this);
     }
 
+    /** */
+    public Fragment filterByPartitions(int[] parts) throws 
ColocationMappingException {
+        return new Fragment(id, root, remotes, rootSer, 
mapping.filterByPartitions(parts));
+    }
+
     /**
      * Mapps the fragment to its data location.
      * @param ctx Planner context.
@@ -157,7 +162,7 @@ public class Fragment {
                     
.get(ThreadLocalRandom.current().nextInt(mapping.nodeIds().size()))).colocate(mapping);
             }
 
-            return mapping.finalize(nodesSource);
+            return mapping.finalizeMapping(nodesSource);
         }
         catch (NodeMappingException e) {
             throw new FragmentMappingException("Failed to calculate physical 
distribution", this, e.node(), e);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
new file mode 100644
index 00000000000..16655187dc8
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** */
+@RunWith(Parameterized.class)
+public class QueryWithPartitionsIntegrationTest extends 
AbstractBasicIntegrationTest {
+    /** */
+    private static final int ENTRIES_COUNT = 10000;
+
+    /** */
+    private volatile int[] parts;
+
+    /** */
+    @Parameterized.Parameter()
+    public boolean local;
+
+    /** */
+    @Parameterized.Parameters(name = "local = {0}")
+    public static List<Object[]> parameters() {
+        return ImmutableList.of(
+                new Object[]{true},
+                new Object[]{false}
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeTest() throws Exception {
+        super.beforeTest();
+
+        List<Integer> parts0 = IntStream.range(0, 
1024).boxed().collect(Collectors.toList());
+        Collections.shuffle(parts0);
+        parts = Ints.toArray(parts0.subList(0, 20));
+
+        log.info("Running tests with parts=" + Arrays.toString(parts));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        cfg.getSqlConfiguration().setQueryEnginesConfiguration(new 
CalciteQueryEngineConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected QueryChecker assertQuery(String qry) {
+        return assertQuery(grid(0), qry);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected QueryContext queryContext() {
+        return QueryContext.of(new 
SqlFieldsQuery("").setLocal(local).setTimeout(10, TimeUnit.SECONDS)
+                .setPartitions(parts));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected List<List<?>> sql(String sql, Object... params) {
+        return sql(local ? grid(0) : client, sql, params);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        sql("CREATE TABLE T1(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR) 
WITH cache_name=t1_cache,backups=1");
+        sql("CREATE TABLE T2(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR) 
WITH cache_name=t2_cache,backups=1");
+        sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL 
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+        sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+        sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+        sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+        Stream.of("T1", "T2", "DICT").forEach(tableName -> {
+            StringBuilder sb = new StringBuilder("INSERT INTO 
").append(tableName)
+                    .append("(ID, IDX_VAL, VAL) VALUES ");
+
+            for (int i = 0; i < ENTRIES_COUNT; ++i) {
+                sb.append("(").append(i).append(", ")
+                        .append("'name_").append(i).append("', ")
+                        .append("'name_").append(i).append("')");
+
+                if (i < ENTRIES_COUNT - 1)
+                    sb.append(",");
+            }
+
+            sql(sb.toString());
+
+            assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName + 
"_CACHE").size(CachePeekMode.PRIMARY));
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        // Skip super method to keep caches after each test.
+    }
+
+    /** */
+    @Test
+    public void testSingle() {
+        Stream.of(Pair.of("SELECT * FROM T1", null),
+                  Pair.of("SELECT * FROM T1 WHERE ID < ?", ENTRIES_COUNT))
+            .forEach(query -> {
+                long cnt = sql(query.left, query.right).size();
+
+                assertEquals(cacheSize("T1_CACHE", parts), cnt);
+            });
+
+        Stream.of(Pair.of("SELECT count(*) FROM T1", null),
+                  Pair.of("SELECT count(*) FROM T1 WHERE ID < ?", 
ENTRIES_COUNT))
+            .forEach(query -> {
+                Long cnt = (Long)sql(query.left, query.right).get(0).get(0);
+
+                assertEquals(cacheSize("T1_CACHE", parts), cnt.longValue());
+            });
+    }
+
+    /** */
+    @Test
+    public void testReplicated() {
+        Stream.of(Pair.of("select * from DICT", null),
+                  Pair.of("select * from DICT where id < ?", ENTRIES_COUNT))
+            .forEach(query -> {
+                List<List<?>> res = sql(query.left, query.right);
+
+                assertEquals(res.size(), cacheSize("DICT_CACHE"));
+            });
+
+        Stream.of(Pair.of("select count(*) from DICT", null),
+                  Pair.of("select count(*) from DICT where id < ?", 
ENTRIES_COUNT))
+            .forEach(query -> {
+                Long size = (Long)sql(query.left, query.right).get(0).get(0);
+
+                assertEquals(cacheSize("DICT_CACHE"), size.longValue());
+            });
+    }
+
+    /** */
+    @Test
+    public void testJoin() {
+        Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1", "T2", 
col));
+    }
+
+    /** */
+    @Test
+    public void testJoinReplicated() {
+        Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1", 
"DICT", col));
+    }
+
+    /** */
+    private void testJoin(String table1, String table2, String joinCol) {
+        String sqlStr = "select * from " + table1 + " join " + table2 +
+                " on " + table1 + "." + joinCol + "=" + table2 + "." + joinCol;
+
+        List<?> res = sql(sqlStr);
+
+        assertEquals(res.size(), cacheSize(table1 + "_CACHE", parts));
+    }
+
+    /** */
+    @Test
+    public void testInsertFromSelect() {
+        Stream.of(Pair.of("SELECT ID, IDX_VAL, VAL FROM T1 WHERE ID < ?", 
ENTRIES_COUNT),
+                  Pair.of("SELECT ID, IDX_VAL, VAL FROM T1", null))
+            .forEach(query -> {
+                try {
+                    sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR, 
VAL VARCHAR) WITH cache_name=t3_cache,backups=1");
+
+                    sql("INSERT INTO T3(ID, IDX_VAL, VAL) " + query.left, 
query.right);
+
+                    assertEquals(cacheSize("T1_CACHE", parts), 
cacheSize("T3_CACHE"));
+                }
+                finally {
+                    client.cache("T3_CACHE").destroy();
+                }
+            });
+    }
+
+    /** */
+    @Test
+    public void testDelete() {
+        Stream.of(Pair.of("DELETE FROM T3 WHERE ID < ?", ENTRIES_COUNT), 
Pair.of("DELETE FROM T3", null))
+            .forEach(query -> {
+                try {
+                    sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR, 
VAL VARCHAR) WITH cache_name=t3_cache,backups=1");
+
+                    sql("INSERT INTO T3(ID, IDX_VAL, VAL) SELECT ID, IDX_VAL, 
VAL FROM DICT");
+
+                    assertEquals(ENTRIES_COUNT, cacheFullSize("T3_CACHE"));
+
+                    long partsCnt = cacheSize("T3_CACHE", parts);
+
+                    sql(query.left, query.right);
+
+                    assertEquals(ENTRIES_COUNT - partsCnt, 
cacheFullSize("T3_CACHE"));
+                }
+                finally {
+                    client.cache("T3_CACHE").destroy();
+                }
+
+            });
+    }
+
+    /** */
+    @Test
+    public void testCreateTableAsSelect() {
+        Stream.of(Pair.of("SELECT ID, IDX_VAL, VAL FROM T1 WHERE ID < ?", 
ENTRIES_COUNT),
+                  Pair.of("SELECT ID, IDX_VAL, VAL FROM T1", null))
+            .forEach(query -> {
+                try {
+                    sql("CREATE TABLE T3(ID, IDX_VAL, VAL) WITH 
cache_name=t3_cache,backups=1 AS " + query.left, query.right);
+
+                    assertEquals(cacheSize("T1_CACHE", parts), 
cacheFullSize("T3_CACHE"));
+                }
+                finally {
+                    client.cache("T3_CACHE").destroy();
+                }
+            });
+    }
+
+    /** */
+    private long cacheFullSize(String cacheName) {
+        return client.cache(cacheName).sizeLong(CachePeekMode.PRIMARY);
+    }
+
+    /** */
+    private long cacheSize(String cacheName, int... parts) {
+        IgniteCache<?, ?> cache = grid(0).cache(cacheName);
+
+        GridCacheContext<?, ?> ctx = grid(0).cachex(cacheName).context();
+
+        if (F.isEmpty(parts))
+            return local && ctx.isPartitioned() ?
+                    cache.localSizeLong(CachePeekMode.PRIMARY) : 
cache.sizeLong(CachePeekMode.PRIMARY);
+
+        return IntStream.of(parts).mapToLong(p -> {
+            if (local && ctx.isPartitioned())
+                return cache.localSizeLong(p, CachePeekMode.PRIMARY);
+            else
+                return cache.sizeLong(p, CachePeekMode.PRIMARY);
+        }).sum();
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index bbe6c64f1ea..25178aaef39 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuo
 import 
org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.QueryMetadataIntegrationTest;
+import 
org.apache.ignite.internal.processors.query.calcite.integration.QueryWithPartitionsIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.RunningQueriesIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.SearchSargOnIndexIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.ServerStatisticsIntegrationTest;
@@ -117,6 +118,7 @@ import org.junit.runners.Suite;
     SearchSargOnIndexIntegrationTest.class,
     KeepBinaryIntegrationTest.class,
     LocalQueryIntegrationTest.class,
+    QueryWithPartitionsIntegrationTest.class,
     QueryMetadataIntegrationTest.class,
     MemoryQuotasIntegrationTest.class,
     LocalDateTimeSupportTest.class,

Reply via email to