This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6455bc3a7f9 IGNITE-19998 SQL Calcite: Add support of setting
partitions in SqlFieldsQuery (#10870)
6455bc3a7f9 is described below
commit 6455bc3a7f9d992eea4c85453561db7f307f8978
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Sat Aug 5 01:59:40 2023 +0300
IGNITE-19998 SQL Calcite: Add support of setting partitions in
SqlFieldsQuery (#10870)
---
.../query/calcite/CalciteQueryProcessor.java | 1 +
.../processors/query/calcite/RootQuery.java | 4 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 16 +-
.../query/calcite/metadata/ColocationGroup.java | 59 +++--
.../query/calcite/metadata/FragmentMapping.java | 16 +-
.../query/calcite/prepare/BaseQueryContext.java | 33 ++-
.../processors/query/calcite/prepare/Fragment.java | 7 +-
.../QueryWithPartitionsIntegrationTest.java | 280 +++++++++++++++++++++
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
9 files changed, 391 insertions(+), 27 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 1ffdb85a7d1..3a675e8882c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -564,6 +564,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
params,
qryCtx,
fldsQry != null && fldsQry.isLocal(),
+ fldsQry != null ? fldsQry.getPartitions() : null,
exchangeSvc,
(q, ex) -> qryReg.unregister(q.id(), ex),
log,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 7bf9df35469..cd6290e9495 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -107,6 +107,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
Object[] params,
QueryContext qryCtx,
boolean isLocal,
+ int[] parts,
ExchangeService exch,
BiConsumer<Query<RowT>, Throwable> unregister,
IgniteLogger log,
@@ -144,6 +145,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
.build()
)
.local(isLocal)
+ .partitions(parts)
.logger(log)
.build();
}
@@ -157,7 +159,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
* @param schema new schema.
*/
public RootQuery<RowT> childQuery(SchemaPlus schema) {
- return new RootQuery<>(sql, schema, params, QueryContext.of(cancel),
ctx.isLocal(), exch, unregister, log,
+ return new RootQuery<>(sql, schema, params, QueryContext.of(cancel),
ctx.isLocal(), ctx.partitions(), exch, unregister, log,
plannerTimeout, totalTimeout);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 0b5e4c48935..5472042b80a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -75,8 +75,10 @@ import
org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
import
org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
@@ -564,6 +566,17 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
List<Fragment> fragments = plan.fragments();
+ if (!F.isEmpty(qry.context().partitions())) {
+ fragments = Commons.transform(fragments, f -> {
+ try {
+ return f.filterByPartitions(qry.context().partitions());
+ }
+ catch (ColocationMappingException e) {
+ throw new FragmentMappingException("Failed to calculate
physical distribution", f, f.root(), e);
+ }
+ });
+ }
+
// Local execution
Fragment fragment = F.first(fragments);
@@ -576,7 +589,8 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
List<UUID> nodes = mapping.nodeIds();
- assert nodes != null && nodes.size() == 1 &&
F.first(nodes).equals(localNodeId());
+ assert nodes != null && nodes.size() == 1 &&
F.first(nodes).equals(localNodeId())
+ : "nodes=" + nodes + ", localNode=" + localNodeId();
}
long timeout = qry.remainingTime();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index ffe91b17cb3..962f6a793f3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
/** */
public class ColocationGroup implements MarshalableMessage {
@@ -114,7 +113,13 @@ public class ColocationGroup implements MarshalableMessage
{
* {@link GridDhtPartitionState#OWNING} state, calculated for distributed
tables, involved in query execution.
*/
public List<List<UUID>> assignments() {
- return assignments == null ? Collections.emptyList() : assignments;
+ if (assignments != null)
+ return assignments;
+
+ if (!F.isEmpty(nodeIds))
+ return
nodeIds.stream().map(Collections::singletonList).collect(Collectors.toList());
+
+ return Collections.emptyList();
}
/**
@@ -208,39 +213,50 @@ public class ColocationGroup implements
MarshalableMessage {
}
/** */
- public ColocationGroup finalaze() {
- if (assignments == null && nodeIds == null)
+ public ColocationGroup finalizeMapping() {
+ if (assignments == null)
return this;
- if (assignments != null) {
+ List<List<UUID>> assignments = new
ArrayList<>(this.assignments.size());
+ Set<UUID> nodes = new HashSet<>();
+
+ for (List<UUID> assignment : this.assignments) {
+ UUID first = F.first(assignment);
+ if (first != null)
+ nodes.add(first);
+ assignments.add(first != null ? Collections.singletonList(first) :
Collections.emptyList());
+ }
+
+ return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments);
+ }
+
+ /** */
+ public ColocationGroup filterByPartitions(int[] parts) {
+ if (!F.isEmpty(assignments)) {
List<List<UUID>> assignments = new
ArrayList<>(this.assignments.size());
Set<UUID> nodes = new HashSet<>();
- for (List<UUID> assignment : this.assignments) {
- UUID first = F.first(assignment);
+
+ if (F.isEmpty(parts))
+ return this;
+
+ for (int i = 0; i < this.assignments.size(); ++i) {
+ UUID first = Arrays.binarySearch(parts, i) >= 0 ?
F.first(this.assignments.get(i)) : null;
+
if (first != null)
nodes.add(first);
- assignments.add(first != null ?
Collections.singletonList(first) : Collections.emptyList());
+
+ assignments.add(first != null ? this.assignments.get(i) :
Collections.emptyList());
}
return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments);
}
- return forNodes0(nodeIds);
+ return this;
}
/** */
public ColocationGroup mapToNodes(List<UUID> nodeIds) {
- return !F.isEmpty(this.nodeIds) ? this : forNodes0(nodeIds);
- }
-
- /** */
- @NotNull private ColocationGroup forNodes0(List<UUID> nodeIds) {
- List<List<UUID>> assignments = new ArrayList<>(nodeIds.size());
-
- for (UUID nodeId : nodeIds)
- assignments.add(Collections.singletonList(nodeId));
-
- return new ColocationGroup(sourceIds, nodeIds, assignments);
+ return !F.isEmpty(this.nodeIds) ? this : new
ColocationGroup(sourceIds, nodeIds, null);
}
/**
@@ -250,6 +266,9 @@ public class ColocationGroup implements MarshalableMessage {
* @return List of partitions to scan on the given node.
*/
public int[] partitions(UUID nodeId) {
+ if (F.isEmpty(assignments))
+ return null;
+
GridIntList parts = new GridIntList(assignments.size());
for (int i = 0; i < assignments.size(); i++) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index b82896bc93b..7d023558c64 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -132,19 +132,31 @@ public class FragmentMapping implements
MarshalableMessage {
}
/** */
- public FragmentMapping finalize(Supplier<List<UUID>> nodesSource) {
+ public FragmentMapping finalizeMapping(Supplier<List<UUID>> nodesSource) {
if (colocationGroups.isEmpty())
return this;
List<ColocationGroup> colocationGroups = this.colocationGroups;
- colocationGroups = Commons.transform(colocationGroups,
ColocationGroup::finalaze);
+ colocationGroups = Commons.transform(colocationGroups,
ColocationGroup::finalizeMapping);
List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ?
nodesSource.get() : nodes;
colocationGroups = Commons.transform(colocationGroups, g ->
g.mapToNodes(nodes0));
return new FragmentMapping(colocationGroups);
}
+ /** */
+ public FragmentMapping filterByPartitions(int[] parts) throws
ColocationMappingException {
+ List<ColocationGroup> colocationGroups = this.colocationGroups;
+
+ if (!F.isEmpty(parts) && colocationGroups.size() > 1)
+ throw new ColocationMappingException("Execution of non-collocated
query with partition parameter is not possible");
+
+ colocationGroups = Commons.transform(colocationGroups, g ->
g.filterByPartitions(parts));
+
+ return new FragmentMapping(colocationGroups);
+ }
+
/** */
public @NotNull ColocationGroup findGroup(long sourceId) {
List<ColocationGroup> groups = colocationGroups.stream()
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index c58926e70e6..e5f81d91d08 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.lang.reflect.Method;
+import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import com.google.common.collect.Multimap;
@@ -159,6 +160,9 @@ public final class BaseQueryContext extends
AbstractQueryContext {
/** */
private final boolean isLocal;
+ /** */
+ private final int[] parts;
+
/**
* Private constructor, used by a builder.
*/
@@ -166,7 +170,8 @@ public final class BaseQueryContext extends
AbstractQueryContext {
FrameworkConfig cfg,
Context parentCtx,
IgniteLogger log,
- boolean isLocal
+ boolean isLocal,
+ int[] parts
) {
super(Contexts.chain(parentCtx, cfg.getContext()));
@@ -177,6 +182,8 @@ public final class BaseQueryContext extends
AbstractQueryContext {
this.isLocal = isLocal;
+ this.parts = parts;
+
qryCancel = unwrap(GridQueryCancel.class);
typeFactory = TYPE_FACTORY;
@@ -276,6 +283,14 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return isLocal;
}
+ /** */
+ public int[] partitions() {
+ if (parts != null)
+ return Arrays.copyOf(parts, parts.length);
+
+ return null;
+ }
+
/**
* Query context builder.
*/
@@ -299,6 +314,9 @@ public final class BaseQueryContext extends
AbstractQueryContext {
/** */
private boolean isLocal = false;
+ /** */
+ private int[] parts = null;
+
/**
* @param frameworkCfg Framework config.
* @return Builder for chaining.
@@ -335,13 +353,24 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return this;
}
+ /**
+ * @param parts Array of partitions' numbers.
+ * @return Builder for chaining.
+ */
+ public Builder partitions(int[] parts) {
+ if (parts != null)
+ this.parts = Arrays.copyOf(parts, parts.length);
+
+ return this;
+ }
+
/**
* Builds planner context.
*
* @return Planner context.
*/
public BaseQueryContext build() {
- return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal);
+ return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal,
parts);
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 6aeb944e0a1..153a2a96472 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -125,6 +125,11 @@ public class Fragment {
return root.getCluster() == cluster ? this : new
Cloner(cluster).go(this);
}
+ /** */
+ public Fragment filterByPartitions(int[] parts) throws
ColocationMappingException {
+ return new Fragment(id, root, remotes, rootSer,
mapping.filterByPartitions(parts));
+ }
+
/**
* Mapps the fragment to its data location.
* @param ctx Planner context.
@@ -157,7 +162,7 @@ public class Fragment {
.get(ThreadLocalRandom.current().nextInt(mapping.nodeIds().size()))).colocate(mapping);
}
- return mapping.finalize(nodesSource);
+ return mapping.finalizeMapping(nodesSource);
}
catch (NodeMappingException e) {
throw new FragmentMappingException("Failed to calculate physical
distribution", this, e.node(), e);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
new file mode 100644
index 00000000000..16655187dc8
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** */
+@RunWith(Parameterized.class)
+public class QueryWithPartitionsIntegrationTest extends
AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private volatile int[] parts;
+
+ /** */
+ @Parameterized.Parameter()
+ public boolean local;
+
+ /** */
+ @Parameterized.Parameters(name = "local = {0}")
+ public static List<Object[]> parameters() {
+ return ImmutableList.of(
+ new Object[]{true},
+ new Object[]{false}
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeTest() throws Exception {
+ super.beforeTest();
+
+ List<Integer> parts0 = IntStream.range(0,
1024).boxed().collect(Collectors.toList());
+ Collections.shuffle(parts0);
+ parts = Ints.toArray(parts0.subList(0, 20));
+
+ log.info("Running tests with parts=" + Arrays.toString(parts));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected QueryChecker assertQuery(String qry) {
+ return assertQuery(grid(0), qry);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected QueryContext queryContext() {
+ return QueryContext.of(new
SqlFieldsQuery("").setLocal(local).setTimeout(10, TimeUnit.SECONDS)
+ .setPartitions(parts));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<List<?>> sql(String sql, Object... params) {
+ return sql(local ? grid(0) : client, sql, params);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR)
WITH cache_name=t1_cache,backups=1");
+ sql("CREATE TABLE T2(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR)
WITH cache_name=t2_cache,backups=1");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "T2", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < ENTRIES_COUNT; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ // Skip super method to keep caches after each test.
+ }
+
+ /** */
+ @Test
+ public void testSingle() {
+ Stream.of(Pair.of("SELECT * FROM T1", null),
+ Pair.of("SELECT * FROM T1 WHERE ID < ?", ENTRIES_COUNT))
+ .forEach(query -> {
+ long cnt = sql(query.left, query.right).size();
+
+ assertEquals(cacheSize("T1_CACHE", parts), cnt);
+ });
+
+ Stream.of(Pair.of("SELECT count(*) FROM T1", null),
+ Pair.of("SELECT count(*) FROM T1 WHERE ID < ?",
ENTRIES_COUNT))
+ .forEach(query -> {
+ Long cnt = (Long)sql(query.left, query.right).get(0).get(0);
+
+ assertEquals(cacheSize("T1_CACHE", parts), cnt.longValue());
+ });
+ }
+
+ /** */
+ @Test
+ public void testReplicated() {
+ Stream.of(Pair.of("select * from DICT", null),
+ Pair.of("select * from DICT where id < ?", ENTRIES_COUNT))
+ .forEach(query -> {
+ List<List<?>> res = sql(query.left, query.right);
+
+ assertEquals(res.size(), cacheSize("DICT_CACHE"));
+ });
+
+ Stream.of(Pair.of("select count(*) from DICT", null),
+ Pair.of("select count(*) from DICT where id < ?",
ENTRIES_COUNT))
+ .forEach(query -> {
+ Long size = (Long)sql(query.left, query.right).get(0).get(0);
+
+ assertEquals(cacheSize("DICT_CACHE"), size.longValue());
+ });
+ }
+
+ /** */
+ @Test
+ public void testJoin() {
+ Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1", "T2",
col));
+ }
+
+ /** */
+ @Test
+ public void testJoinReplicated() {
+ Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1",
"DICT", col));
+ }
+
+ /** */
+ private void testJoin(String table1, String table2, String joinCol) {
+ String sqlStr = "select * from " + table1 + " join " + table2 +
+ " on " + table1 + "." + joinCol + "=" + table2 + "." + joinCol;
+
+ List<?> res = sql(sqlStr);
+
+ assertEquals(res.size(), cacheSize(table1 + "_CACHE", parts));
+ }
+
+ /** */
+ @Test
+ public void testInsertFromSelect() {
+ Stream.of(Pair.of("SELECT ID, IDX_VAL, VAL FROM T1 WHERE ID < ?",
ENTRIES_COUNT),
+ Pair.of("SELECT ID, IDX_VAL, VAL FROM T1", null))
+ .forEach(query -> {
+ try {
+ sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR,
VAL VARCHAR) WITH cache_name=t3_cache,backups=1");
+
+ sql("INSERT INTO T3(ID, IDX_VAL, VAL) " + query.left,
query.right);
+
+ assertEquals(cacheSize("T1_CACHE", parts),
cacheSize("T3_CACHE"));
+ }
+ finally {
+ client.cache("T3_CACHE").destroy();
+ }
+ });
+ }
+
+ /** */
+ @Test
+ public void testDelete() {
+ Stream.of(Pair.of("DELETE FROM T3 WHERE ID < ?", ENTRIES_COUNT),
Pair.of("DELETE FROM T3", null))
+ .forEach(query -> {
+ try {
+ sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR,
VAL VARCHAR) WITH cache_name=t3_cache,backups=1");
+
+ sql("INSERT INTO T3(ID, IDX_VAL, VAL) SELECT ID, IDX_VAL,
VAL FROM DICT");
+
+ assertEquals(ENTRIES_COUNT, cacheFullSize("T3_CACHE"));
+
+ long partsCnt = cacheSize("T3_CACHE", parts);
+
+ sql(query.left, query.right);
+
+ assertEquals(ENTRIES_COUNT - partsCnt,
cacheFullSize("T3_CACHE"));
+ }
+ finally {
+ client.cache("T3_CACHE").destroy();
+ }
+
+ });
+ }
+
+ /** */
+ @Test
+ public void testCreateTableAsSelect() {
+ Stream.of(Pair.of("SELECT ID, IDX_VAL, VAL FROM T1 WHERE ID < ?",
ENTRIES_COUNT),
+ Pair.of("SELECT ID, IDX_VAL, VAL FROM T1", null))
+ .forEach(query -> {
+ try {
+ sql("CREATE TABLE T3(ID, IDX_VAL, VAL) WITH
cache_name=t3_cache,backups=1 AS " + query.left, query.right);
+
+ assertEquals(cacheSize("T1_CACHE", parts),
cacheFullSize("T3_CACHE"));
+ }
+ finally {
+ client.cache("T3_CACHE").destroy();
+ }
+ });
+ }
+
+ /** */
+ private long cacheFullSize(String cacheName) {
+ return client.cache(cacheName).sizeLong(CachePeekMode.PRIMARY);
+ }
+
+ /** */
+ private long cacheSize(String cacheName, int... parts) {
+ IgniteCache<?, ?> cache = grid(0).cache(cacheName);
+
+ GridCacheContext<?, ?> ctx = grid(0).cachex(cacheName).context();
+
+ if (F.isEmpty(parts))
+ return local && ctx.isPartitioned() ?
+ cache.localSizeLong(CachePeekMode.PRIMARY) :
cache.sizeLong(CachePeekMode.PRIMARY);
+
+ return IntStream.of(parts).mapToLong(p -> {
+ if (local && ctx.isPartitioned())
+ return cache.localSizeLong(p, CachePeekMode.PRIMARY);
+ else
+ return cache.sizeLong(p, CachePeekMode.PRIMARY);
+ }).sum();
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index bbe6c64f1ea..25178aaef39 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuo
import
org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.QueryMetadataIntegrationTest;
+import
org.apache.ignite.internal.processors.query.calcite.integration.QueryWithPartitionsIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.RunningQueriesIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.SearchSargOnIndexIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.ServerStatisticsIntegrationTest;
@@ -117,6 +118,7 @@ import org.junit.runners.Suite;
SearchSargOnIndexIntegrationTest.class,
KeepBinaryIntegrationTest.class,
LocalQueryIntegrationTest.class,
+ QueryWithPartitionsIntegrationTest.class,
QueryMetadataIntegrationTest.class,
MemoryQuotasIntegrationTest.class,
LocalDateTimeSupportTest.class,