This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new baccd5fb4fd IGNITE-19725 SQL Calcite: Implement local flag support 
(#10788)
baccd5fb4fd is described below

commit baccd5fb4fd33fddb4bf881b48166e43563b6c59
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Sun Jul 16 22:07:54 2023 +0300

    IGNITE-19725 SQL Calcite: Implement local flag support (#10788)
---
 .../query/calcite/CalciteQueryProcessor.java       |  28 ++-
 .../processors/query/calcite/RootQuery.java        |   4 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |   3 +-
 .../query/calcite/metadata/ColocationGroup.java    |  14 ++
 .../query/calcite/metadata/FragmentMapping.java    |   8 +
 .../query/calcite/prepare/BaseQueryContext.java    |  27 ++-
 .../processors/query/calcite/prepare/Fragment.java |   8 +-
 .../query/calcite/prepare/MappingQueryContext.java |  14 ++
 .../query/calcite/rule/IndexCountRule.java         |  13 +-
 .../calcite/rule/LogicalScanConverterRule.java     |  19 +-
 .../processors/query/calcite/util/Commons.java     |   7 +-
 .../integration/AbstractBasicIntegrationTest.java  |  10 +-
 .../integration/LocalQueryIntegrationTest.java     | 205 +++++++++++++++++++++
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 14 files changed, 342 insertions(+), 20 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 6bc9cccf5fa..6f8d2f63e1f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -51,6 +51,7 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.SystemProperty;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
 import org.apache.ignite.configuration.QueryEngineConfiguration;
 import org.apache.ignite.events.SqlQueryExecutionEvent;
@@ -431,11 +432,13 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
                     if (plan == null) {
                         AtomicBoolean miss = new AtomicBoolean();
 
-                        plan = queryPlanCache().queryPlan(new 
CacheKey(schema.getName(), sql, null, params), () -> {
-                            miss.set(true);
+                        plan = queryPlanCache().queryPlan(
+                                new CacheKey(schema.getName(), sql, 
contextKey(qryCtx), params),
+                                () -> {
+                                    miss.set(true);
 
-                            return prepareSvc.prepareSingle(qryNode, 
qry.planningContext());
-                        });
+                                    return prepareSvc.prepareSingle(qryNode, 
qry.planningContext());
+                                });
 
                         if (miss.get())
                             parserMetrics.countCacheMiss();
@@ -470,7 +473,7 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
 
         assert schema != null : "Schema not found: " + schemaName;
 
-        QueryPlan plan = queryPlanCache().queryPlan(new 
CacheKey(schema.getName(), sql, null, params));
+        QueryPlan plan = queryPlanCache().queryPlan(new 
CacheKey(schema.getName(), sql, contextKey(qryCtx), params));
 
         if (plan != null) {
             parserMetrics.countCacheHit();
@@ -493,7 +496,7 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
                 if (qryList.size() == 1) {
                     plan0 = queryPlanCache().queryPlan(
                         // Use source SQL to avoid redundant parsing next time.
-                        new CacheKey(schema.getName(), sql, null, params),
+                        new CacheKey(schema.getName(), sql, 
contextKey(qryCtx), params),
                         () -> prepareSvc.prepareSingle(sqlNode, 
qry.planningContext())
                     );
                 }
@@ -524,6 +527,16 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
         }
     }
 
+    /** */
+    private Object contextKey(QueryContext qryCtx) {
+        if (qryCtx == null)
+            return null;
+
+        SqlFieldsQuery sqlFieldsQry = qryCtx.unwrap(SqlFieldsQuery.class);
+
+        return sqlFieldsQry != null ? sqlFieldsQry.isLocal() : null;
+    }
+
     /** */
     private <T> T processQuery(
         @Nullable QueryContext qryCtx,
@@ -533,11 +546,14 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
         @Nullable List<RootQuery<Object[]>> qrys,
         Object... params
     ) {
+        SqlFieldsQuery fldsQry = qryCtx != null ? 
qryCtx.unwrap(SqlFieldsQuery.class) : null;
+
         RootQuery<Object[]> qry = new RootQuery<>(
             sql,
             schemaHolder.schema(schema),
             params,
             qryCtx,
+            fldsQry != null && fldsQry.isLocal(),
             exchangeSvc,
             (q, ex) -> qryReg.unregister(q.id(), ex),
             log,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 22c1764a77b..61e34288416 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -103,6 +103,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
         SchemaPlus schema,
         Object[] params,
         QueryContext qryCtx,
+        boolean isLocal,
         ExchangeService exch,
         BiConsumer<Query<RowT>, Throwable> unregister,
         IgniteLogger log,
@@ -137,6 +138,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
                     .defaultSchema(schema)
                     .build()
             )
+            .local(isLocal)
             .logger(log)
             .build();
     }
@@ -150,7 +152,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
      * @param schema new schema.
      */
     public RootQuery<RowT> childQuery(SchemaPlus schema) {
-        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), 
exch, unregister, log, plannerTimeout);
+        return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), 
ctx.isLocal(), exch, unregister, log, plannerTimeout);
     }
 
     /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 92c0b70a10b..3b96368c964 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -558,7 +558,8 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     ) {
         qry.mapping();
 
-        MappingQueryContext mapCtx = Commons.mapContext(locNodeId, 
topologyVersion());
+        MappingQueryContext mapCtx = Commons.mapContext(locNodeId, 
topologyVersion(), qry.context().isLocal());
+
         plan.init(mappingSvc, mapCtx);
 
         List<Fragment> fragments = plan.fragments();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index 64a6d1072af..ffe91b17cb3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -77,6 +78,19 @@ public class ColocationGroup implements MarshalableMessage {
         return new ColocationGroup(new long[] {sourceId}, null, null);
     }
 
+    /** */
+    public ColocationGroup local(UUID nodeId) {
+        List<List<UUID>> localAssignments = null;
+        if (assignments != null) {
+            localAssignments = assignments.stream()
+                    .map(l -> nodeId.equals(l.get(0)) ? l : 
Collections.<UUID>emptyList())
+                    .collect(Collectors.toList());
+        }
+
+        return new ColocationGroup(Arrays.copyOf(sourceIds, sourceIds.length), 
Collections.singletonList(nodeId),
+                localAssignments);
+    }
+
     /** */
     public ColocationGroup() {
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index f0f3235bf47..b82896bc93b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -116,6 +116,14 @@ public class FragmentMapping implements MarshalableMessage 
{
             return new FragmentMapping(first.colocate(second));
     }
 
+    /** */
+    public FragmentMapping local(UUID nodeId) throws 
ColocationMappingException {
+        if (colocationGroups.isEmpty())
+            return create(nodeId).colocate(this);
+
+        return new FragmentMapping(Commons.transform(colocationGroups, c -> 
c.local(nodeId)));
+    }
+
     /** */
     public List<UUID> nodeIds() {
         return colocationGroups.stream()
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index d288ad921a2..c58926e70e6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -156,13 +156,17 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
     /** */
     private final GridQueryCancel qryCancel;
 
+    /** */
+    private final boolean isLocal;
+
     /**
      * Private constructor, used by a builder.
      */
     private BaseQueryContext(
         FrameworkConfig cfg,
         Context parentCtx,
-        IgniteLogger log
+        IgniteLogger log,
+        boolean isLocal
     ) {
         super(Contexts.chain(parentCtx, cfg.getContext()));
 
@@ -171,6 +175,8 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
         this.log = log;
 
+        this.isLocal = isLocal;
+
         qryCancel = unwrap(GridQueryCancel.class);
 
         typeFactory = TYPE_FACTORY;
@@ -265,6 +271,11 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         return EMPTY_CONTEXT;
     }
 
+    /** */
+    public boolean isLocal() {
+        return isLocal;
+    }
+
     /**
      * Query context builder.
      */
@@ -285,6 +296,9 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         /** */
         private IgniteLogger log = new NullLogger();
 
+        /** */
+        private boolean isLocal = false;
+
         /**
          * @param frameworkCfg Framework config.
          * @return Builder for chaining.
@@ -312,13 +326,22 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
             return this;
         }
 
+        /**
+         * @param isLocal Local execution flag.
+         * @return Builder for chaining.
+         */
+        public Builder local(boolean isLocal) {
+            this.isLocal = isLocal;
+            return this;
+        }
+
         /**
          * Builds planner context.
          *
          * @return Planner context.
          */
         public BaseQueryContext build() {
-            return new BaseQueryContext(frameworkCfg, parentCtx, log);
+            return new BaseQueryContext(frameworkCfg, parentCtx, log, isLocal);
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 5ff0676ab44..6aeb944e0a1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -142,8 +142,12 @@ public class Fragment {
         try {
             FragmentMapping mapping = 
IgniteMdFragmentMapping._fragmentMapping(root, mq, ctx);
 
-            if (rootFragment())
-                mapping = 
FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
+            if (rootFragment()) {
+                if (ctx.isLocal())
+                    mapping = mapping.local(ctx.localNodeId());
+                else
+                    mapping = 
FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
+            }
 
             if (single() && mapping.nodeIds().size() > 1) {
                 // this is possible when the fragment contains scan of a 
replicated cache, which brings
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 73f0e91de6c..2bdd1a0c913 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -38,10 +38,19 @@ public class MappingQueryContext {
     /** */
     private RelOptCluster cluster;
 
+    /** */
+    private final boolean isLocal;
+
     /** */
     public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer) 
{
+        this(locNodeId, topVer, false);
+    }
+
+    /** */
+    public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, 
boolean isLocal) {
         this.locNodeId = locNodeId;
         this.topVer = topVer;
+        this.isLocal = isLocal;
     }
 
     /** */
@@ -54,6 +63,11 @@ public class MappingQueryContext {
         return topVer;
     }
 
+    /** */
+    public boolean isLocal() {
+        return isLocal;
+    }
+
     /** Creates a cluster. */
     RelOptCluster cluster() {
         if (cluster == null) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
index 28449d38213..67a9eecbe1a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IndexCountRule.java
@@ -31,6 +31,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
@@ -101,10 +102,18 @@ public class IndexCountRule extends 
RelRule<IndexCountRule.Config> {
         if (idx == null)
             return;
 
+        RelDistribution distribution;
+        BaseQueryContext baseQryCtx = 
call.getPlanner().getContext().unwrap(BaseQueryContext.class);
+        if (baseQryCtx.isLocal())
+            distribution = IgniteDistributions.single();
+        else if (table.distribution().getType() == 
RelDistribution.Type.HASH_DISTRIBUTED)
+            distribution = IgniteDistributions.random();
+        else
+            distribution = table.distribution();
+
         RelTraitSet idxTraits = aggr.getTraitSet()
             .replace(IgniteConvention.INSTANCE)
-            .replace(table.distribution().getType() == 
RelDistribution.Type.HASH_DISTRIBUTED ?
-                IgniteDistributions.random() : table.distribution())
+            .replace(distribution)
             .replace(RewindabilityTrait.REWINDABLE);
 
         IgniteIndexCount idxCnt = new IgniteIndexCount(
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
index 28053b9c701..3d3f98c9186 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
@@ -36,6 +35,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mapping;
 import org.apache.calcite.util.mapping.Mappings;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLog
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -70,7 +71,13 @@ public abstract class LogicalScanConverterRule<T extends 
ProjectableFilterableTa
                     return null;
                 }
 
-                RelDistribution distribution = table.distribution();
+                RelDistribution distribution;
+                BaseQueryContext baseQryCtx = 
planner.getContext().unwrap(BaseQueryContext.class);
+                if (baseQryCtx.isLocal())
+                    distribution = IgniteDistributions.single();
+                else
+                    distribution = table.distribution();
+
                 RelCollation collation = idx.collation();
 
                 if (rel.projects() != null || rel.requiredColumns() != null) {
@@ -125,7 +132,13 @@ public abstract class LogicalScanConverterRule<T extends 
ProjectableFilterableTa
                 RelOptCluster cluster = rel.getCluster();
                 IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
 
-                RelDistribution distribution = table.distribution();
+                RelDistribution distribution;
+                BaseQueryContext baseQryCtx = 
planner.getContext().unwrap(BaseQueryContext.class);
+                if (baseQryCtx.isLocal())
+                    distribution = IgniteDistributions.single();
+                else
+                    distribution = table.distribution();
+
                 if (rel.requiredColumns() != null) {
                     Mappings.TargetMapping mapping = createMapping(
                         rel.projects(),
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 442ca86f000..fcafc98b301 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -444,6 +444,11 @@ public final class Commons {
 
     /** */
     public static MappingQueryContext mapContext(UUID locNodeId, 
AffinityTopologyVersion topVer) {
-        return new MappingQueryContext(locNodeId, topVer);
+        return mapContext(locNodeId, topVer, false);
+    }
+
+    /** */
+    public static MappingQueryContext mapContext(UUID locNodeId, 
AffinityTopologyVersion topVer, boolean isLocal) {
+        return new MappingQueryContext(locNodeId, topVer, isLocal);
     }
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index a4582842e4f..6348af14038 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.QueryEngine;
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
@@ -136,13 +137,18 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
     protected List<List<?>> executeSql(String sql, Object... args) {
         CalciteQueryProcessor qryProc = 
Commons.lookupComponent(client.context(), CalciteQueryProcessor.class);
 
-        List<FieldsQueryCursor<List<?>>> cur = qryProc.query(null, "PUBLIC", 
sql, args);
+        List<FieldsQueryCursor<List<?>>> cur = qryProc.query(queryContext(), 
"PUBLIC", sql, args);
 
         try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
             return srvCursor.getAll();
         }
     }
 
+    /** */
+    protected QueryContext queryContext() {
+        return null;
+    }
+
     /**
      * Asserts that executeSql throws an exception.
      *
@@ -196,7 +202,7 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
 
     /** */
     protected List<List<?>> sql(IgniteEx ignite, String sql, Object... params) 
{
-        List<FieldsQueryCursor<List<?>>> cur = 
queryProcessor(ignite).query(null, "PUBLIC", sql, params);
+        List<FieldsQueryCursor<List<?>>> cur = 
queryProcessor(ignite).query(queryContext(), "PUBLIC", sql, params);
 
         try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
             return srvCursor.getAll();
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java
new file mode 100644
index 00000000000..7302eb6bd73
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.junit.Test;
+
+/** */
+public class LocalQueryIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static final int ENTRIES_COUNT = 10000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        cfg.getSqlConfiguration().setQueryEnginesConfiguration(new 
CalciteQueryEngineConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected QueryChecker assertQuery(String qry) {
+        return assertQuery(grid(0), qry);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected QueryContext queryContext() {
+        return QueryContext.of(new SqlFieldsQuery("").setLocal(true));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected List<List<?>> sql(String sql, Object... params) {
+        return sql(grid(0), sql, params);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        sql("CREATE TABLE T1(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR) 
WITH cache_name=t1_cache");
+        sql("CREATE TABLE T2(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR) 
WITH cache_name=t2_cache");
+        sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL 
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+        sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+        sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+        sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+        Stream.of("T1", "T2", "DICT").forEach(tableName -> {
+            StringBuilder sb = new StringBuilder("INSERT INTO 
").append(tableName)
+                    .append("(ID, IDX_VAL, VAL) VALUES ");
+
+            for (int i = 0; i < 10000; ++i) {
+                sb.append("(").append(i).append(", ")
+                        .append("'name_").append(i).append("', ")
+                        .append("'name_").append(i).append("')");
+
+                if (i < ENTRIES_COUNT - 1)
+                    sb.append(",");
+            }
+
+            sql(sb.toString());
+
+            assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName + 
"_CACHE").size(CachePeekMode.PRIMARY));
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        // Skip super method to keep caches after each test.
+    }
+
+    /** */
+    @Test
+    public void testSingle() {
+        test("select * from T1 order by id", "T1_CACHE");
+    }
+
+    /** */
+    @Test
+    public void testReplicated() {
+        List<List<?>> res = sql("select * from DICT");
+
+        assertEquals(grid(0).cache("DICT_CACHE").size(CachePeekMode.PRIMARY), 
res.size());
+    }
+
+    /** */
+    @Test
+    public void testJoin() {
+        Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1", "T2", 
col));
+    }
+
+    /** */
+    @Test
+    public void testJoinReplicated() {
+        Stream.of("ID", "IDX_VAL", "VAL").forEach(col -> testJoin("T1", 
"DICT", col));
+    }
+
+    /** */
+    @Test
+    public void testInsertFromSelect() {
+        try {
+            sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL 
VARCHAR) WITH cache_name=t3_cache");
+
+            sql("INSERT INTO T3(ID, IDX_VAL, VAL) SELECT ID, IDX_VAL, VAL FROM 
T1 WHERE ID < ?", ENTRIES_COUNT);
+
+            
assertEquals(grid(0).cache("T1_CACHE").localSizeLong(CachePeekMode.PRIMARY),
+                    client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+        }
+        finally {
+            grid(0).cache("T3_CACHE").destroy();
+        }
+    }
+
+    /** */
+    @Test
+    public void testDelete() {
+        try {
+            sql("CREATE TABLE T3(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL 
VARCHAR) WITH cache_name=t3_cache");
+
+            sql("INSERT INTO T3(ID, IDX_VAL, VAL) SELECT ID, IDX_VAL, VAL FROM 
DICT");
+
+            assertEquals(ENTRIES_COUNT, 
client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+
+            long localSize = 
grid(0).cache("T3_CACHE").localSizeLong(CachePeekMode.PRIMARY);
+
+            sql("DELETE FROM T3 WHERE ID < ?", ENTRIES_COUNT);
+
+            assertEquals(ENTRIES_COUNT - localSize, 
client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+        }
+        finally {
+            grid(0).cache("T3_CACHE").destroy();
+        }
+    }
+
+    /** */
+    @Test
+    public void testCreateTableAsSelect() {
+        try {
+            sql("CREATE TABLE T3(ID, IDX_VAL, VAL) WITH cache_name=t3_cache AS 
SELECT ID, IDX_VAL, VAL FROM T1");
+
+            
assertEquals(grid(0).cache("T1_CACHE").localSizeLong(CachePeekMode.PRIMARY),
+                    client.cache("T3_CACHE").sizeLong(CachePeekMode.PRIMARY));
+        }
+        finally {
+            grid(0).cache("T3_CACHE").destroy();
+        }
+    }
+
+    /** */
+    @Test
+    public void testCount() {
+        Long cnt = (Long)sql("SELECT COUNT(*) FROM T1").get(0).get(0);
+
+        
assertEquals(grid(0).cache("T1_CACHE").localSizeLong(CachePeekMode.PRIMARY), 
cnt.longValue());
+    }
+
+    /** */
+    private void testJoin(String table1, String table2, String joinCol) {
+        String sql = "select * from " + table1 + " join " + table2 +
+                        " on " + table1 + "." + joinCol + "=" + table2 + "." + 
joinCol;
+
+        test(sql, table1 + "_CACHE");
+    }
+
+    /** */
+    private void test(String sql, String cacheName) {
+        List<List<?>> res = sql(sql);
+
+        Affinity<Object> aff = grid(0).affinity(cacheName);
+        ClusterNode localNode = grid(0).localNode();
+
+        List<?> primaries = res.stream().filter(l -> {
+                int part = aff.partition(l.get(0));
+
+                return aff.isPrimary(localNode, part);
+            }
+        ).collect(Collectors.toList());;
+
+        assertEquals(primaries.size(), res.size());
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index f2c51e0f97c..007f831350a 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.integration.KillComma
 import 
org.apache.ignite.internal.processors.query.calcite.integration.KillQueryCommandDdlIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.LimitOffsetIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.LocalDateTimeSupportTest;
+import 
org.apache.ignite.internal.processors.query.calcite.integration.LocalQueryIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuotasIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest;
@@ -114,6 +115,7 @@ import org.junit.runners.Suite;
     QueryEngineConfigurationIntegrationTest.class,
     SearchSargOnIndexIntegrationTest.class,
     KeepBinaryIntegrationTest.class,
+    LocalQueryIntegrationTest.class,
     QueryMetadataIntegrationTest.class,
     MemoryQuotasIntegrationTest.class,
     LocalDateTimeSupportTest.class,

Reply via email to