This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new baccd5fb4fd IGNITE-19725 SQL Calcite: Implement local flag support
(#10788)
baccd5fb4fd is described below
commit baccd5fb4fd33fddb4bf881b48166e43563b6c59
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Sun Jul 16 22:07:54 2023 +0300
IGNITE-19725 SQL Calcite: Implement local flag support (#10788)
---
.../query/calcite/CalciteQueryProcessor.java | 28 ++-
.../processors/query/calcite/RootQuery.java | 4 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 3 +-
.../query/calcite/metadata/ColocationGroup.java | 14 ++
.../query/calcite/metadata/FragmentMapping.java | 8 +
.../query/calcite/prepare/BaseQueryContext.java | 27 ++-
.../processors/query/calcite/prepare/Fragment.java | 8 +-
.../query/calcite/prepare/MappingQueryContext.java | 14 ++
.../query/calcite/rule/IndexCountRule.java | 13 +-
.../calcite/rule/LogicalScanConverterRule.java | 19 +-
.../processors/query/calcite/util/Commons.java | 7 +-
.../integration/AbstractBasicIntegrationTest.java | 10 +-
.../integration/LocalQueryIntegrationTest.java | 205 +++++++++++++++++++++
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
14 files changed, 342 insertions(+), 20 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 6bc9cccf5fa..6f8d2f63e1f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -51,6 +51,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.events.SqlQueryExecutionEvent;
@@ -431,11 +432,13 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
if (plan == null) {
AtomicBoolean miss = new AtomicBoolean();
- plan = queryPlanCache().queryPlan(new
CacheKey(schema.getName(), sql, null, params), () -> {
- miss.set(true);
+ plan = queryPlanCache().queryPlan(
+ new CacheKey(schema.getName(), sql,
contextKey(qryCtx), params),
+ () -> {
+ miss.set(true);
- return prepareSvc.prepareSingle(qryNode,
qry.planningContext());
- });
+ return prepareSvc.prepareSingle(qryNode,
qry.planningContext());
+ });
if (miss.get())
parserMetrics.countCacheMiss();
@@ -470,7 +473,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
assert schema != null : "Schema not found: " + schemaName;
- QueryPlan plan = queryPlanCache().queryPlan(new
CacheKey(schema.getName(), sql, null, params));
+ QueryPlan plan = queryPlanCache().queryPlan(new
CacheKey(schema.getName(), sql, contextKey(qryCtx), params));
if (plan != null) {
parserMetrics.countCacheHit();
@@ -493,7 +496,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
if (qryList.size() == 1) {
plan0 = queryPlanCache().queryPlan(
// Use source SQL to avoid redundant parsing next time.
- new CacheKey(schema.getName(), sql, null, params),
+ new CacheKey(schema.getName(), sql,
contextKey(qryCtx), params),
() -> prepareSvc.prepareSingle(sqlNode,
qry.planningContext())
);
}
@@ -524,6 +527,16 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
}
}
+ /** */
+ private Object contextKey(QueryContext qryCtx) {
+ if (qryCtx == null)
+ return null;
+
+ SqlFieldsQuery sqlFieldsQry = qryCtx.unwrap(SqlFieldsQuery.class);
+
+ return sqlFieldsQry != null ? sqlFieldsQry.isLocal() : null;
+ }
+
/** */
private <T> T processQuery(
@Nullable QueryContext qryCtx,
@@ -533,11 +546,14 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
@Nullable List<RootQuery<Object[]>> qrys,
Object... params
) {
+ SqlFieldsQuery fldsQry = qryCtx != null ?
qryCtx.unwrap(SqlFieldsQuery.class) : null;
+
RootQuery<Object[]> qry = new RootQuery<>(
sql,
schemaHolder.schema(schema),
params,
qryCtx,
+ fldsQry != null && fldsQry.isLocal(),
exchangeSvc,
(q, ex) -> qryReg.unregister(q.id(), ex),
log,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 22c1764a77b..61e34288416 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -103,6 +103,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
SchemaPlus schema,
Object[] params,
QueryContext qryCtx,
+ boolean isLocal,
ExchangeService exch,
BiConsumer<Query<RowT>, Throwable> unregister,
IgniteLogger log,
@@ -137,6 +138,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
.defaultSchema(schema)
.build()
)
+ .local(isLocal)
.logger(log)
.build();
}
@@ -150,7 +152,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
* @param schema new schema.
*/
public RootQuery<RowT> childQuery(SchemaPlus schema) {
- return new RootQuery<>(sql, schema, params, QueryContext.of(cancel),
exch, unregister, log, plannerTimeout);
+ return new RootQuery<>(sql, schema, params, QueryContext.of(cancel),
ctx.isLocal(), exch, unregister, log, plannerTimeout);
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 92c0b70a10b..3b96368c964 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -558,7 +558,8 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
) {
qry.mapping();
- MappingQueryContext mapCtx = Commons.mapContext(locNodeId,
topologyVersion());
+ MappingQueryContext mapCtx = Commons.mapContext(locNodeId,
topologyVersion(), qry.context().isLocal());
+
plan.init(mappingSvc, mapCtx);
List<Fragment> fragments = plan.fragments();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index 64a6d1072af..ffe91b17cb3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
@@ -77,6 +78,19 @@ public class ColocationGroup implements MarshalableMessage {
return new ColocationGroup(new long[] {sourceId}, null, null);
}
+ /** */
+ public ColocationGroup local(UUID nodeId) {
+ List<List<UUID>> localAssignments = null;
+ if (assignments != null) {
+ localAssignments = assignments.stream()
+ .map(l -> nodeId.equals(l.get(0)) ? l :
Collections.<UUID>emptyList())
+ .collect(Collectors.toList());
+ }
+
+ return new ColocationGroup(Arrays.copyOf(sourceIds, sourceIds.length),
Collections.singletonList(nodeId),
+ localAssignments);
+ }
+
/** */
public ColocationGroup() {
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index f0f3235bf47..b82896bc93b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -116,6 +116,14 @@ public class FragmentMapping implements MarshalableMessage
{
return new FragmentMapping(first.colocate(second));
}
+ /** */
+ public FragmentMapping local(UUID nodeId) throws
ColocationMappingException {
+ if (colocationGroups.isEmpty())
+ return create(nodeId).colocate(this);
+
+ return new FragmentMapping(Commons.transform(colocationGroups, c ->
c.local(nodeId)));
+ }
+
/** */
public List<UUID> nodeIds() {
return colocationGroups.stream()
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index d288ad921a2..c58926e70e6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -156,13 +156,17 @@ public final class BaseQueryContext extends
AbstractQueryContext {
/** */
private final GridQueryCancel qryCancel;
+ /** */
+ private final boolean isLocal;
+
/**
* Private constructor, used by a builder.
*/
private BaseQueryContext(
FrameworkConfig cfg,
Context parentCtx,
- IgniteLogger log
+ IgniteLogger log,
+ boolean isLocal
) {
super(Contexts.chain(parentCtx, cfg.getContext()));
@@ -171,6 +175,8 @@ public final class BaseQueryContext extends
AbstractQueryContext {
this.log = log;
+ this.isLocal = isLocal;
+
qryCancel = unwrap(GridQueryCancel.class);
typeFactory = TYPE_FACTORY;
@@ -265,6 +271,11 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return EMPTY_CONTEXT;
}
+ /** */
+ public boolean isLocal() {
+ return isLocal;
+ }
+
/**
* Query context builder.
*/
@@ -285,6 +296,9 @@ public final class BaseQueryContext extends
AbstractQueryContext {
/** */
private IgniteLogger log = new NullLogger();
+ /** */
+ private boolean isLocal = false;
+
/**
* @param frameworkCfg Framework config.
* @return Builder for chaining.
@@ -312,13 +326,22 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return this;
}
+ /**
+ * @param isLocal Local execution flag.
+ * @return Builder for chaining.
+ */
+ public Builder local(boolean isLocal) {
+ this.isLocal = isLocal;
+ return this;
+ }
+
/**
* Builds planner context.
*
* @return Planner context.
*/
public BaseQueryContext build() {
- return new BaseQueryContext(frameworkCfg, parentCtx, log);
+ return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal);
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 5ff0676ab44..6aeb944e0a1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -142,8 +142,12 @@ public class Fragment {
try {
FragmentMapping mapping =
IgniteMdFragmentMapping._fragmentMapping(root, mq, ctx);
- if (rootFragment())
- mapping =
FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
+ if (rootFragment()) {
+ if (ctx.isLocal())
+ mapping = mapping.local(ctx.localNodeId());
+ else
+ mapping =
FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
+ }
if (single() && mapping.nodeIds().size() > 1) {
// this is possible when the fragment contains scan of a
replicated cache, which brings
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 73f0e91de6c..2bdd1a0c913 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -38,10 +38,19 @@ public class MappingQueryContext {
/** */
private RelOptCluster cluster;
+ /** */
+ private final boolean isLocal;
+
/** */
public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer)
{
+ this(locNodeId, topVer, false);
+ }
+
+ /** */
+ public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer,
boolean isLocal) {
this.locNodeId = locNodeId;
this.topVer = topVer;
+ this.isLocal = isLocal;
}
/** */
@@ -54,6 +63,11 @@ public class MappingQueryContext {
return topVer;
}
+ /** */
+ public boolean isLocal() {
+ return isLocal;
+ }
+
/** Creates a cluster. */
RelOptCluster cluster() {
if (cluster == null) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
index 28449d38213..67a9eecbe1a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
@@ -31,6 +31,7 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.RelBuilder;
import org.apache.ignite.internal.processors.query.QueryUtils;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount;
import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
@@ -101,10 +102,18 @@ public class IndexCountRule extends
RelRule<IndexCountRule.Config> {
if (idx == null)
return;
+ RelDistribution distribution;
+ BaseQueryContext baseQryCtx =
call.getPlanner().getContext().unwrap(BaseQueryContext.class);
+ if (baseQryCtx.isLocal())
+ distribution = IgniteDistributions.single();
+ else if (table.distribution().getType() ==
RelDistribution.Type.HASH_DISTRIBUTED)
+ distribution = IgniteDistributions.random();
+ else
+ distribution = table.distribution();
+
RelTraitSet idxTraits = aggr.getTraitSet()
.replace(IgniteConvention.INSTANCE)
- .replace(table.distribution().getType() ==
RelDistribution.Type.HASH_DISTRIBUTED ?
- IgniteDistributions.random() : table.distribution())
+ .replace(distribution)
.replace(RewindabilityTrait.REWINDABLE);
IgniteIndexCount idxCnt = new IgniteIndexCount(
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
index 28053b9c701..3d3f98c9186 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
@@ -36,6 +35,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.Mappings;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -70,7 +71,13 @@ public abstract class LogicalScanConverterRule<T extends
ProjectableFilterableTa
return null;
}
- RelDistribution distribution = table.distribution();
+ RelDistribution distribution;
+ BaseQueryContext baseQryCtx =
planner.getContext().unwrap(BaseQueryContext.class);
+ if (baseQryCtx.isLocal())
+ distribution = IgniteDistributions.single();
+ else
+ distribution = table.distribution();
+
RelCollation collation = idx.collation();
if (rel.projects() != null || rel.requiredColumns() != null) {
@@ -125,7 +132,13 @@ public abstract class LogicalScanConverterRule<T extends
ProjectableFilterableTa
RelOptCluster cluster = rel.getCluster();
IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
- RelDistribution distribution = table.distribution();
+ RelDistribution distribution;
+ BaseQueryContext baseQryCtx =
planner.getContext().unwrap(BaseQueryContext.class);
+ if (baseQryCtx.isLocal())
+ distribution = IgniteDistributions.single();
+ else
+ distribution = table.distribution();
+
if (rel.requiredColumns() != null) {
Mappings.TargetMapping mapping = createMapping(
rel.projects(),
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 442ca86f000..fcafc98b301 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -444,6 +444,11 @@ public final class Commons {
/** */
public static MappingQueryContext mapContext(UUID locNodeId,
AffinityTopologyVersion topVer) {
- return new MappingQueryContext(locNodeId, topVer);
+ return mapContext(locNodeId, topVer, false);
+ }
+
+ /** */
+ public static MappingQueryContext mapContext(UUID locNodeId,
AffinityTopologyVersion topVer, boolean isLocal) {
+ return new MappingQueryContext(locNodeId, topVer, isLocal);
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index a4582842e4f..6348af14038 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
@@ -136,13 +137,18 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
protected List<List<?>> executeSql(String sql, Object... args) {
CalciteQueryProcessor qryProc =
Commons.lookupComponent(client.context(), CalciteQueryProcessor.class);
- List<FieldsQueryCursor<List<?>>> cur = qryProc.query(null, "PUBLIC",
sql, args);
+ List<FieldsQueryCursor<List<?>>> cur = qryProc.query(queryContext(),
"PUBLIC", sql, args);
try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
return srvCursor.getAll();
}
}
+ /** */
+ protected QueryContext queryContext() {
+ return null;
+ }
+
/**
* Asserts that executeSql throws an exception.
*
@@ -196,7 +202,7 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
/** */
protected List<List<?>> sql(IgniteEx ignite, String sql, Object... params)
{
- List<FieldsQueryCursor<List<?>>> cur =
queryProcessor(ignite).query(null, "PUBLIC", sql, params);
+ List<FieldsQueryCursor<List<?>>> cur =
queryProcessor(ignite).query(queryContext(), "PUBLIC", sql, params);
try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
return srvCursor.getAll();
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java
new file mode 100644
index 00000000000..7302eb6bd73
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.junit.Test;
+
+/** */
+public class LocalQueryIntegrationTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected QueryChecker assertQuery(String qry) {
+ return assertQuery(grid(0), qry);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected QueryContext queryContext() {
+ return QueryContext.of(new SqlFieldsQuery("").setLocal(true));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<List<?>> sql(String sql, Object... params) {
+ return sql(grid(0), sql, params);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR)
WITH cache_name=t1_cache");
+ sql("CREATE TABLE T2(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR)
WITH cache_name=t2_cache");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "T2", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ // Skip super method to keep caches after each test.
+ }
+
+ /** */
+ @Test
+ public void testSingle() {
+ test("select * from T1 order by id", "T1_CACHE");
+ }
+
+ /** */
+ @Test
+ public void testReplicated() {
+ List<List<?>> res = sql("select * from DICT");
+
+ assertEquals(grid(0).cache("DICT_CACHE").size(CachePeekMode.PRIMARY),
res.size());
+ }
+
+ /** */
+ @Test
+ public void testJoin() {
+ Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1", "T2",
col));
+ }
+
+ /** */
+ @Test
+ public void testJoinReplicated() {
+ Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1",
"DICT", col));
+ }
+
+ /** */
+ @Test
+ public void testInsertFromSelect() {
+ try {
+ sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH cache_name=t3_cache");
+
+ sql("INSERT INTO T3(ID, IDX_VAL, VAL) SELECT ID, IDX_VAL, VAL FROM
T1 WHERE ID < ?", ENTRIES_COUNT);
+
+
assertEquals(grid(0).cache("T1_CACHE").localSizeLong(CachePeekMode.PRIMARY),
+ client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+ }
+ finally {
+ grid(0).cache("T3_CACHE").destroy();
+ }
+ }
+
+ /** */
+ @Test
+ public void testDelete() {
+ try {
+ sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH cache_name=t3_cache");
+
+ sql("INSERT INTO T3(ID, IDX_VAL, VAL) SELECT ID, IDX_VAL, VAL FROM
DICT");
+
+ assertEquals(ENTRIES_COUNT,
client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+
+ long localSize =
grid(0).cache("T3_CACHE").localSizeLong(CachePeekMode.PRIMARY);
+
+ sql("DELETE FROM T3 WHERE ID < ?", ENTRIES_COUNT);
+
+ assertEquals(ENTRIES_COUNT - localSize,
client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+ }
+ finally {
+ grid(0).cache("T3_CACHE").destroy();
+ }
+ }
+
+ /** */
+ @Test
+ public void testCreateTableAsSelect() {
+ try {
+ sql("CREATE TABLE T3(ID, IDX_VAL, VAL) WITH cache_name=t3_cache AS
SELECT ID, IDX_VAL, VAL FROM T1");
+
+
assertEquals(grid(0).cache("T1_CACHE").localSizeLong(CachePeekMode.PRIMARY),
+ client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+ }
+ finally {
+ grid(0).cache("T3_CACHE").destroy();
+ }
+ }
+
+ /** */
+ @Test
+ public void testCount() {
+ Long cnt = (Long)sql("SELECT COUNT(*) FROM T1").get(0).get(0);
+
+
assertEquals(grid(0).cache("T1_CACHE").localSizeLong(CachePeekMode.PRIMARY),
cnt.longValue());
+ }
+
+ /** */
+ private void testJoin(String table1, String table2, String joinCol) {
+ String sql = "select * from " + table1 + " join " + table2 +
+ " on " + table1 + "." + joinCol + "=" + table2 + "." +
joinCol;
+
+ test(sql, table1 + "_CACHE");
+ }
+
+ /** */
+ private void test(String sql, String cacheName) {
+ List<List<?>> res = sql(sql);
+
+ Affinity<Object> aff = grid(0).affinity(cacheName);
+ ClusterNode localNode = grid(0).localNode();
+
+ List<?> primaries = res.stream().filter(l -> {
+ int part = aff.partition(l.get(0));
+
+ return aff.isPrimary(localNode, part);
+ }
+ ).collect(Collectors.toList());;
+
+ assertEquals(primaries.size(), res.size());
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index f2c51e0f97c..007f831350a 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.internal.processors.query.calcite.integration.KillComma
import
org.apache.ignite.internal.processors.query.calcite.integration.KillQueryCommandDdlIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.LimitOffsetIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.LocalDateTimeSupportTest;
+import
org.apache.ignite.internal.processors.query.calcite.integration.LocalQueryIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuotasIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest;
@@ -114,6 +115,7 @@ import org.junit.runners.Suite;
QueryEngineConfigurationIntegrationTest.class,
SearchSargOnIndexIntegrationTest.class,
KeepBinaryIntegrationTest.class,
+ LocalQueryIntegrationTest.class,
QueryMetadataIntegrationTest.class,
MemoryQuotasIntegrationTest.class,
LocalDateTimeSupportTest.class,