This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9481a9654b IGNITE-19908 Sql. EXPLAIN should return plan that actually
being used for execution (#2764)
9481a9654b is described below
commit 9481a9654b506bc689b8fd55639ff17212cda706
Author: korlov42 <[email protected]>
AuthorDate: Mon Oct 30 18:08:38 2023 +0200
IGNITE-19908 Sql. EXPLAIN should return plan that actually being used for
execution (#2764)
---
.../ignite/internal/benchmark/InsertBenchmark.java | 18 ++-
.../ignite/internal/benchmark/SelectBenchmark.java | 12 +-
.../internal/sql/engine/SqlQueryProcessor.java | 5 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 57 ++++++--
.../sql/engine/exec/mapping/FragmentSplitter.java | 10 +-
.../{prepare => exec/mapping}/IdGenerator.java | 19 +--
.../sql/engine/exec/mapping/MappingContext.java | 8 +-
.../engine/exec/mapping/MappingServiceImpl.java | 52 +++++--
.../{prepare => exec/mapping}/QuerySplitter.java | 60 +++++++--
.../internal/sql/engine/externalize/RelJson.java | 2 +-
.../sql/engine/externalize/RelJsonReader.java | 4 +-
.../ignite/internal/sql/engine/prepare/Cloner.java | 86 ------------
.../internal/sql/engine/prepare/DdlPlan.java | 17 ++-
.../internal/sql/engine/prepare/ExplainPlan.java | 27 ++--
.../internal/sql/engine/prepare/Fragment.java | 24 +---
.../internal/sql/engine/prepare/MultiStepPlan.java | 32 ++---
.../ignite/internal/sql/engine/prepare/PlanId.java | 62 +++++++++
.../sql/engine/prepare/PrepareServiceImpl.java | 139 +++++++++++++++----
.../internal/sql/engine/prepare/QueryPlan.java | 13 +-
.../sql/engine/rel/IgniteTableFunctionScan.java | 2 +-
.../internal/sql/engine/rel/IgniteValues.java | 2 +-
.../rel/agg/IgniteColocatedSortAggregate.java | 2 -
.../engine/rel/agg/IgniteReduceSortAggregate.java | 6 +
.../ignite/internal/sql/engine/util/Cloner.java | 50 +++++++
.../ignite/internal/sql/engine/util/Commons.java | 33 ++++-
.../sql/engine/exec/ExecutionServiceImplTest.java | 2 +-
.../engine/exec/exp/ExpressionFactoryImplTest.java | 4 +-
.../exec/mapping/MappingServiceImplTest.java | 49 +++++--
.../sql/engine/framework/TestBuilders.java | 2 +-
.../sql/engine/framework/TestClusterTest.java | 26 ++--
.../sql/engine/planner/AbstractPlannerTest.java | 8 +-
.../engine/planner/MapReduceAggregatesTest.java | 2 +-
.../sql/engine/prepare/PrepareServiceImplTest.java | 149 +++++++++++++++++++++
.../sql/engine/util/EmptyCacheFactory.java | 2 +-
.../sql/engine/util/StatementCheckerTest.java | 2 +-
35 files changed, 696 insertions(+), 292 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
index 00ad6beadb..f9a828a180 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
@@ -34,6 +34,8 @@ import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -41,6 +43,7 @@ import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
@@ -51,14 +54,17 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
* Benchmark for insertion operation, comparing KV, JDBC and SQL APIs.
*/
@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class InsertBenchmark extends AbstractOneNodeBenchmark {
/**
* Benchmark for SQL insert via embedded client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void sqlInsert(SqlState state) {
state.executeQuery();
}
@@ -67,8 +73,6 @@ public class InsertBenchmark extends AbstractOneNodeBenchmark
{
* Benchmark for KV insert via embedded client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void kvInsert(KvState state) {
state.executeQuery();
}
@@ -77,8 +81,6 @@ public class InsertBenchmark extends AbstractOneNodeBenchmark
{
* Benchmark for JDBC insert.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void jdbcInsert(JdbcState state) throws SQLException {
state.executeQuery();
}
@@ -87,8 +89,6 @@ public class InsertBenchmark extends AbstractOneNodeBenchmark
{
* Benchmark for SQL insert via thin client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void sqlThinInsert(SqlThinState state) {
state.executeQuery();
}
@@ -97,8 +97,6 @@ public class InsertBenchmark extends AbstractOneNodeBenchmark
{
* Benchmark for KV insert via thin client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void kvThinInsert(KvThinState state) {
state.executeQuery();
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
index 9081380e7e..c65639491d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
@@ -55,6 +55,8 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@State(Scope.Benchmark)
@Fork(1)
@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@SuppressWarnings({"WeakerAccess", "unused"})
@@ -89,8 +91,6 @@ public class SelectBenchmark extends AbstractOneNodeBenchmark
{
* Benchmark for SQL select via embedded client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void sqlGet(SqlState sqlState) {
try (var rs = sqlState.sql(SELECT_ALL_FROM_USERTABLE,
random.nextInt(TABLE_SIZE))) {
rs.next();
@@ -101,8 +101,6 @@ public class SelectBenchmark extends
AbstractOneNodeBenchmark {
* Benchmark for SQL select via thin client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void sqlThinGet(SqlThinState sqlState) {
try (var rs = sqlState.sql(SELECT_ALL_FROM_USERTABLE,
random.nextInt(TABLE_SIZE))) {
rs.next();
@@ -113,8 +111,6 @@ public class SelectBenchmark extends
AbstractOneNodeBenchmark {
* Benchmark for JDBC get.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void jdbcGet(JdbcState state) throws SQLException {
state.stmt.setInt(1, random.nextInt(TABLE_SIZE));
try (ResultSet r = state.stmt.executeQuery()) {
@@ -126,8 +122,6 @@ public class SelectBenchmark extends
AbstractOneNodeBenchmark {
* Benchmark for KV get via embedded client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void kvGet() {
keyValueView.get(null, Tuple.create().set("ycsb_key",
random.nextInt(TABLE_SIZE)));
}
@@ -136,8 +130,6 @@ public class SelectBenchmark extends
AbstractOneNodeBenchmark {
* Benchmark for KV get via thin client.
*/
@Benchmark
- @Warmup(iterations = 1, time = 10)
- @Measurement(iterations = 1, time = 20)
public void kvThinGet(KvThinState kvState) {
kvState.kvView().get(null, Tuple.create().set("ycsb_key",
random.nextInt(TABLE_SIZE)));
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 425d791d8e..59b17b5c7d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -310,7 +310,9 @@ public class SqlQueryProcessor implements QueryProcessor {
}
};
- var mappingService = new MappingServiceImpl(nodeName,
executionTargetProvider, taskExecutor);
+ var mappingService = new MappingServiceImpl(
+ nodeName, executionTargetProvider, CACHE_FACTORY,
PLAN_CACHE_SIZE, taskExecutor
+ );
logicalTopologyService.addEventListener(mappingService);
@@ -501,7 +503,6 @@ public class SqlQueryProcessor implements QueryProcessor {
var dataCursor = executionSrvc.executePlan(txWrapper.unwrap(), plan,
ctx);
SqlQueryType queryType = plan.type();
- assert queryType != null : "Expected a full plan but got a fragment: "
+ plan;
numberOfOpenCursors.incrementAndGet();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 4ea4bbf74e..7c545550a4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -39,6 +40,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -65,7 +68,6 @@ import
org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
-import org.apache.ignite.internal.sql.engine.prepare.Cloner;
import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -80,6 +82,7 @@ import
org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.Cloner;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -98,9 +101,9 @@ import org.jetbrains.annotations.Nullable;
public class ExecutionServiceImpl<RowT> implements ExecutionService,
TopologyEventHandler {
private static final int CACHE_SIZE = 1024;
- private final ConcurrentMap<String, IgniteRel> physNodesCache =
Caffeine.newBuilder()
+ private final ConcurrentMap<FragmentCacheKey, IgniteRel> physNodesCache =
Caffeine.newBuilder()
.maximumSize(CACHE_SIZE)
- .<String, IgniteRel>build()
+ .<FragmentCacheKey, IgniteRel>build()
.asMap();
private static final IgniteLogger LOG =
Loggers.forClass(ExecutionServiceImpl.class);
@@ -248,10 +251,11 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.build();
}
- private IgniteRel relationalTreeFromJsonString(String jsonFragment,
BaseQueryContext ctx) {
- IgniteRel plan = physNodesCache.computeIfAbsent(jsonFragment, ser ->
fromJson(ctx, ser));
-
- return new Cloner(Commons.cluster()).visit(plan);
+ private IgniteRel relationalTreeFromJsonString(int schemaVersion, String
jsonFragment, BaseQueryContext ctx) {
+ return physNodesCache.computeIfAbsent(
+ new FragmentCacheKey(schemaVersion, jsonFragment),
+ key -> fromJson(ctx, key.fragmentString)
+ );
}
/** {@inheritDoc} */
@@ -320,7 +324,11 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan,
@Nullable QueryPrefetchCallback callback) {
- List<List<Object>> res = List.of(List.of(plan.plan()));
+ IgniteRel clonedRoot = Cloner.clone(plan.plan().root(),
Commons.cluster());
+
+ String planString = RelOptUtil.toString(clonedRoot,
SqlExplainLevel.ALL_ATTRIBUTES);
+
+ List<List<Object>> res = List.of(List.of(planString));
if (callback != null) {
taskExecutor.execute(() -> callback.onPrefetchComplete(null));
@@ -616,7 +624,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
Executor exec = (r) -> context.execute(r::run, err ->
handleError(err, initiatorNode, desc.fragmentId()));
start.thenCompose(none -> {
- IgniteRel treeRoot =
relationalTreeFromJsonString(fragmentString, ctx);
+ IgniteRel treeRoot =
relationalTreeFromJsonString(schemaVersion, fragmentString, ctx);
return
dependencyResolver.resolveDependencies(List.of(treeRoot), schemaVersion)
.thenComposeAsync(deps -> executeFragment(treeRoot,
deps, context), exec);
@@ -975,4 +983,35 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
/** Creates the relational node implementor with the given context. */
LogicalRelImplementor<RowT> create(ExecutionContext<RowT> ctx,
ResolvedDependencies resolvedDependencies);
}
+
+ private static class FragmentCacheKey {
+ private final int schemaVersion;
+ private final String fragmentString;
+
+ FragmentCacheKey(int schemaVersion, String fragmentString) {
+ this.schemaVersion = schemaVersion;
+ this.fragmentString = fragmentString;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ FragmentCacheKey that = (FragmentCacheKey) o;
+
+ return schemaVersion == that.schemaVersion
+ && fragmentString.equals(that.fragmentString);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaVersion, fragmentString);
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
index f23ed61d6a..2167fd819e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
@@ -28,7 +28,6 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
-import org.apache.ignite.internal.sql.engine.prepare.IdGenerator;
import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
@@ -50,13 +49,16 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
class FragmentSplitter extends IgniteRelShuttle {
private final Deque<FragmentProto> stack = new LinkedList<>();
+ private final IdGenerator idGenerator;
+
private RelNode cutPoint;
private FragmentProto curr;
private boolean correlated = false;
- FragmentSplitter(RelNode cutPoint) {
+ FragmentSplitter(IdGenerator idGenerator, RelNode cutPoint) {
+ this.idGenerator = idGenerator;
this.cutPoint = cutPoint;
}
@@ -64,7 +66,7 @@ class FragmentSplitter extends IgniteRelShuttle {
public List<Fragment> go(Fragment fragment) {
ArrayList<Fragment> res = new ArrayList<>();
- stack.push(new FragmentProto(IdGenerator.nextId(),
fragment.correlated(), fragment.root()));
+ stack.push(new FragmentProto(idGenerator.nextId(),
fragment.correlated(), fragment.root()));
while (!stack.isEmpty()) {
curr = stack.pop();
@@ -197,7 +199,7 @@ class FragmentSplitter extends IgniteRelShuttle {
RelNode input = rel instanceof IgniteTrimExchange ? rel.getInput(0) :
rel;
long targetFragmentId = curr.id;
- long sourceFragmentId = IdGenerator.nextId();
+ long sourceFragmentId = idGenerator.nextId();
long exchangeId = sourceFragmentId;
IgniteReceiver receiver = new IgniteReceiver(cluster, traits, rowType,
exchangeId, sourceFragmentId);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IdGenerator.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/IdGenerator.java
similarity index 69%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IdGenerator.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/IdGenerator.java
index af5e54bfcc..bf9fb6c13f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IdGenerator.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/IdGenerator.java
@@ -15,21 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.prepare;
-
-import java.util.concurrent.atomic.AtomicLong;
+package org.apache.ignite.internal.sql.engine.exec.mapping;
/**
- * IdGenerator.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * A simple id generator that returns monotonically increasing long value
started from given initial id.
+ *
+ * <p>Not thread safe.
*/
public class IdGenerator {
- private static final AtomicLong ID_GEN = new AtomicLong();
+ private long currentId;
- private IdGenerator() {
+ public IdGenerator(long initialId) {
+ this.currentId = initialId;
}
- public static long nextId() {
- return ID_GEN.getAndIncrement();
+ /** Returns next id. */
+ long nextId() {
+ return currentId++;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
index 1e90649b5b..4d559694aa 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
@@ -19,10 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
import
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
-import org.apache.ignite.internal.sql.engine.metadata.IgniteMetadata;
-import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.sql.engine.util.Commons;
/**
@@ -49,10 +46,7 @@ class MappingContext {
public RelOptCluster cluster() {
if (cluster == null) {
- cluster = RelOptCluster.create(Commons.cluster().getPlanner(),
Commons.cluster().getRexBuilder());
- cluster.setMetadataProvider(new
CachingRelMetadataProvider(IgniteMetadata.METADATA_PROVIDER,
- Commons.cluster().getPlanner()));
- cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+ cluster = Commons.cluster();
}
return cluster;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 0c6c567a5e..53897532d3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -33,15 +33,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.prepare.PlanId;
import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.cache.Cache;
+import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.lang.ErrorGroups.Sql;
/**
@@ -58,6 +61,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
private final String localNodeName;
private final ExecutionTargetProvider targetProvider;
+ private final Cache<PlanId, FragmentsTemplate> templatesCache;
private final Executor taskExecutor;
@@ -66,15 +70,20 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
*
* @param localNodeName Name of the current Ignite node.
* @param targetProvider Execution target provider.
+ * @param cacheFactory A factory to create cache of fragments.
+ * @param cacheSize Size of the cache of query plans. Should be non
negative.
* @param taskExecutor Mapper service task executor.
*/
public MappingServiceImpl(
String localNodeName,
ExecutionTargetProvider targetProvider,
+ CacheFactory cacheFactory,
+ int cacheSize,
Executor taskExecutor
) {
this.localNodeName = localNodeName;
this.targetProvider = targetProvider;
+ this.templatesCache = cacheFactory.create(cacheSize);
this.taskExecutor = taskExecutor;
}
@@ -88,16 +97,16 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
}
private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan
multiStepPlan) {
- List<Fragment> fragments = multiStepPlan.fragments();
-
List<String> nodes = topologyHolder.nodes();
-
MappingContext context = new MappingContext(localNodeName, nodes);
- List<Fragment> fragments0 = Commons.transform(fragments, fragment ->
fragment.attach(context.cluster()));
+ FragmentsTemplate template = getOrCreateTemplate(multiStepPlan,
context);
+
+ IdGenerator idGenerator = new IdGenerator(template.nextId);
+ List<Fragment> fragments = new ArrayList<>(template.fragments);
List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
- fragments0.stream().flatMap(fragment -> Stream.concat(
+ fragments.stream().flatMap(fragment -> Stream.concat(
fragment.tables().stream()
.map(table ->
targetProvider.forTable(context.targetFactory(), table)
.thenApply(target ->
IntObjectPair.of(table.id(), target))
@@ -120,14 +129,14 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
targetsById.put(pair.firstInt(), pair.second());
}
- FragmentMapper mapper = new
FragmentMapper(context.cluster().getMetadataQuery(), context, targetsById);
+ FragmentMapper mapper = new
FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById);
Long2ObjectMap<FragmentMapping> mappingByFragmentId = new
Long2ObjectOpenHashMap<>();
Long2ObjectMap<ColocationGroup> groupsBySourceId = new
Long2ObjectOpenHashMap<>();
Long2ObjectMap<List<String>> allSourcesByExchangeId = new
Long2ObjectOpenHashMap<>();
Exception ex = null;
boolean lastAttemptSucceed = true;
- List<Fragment> fragmentsToMap = fragments0;
+ List<Fragment> fragmentsToMap = fragments;
for (int attempt = 0; attempt < MAPPING_ATTEMPTS;
attempt++) {
Fragment currentFragment = null;
try {
@@ -170,7 +179,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
fragmentsToMap = replace(
fragmentsToMap,
currentFragment,
- new
FragmentSplitter(mappingException.node()).go(currentFragment)
+ new FragmentSplitter(idGenerator,
mappingException.node()).go(currentFragment)
);
lastAttemptSucceed = false;
@@ -304,4 +313,29 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
.collect(Collectors.toUnmodifiableList());
}
}
+
+ private FragmentsTemplate getOrCreateTemplate(MultiStepPlan plan,
MappingContext context) {
+ // QuerySplitter is deterministic, thus we can cache result in order
to reuse it next time
+ return templatesCache.get(plan.id(), key -> {
+ IdGenerator idGenerator = new IdGenerator(0);
+
+ List<Fragment> fragments = new QuerySplitter(idGenerator,
context.cluster()).split(plan.root());
+
+ return new FragmentsTemplate(
+ idGenerator.nextId(), context.cluster(), fragments
+ );
+ });
+ }
+
+ private static class FragmentsTemplate {
+ private final long nextId;
+ private final RelOptCluster cluster;
+ private final List<Fragment> fragments;
+
+ FragmentsTemplate(long nextId, RelOptCluster cluster, List<Fragment>
fragments) {
+ this.nextId = nextId;
+ this.cluster = cluster;
+ this.fragments = fragments;
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/QuerySplitter.java
similarity index 70%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/QuerySplitter.java
index 7ed65752a1..e898e1559a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/QuerySplitter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.prepare;
+package org.apache.ignite.internal.sql.engine.exec.mapping;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -24,6 +24,8 @@ import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.ignite.internal.sql.engine.prepare.Fragment;
+import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
@@ -37,26 +39,62 @@ import
org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange;
import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.util.Cloner;
import org.apache.ignite.internal.sql.engine.util.Commons;
/**
* Splits a query into a list of query fragments.
+ *
+ * <p>Distributed query tree may have number of {@link IgniteExchange}
operators. These operators
+ * represents points where one distribution should be converted to another. In
runtime, this operation
+ * implemented as physical exchange of messages between two nodes (or itself,
it's intentional tradeoff).
+ *
+ * <p>{@link QuerySplitter} is meant to find all {@link IgniteExchange}
operators, cut the tree in that places,
+ * and replace single exchange with {@link IgniteReceiver} and {@link
IgniteSender} on target and source fragments
+ * accordingly. Here is an example for a simple `SELECT * FROM test` query:
+ *
+ * <pre>
+ * Here is a relation tree representing the query:
+ *
+ * IgniteExchange(distribution=[single]) ------------| we've got exchange
here because table distributed by affinity, meaning
+ * IgniteTableScan(table=[[PUBLIC, TEST]]) | every node owns
only a few partitions of that table. Thus, in order to
+ * | get all rows in a
single place, we need to resend them. In general it
+ * | means, that
messages will be sent over network, but in some cases it
+ * After splitting it will be represented | may be local call
as well (for instance, it is a single node cluster)
+ * by two fragments:
+ *
+ * Fragment#1:
+ * IgniteReceiver(source=Fragment#2, exchangeId=3)
+ *
+ * Fragment#2:
+ * IgniteSender(target=Fragment#1, exchangeId=3)
+ * IgniteTableScan(table=[[PUBLIC, TEST]])
+ * </pre>
*/
public class QuerySplitter extends IgniteRelShuttle {
private final Deque<FragmentProto> stack = new LinkedList<>();
+ private final RelOptCluster cluster;
+ private final IdGenerator idGenerator;
+
private FragmentProto curr;
private boolean correlated = false;
+ public QuerySplitter(IdGenerator idGenerator, RelOptCluster cluster) {
+ this.idGenerator = idGenerator;
+ this.cluster = cluster;
+ }
+
/**
- * Go.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Splits given relation tree on a list of {@link Fragment fragments}.
+ *
+ * <p>See class-level javadoc for details.
*/
- public List<Fragment> go(IgniteRel root) {
+ public List<Fragment> split(IgniteRel root) {
ArrayList<Fragment> res = new ArrayList<>();
- stack.push(new FragmentProto(IdGenerator.nextId(), false, root));
+ stack.push(new FragmentProto(idGenerator.nextId(), false, root));
while (!stack.isEmpty()) {
curr = stack.pop();
@@ -68,7 +106,7 @@ public class QuerySplitter extends IgniteRelShuttle {
// V V
// IgniteSort#285 IgniteSort#285
// IgniteTableScan#180 IgniteTableScan#180
- curr.root = Cloner.clone(curr.root);
+ curr.root = Cloner.clone(curr.root, cluster);
correlated = curr.correlated;
@@ -110,7 +148,7 @@ public class QuerySplitter extends IgniteRelShuttle {
RelOptCluster cluster = rel.getCluster();
long targetFragmentId = curr.id;
- long sourceFragmentId = IdGenerator.nextId();
+ long sourceFragmentId = idGenerator.nextId();
long exchangeId = sourceFragmentId;
IgniteReceiver receiver = new IgniteReceiver(cluster,
rel.getTraitSet(), rel.getRowType(), exchangeId, sourceFragmentId);
@@ -126,7 +164,7 @@ public class QuerySplitter extends IgniteRelShuttle {
/** {@inheritDoc} */
@Override
public IgniteRel visit(IgniteTrimExchange rel) {
- return ((SourceAwareIgniteRel)
processNode(rel)).clone(IdGenerator.nextId());
+ return ((SourceAwareIgniteRel)
processNode(rel)).clone(idGenerator.nextId());
}
/** {@inheritDoc} */
@@ -140,7 +178,7 @@ public class QuerySplitter extends IgniteRelShuttle {
curr.tables.add(table);
}
- return rel.clone(IdGenerator.nextId());
+ return rel.clone(idGenerator.nextId());
}
/** {@inheritDoc} */
@@ -154,7 +192,7 @@ public class QuerySplitter extends IgniteRelShuttle {
curr.tables.add(table);
}
- return rel.clone(IdGenerator.nextId());
+ return rel.clone(idGenerator.nextId());
}
/** {@inheritDoc} */
@@ -182,7 +220,7 @@ public class QuerySplitter extends IgniteRelShuttle {
curr.systemViews.add(view);
}
- return rel.clone(IdGenerator.nextId());
+ return rel.clone(idGenerator.nextId());
}
private static class FragmentProto {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
index b5ba35b959..d34f09e07a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
@@ -414,7 +414,7 @@ class RelJson {
private Object toJson(RexNode node) {
// removes calls to SEARCH and the included Sarg and converts them to
comparisons
- node = RexUtil.expandSearch(Commons.cluster().getRexBuilder(), null,
node);
+ node = RexUtil.expandSearch(Commons.emptyCluster().getRexBuilder(),
null, node);
Map<String, Object> map;
switch (node.getKind()) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index 23c41333b8..4b75d5868b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -133,13 +133,13 @@ public class RelJsonReader {
/** {@inheritDoc} */
@Override
public RelOptCluster getCluster() {
- return Commons.cluster();
+ return Commons.emptyCluster();
}
/** {@inheritDoc} */
@Override
public RelTraitSet getTraitSet() {
- return Commons.cluster().traitSet();
+ return Commons.emptyCluster().traitSet();
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java
deleted file mode 100644
index ed2918bfdb..0000000000
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.prepare;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
-import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
-import org.apache.ignite.internal.sql.engine.util.Commons;
-
-/**
- * Cloner.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
-public class Cloner {
- private final RelOptCluster cluster;
-
- private List<IgniteReceiver> remotes;
-
- public Cloner(RelOptCluster cluster) {
- this.cluster = cluster;
- }
-
- /**
- * Clones and associates a plan with a new cluster.
- *
- * @param src Fragment to clone.
- * @return New plan.
- */
- public Fragment go(Fragment src) {
- try {
- remotes = new ArrayList<>();
-
- IgniteRel newRoot = visit(src.root());
-
- return new Fragment(src.fragmentId(), src.correlated(), newRoot,
src.serialized(),
- remotes, src.tables(), src.systemViews());
- } finally {
- remotes = null;
- }
- }
-
- /**
- * Clone.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
- public static IgniteRel clone(IgniteRel r) {
- Cloner c = new Cloner(r.getCluster());
-
- return c.visit(r);
- }
-
- private IgniteRel collect(IgniteRel rel) {
- if (rel instanceof IgniteReceiver && remotes != null) {
- remotes.add((IgniteReceiver) rel);
- }
-
- return rel;
- }
-
- /**
- * Clones and associates a plan with a new cluster.
- *
- * @param rel The head of the relational tree.
- * @return A new tree.
- */
- public IgniteRel visit(IgniteRel rel) {
- return collect(rel.clone(cluster, Commons.transform(rel.getInputs(),
rel0 -> visit((IgniteRel) rel0))));
- }
-}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/DdlPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/DdlPlan.java
index c594cc0e46..3b27d68b85 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/DdlPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/DdlPlan.java
@@ -35,9 +35,11 @@ public class DdlPlan implements QueryPlan {
private static final ResultSetMetadata DDL_METADATA = new
ResultSetMetadataImpl(List.of(
new ColumnMetadataImpl("APPLIED", ColumnType.BOOLEAN, 1,
ColumnMetadata.UNDEFINED_SCALE, false, null)));
+ private final PlanId id;
private final DdlCommand cmd;
- public DdlPlan(DdlCommand cmd) {
+ DdlPlan(PlanId id, DdlCommand cmd) {
+ this.id = id;
this.cmd = cmd;
}
@@ -45,6 +47,12 @@ public class DdlPlan implements QueryPlan {
return cmd;
}
+ /** {@inheritDoc} */
+ @Override
+ public PlanId id() {
+ return id;
+ }
+
/** {@inheritDoc} */
@Override
public SqlQueryType type() {
@@ -59,12 +67,7 @@ public class DdlPlan implements QueryPlan {
/** {@inheritDoc} */
@Override
- public QueryPlan copy() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
+ public String toString() {
return cmd.toString();
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ExplainPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ExplainPlan.java
index 78089a7ef0..4cb3522629 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ExplainPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ExplainPlan.java
@@ -28,38 +28,39 @@ import org.apache.ignite.sql.ResultSetMetadata;
/**
* Query explain plan.
*/
-public class ExplainPlan implements QueryPlan {
+public class ExplainPlan implements QueryPlan {
/** Explain metadata holder. */
private static final ResultSetMetadata EXPLAIN_METADATA = new
ResultSetMetadataImpl(List.of(
new ColumnMetadataImpl("PLAN", ColumnType.STRING,
ColumnMetadata.UNDEFINED_PRECISION,
ColumnMetadata.UNDEFINED_SCALE, true, null)));
- private final String plan;
+ private final PlanId id;
+ private final MultiStepPlan plan;
- /**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
- public ExplainPlan(String plan) {
+ ExplainPlan(PlanId id, MultiStepPlan plan) {
+ this.id = id;
this.plan = plan;
}
/** {@inheritDoc} */
- @Override public SqlQueryType type() {
- return SqlQueryType.EXPLAIN;
+ @Override
+ public PlanId id() {
+ return id;
}
/** {@inheritDoc} */
- @Override public QueryPlan copy() {
- return this;
+ @Override
+ public SqlQueryType type() {
+ return SqlQueryType.EXPLAIN;
}
/** {@inheritDoc} */
- @Override public ResultSetMetadata metadata() {
+ @Override
+ public ResultSetMetadata metadata() {
return EXPLAIN_METADATA;
}
- public String plan() {
+ public MultiStepPlan plan() {
return plan;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
index 6fe68c3d62..36a53859b6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.prepare;
import static
org.apache.ignite.internal.sql.engine.externalize.RelJsonWriter.toJson;
import java.util.List;
-import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -30,7 +29,6 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
-import org.jetbrains.annotations.Nullable;
/**
* Fragment of distributed query.
@@ -62,28 +60,12 @@ public class Fragment {
*/
public Fragment(long id, boolean correlated, IgniteRel root,
List<IgniteReceiver> remotes,
List<IgniteTable> tables, List<IgniteSystemView> systemViews) {
- this(id, correlated, root, null, remotes, tables, systemViews);
- }
-
- /**
- * Constructor.
- *
- * @param id An identifier of this fragment.
- * @param correlated Whether some correlated variables should be set prior
to fragment execution.
- * @param root Root node of the fragment.
- * @param rootSer Serialised representation of a root. Optional.
- * @param remotes Remote sources of the fragment.
- * @param tables A list of tables containing by this fragment.
- * @param systemViews A list of system views containing by this fragment.
- */
- Fragment(long id, boolean correlated, IgniteRel root, @Nullable String
rootSer, List<IgniteReceiver> remotes,
- List<IgniteTable> tables, List<IgniteSystemView> systemViews) {
this.id = id;
this.root = root;
this.remotes = List.copyOf(remotes);
this.tables = List.copyOf(tables);
this.systemViews = List.copyOf(systemViews);
- this.rootSer = rootSer != null ? rootSer : toJson(root);
+ this.rootSer = toJson(root);
this.correlated = correlated;
}
@@ -139,10 +121,6 @@ public class Fragment {
return !(root instanceof IgniteSender);
}
- public Fragment attach(RelOptCluster cluster) {
- return root.getCluster() == cluster ? this : new
Cloner(cluster).go(this);
- }
-
public boolean single() {
return rootFragment() || (root instanceof IgniteSender
&& ((IgniteSender)
root).sourceDistribution().satisfies(IgniteDistributions.single()));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
index 7cca94a716..558755e1bb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.sql.engine.prepare;
-import java.util.List;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.sql.ResultSetMetadata;
import org.jetbrains.annotations.Nullable;
@@ -27,19 +27,27 @@ import org.jetbrains.annotations.Nullable;
*/
public class MultiStepPlan implements QueryPlan {
+ private final PlanId id;
+
private final SqlQueryType type;
- protected final ResultSetMetadata meta;
+ private final ResultSetMetadata meta;
- protected final List<Fragment> fragments;
+ private final IgniteRel root;
- /** Constructor. */
- public MultiStepPlan(SqlQueryType type, List<Fragment> fragments,
ResultSetMetadata meta) {
+ MultiStepPlan(PlanId id, SqlQueryType type, IgniteRel root,
ResultSetMetadata meta) {
+ this.id = id;
this.type = type;
- this.fragments = fragments;
+ this.root = root;
this.meta = meta;
}
+ /** {@inheritDoc} */
+ @Override
+ public PlanId id() {
+ return id;
+ }
+
/** {@inheritDoc} */
@Override
public ResultSetMetadata metadata() {
@@ -53,14 +61,8 @@ public class MultiStepPlan implements QueryPlan {
return type;
}
- /** {@inheritDoc} */
- @Override
- public QueryPlan copy() {
- return new MultiStepPlan(type, fragments, meta);
- }
-
- /** A list for fragment this query plan consists of. */
- public List<Fragment> fragments() {
- return fragments;
+ /** Returns root of the query tree. */
+ public IgniteRel root() {
+ return root;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
new file mode 100644
index 0000000000..112500b461
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Unique identifier of a query plan.
+ *
+ * <p>This identifier consists of unique identifier randomly generated by
particular
+ * instance of {@link PrepareService} and monotonically increasing plan number
within this instance.
+ */
+public class PlanId implements Serializable {
+ private static final long serialVersionUID = -7944926799869032327L;
+
+ private final UUID prepareServiceId;
+ private final long planNumber;
+
+ PlanId(UUID prepareServiceId, long planNumber) {
+ this.prepareServiceId = prepareServiceId;
+ this.planNumber = planNumber;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PlanId planId1 = (PlanId) o;
+
+ return planNumber == planId1.planNumber
+ && prepareServiceId.equals(planId1.prepareServiceId);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = prepareServiceId.hashCode();
+ result = 31 * result + (int) (planNumber ^ (planNumber >>> 32));
+ return result;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 6ca87e2f56..beff874de2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -26,18 +26,19 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlDdl;
import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
@@ -51,6 +52,8 @@ import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverte
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.Cloner;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
@@ -58,6 +61,7 @@ import
org.apache.ignite.internal.sql.metrics.SqlPlanCacheMetricSource;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -82,6 +86,9 @@ public class PrepareServiceImpl implements PrepareService {
private static final int THREAD_COUNT = 4;
+ private final UUID prepareServiceId = UUID.randomUUID();
+ private final AtomicLong planIdGen = new AtomicLong();
+
private final DdlSqlToCommandConverter ddlConverter;
private final Cache<CacheKey, CompletableFuture<QueryPlan>> cache;
@@ -222,30 +229,50 @@ public class PrepareServiceImpl implements PrepareService
{
assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" :
sqlNode.getClass().getName();
- return CompletableFuture.completedFuture(new
DdlPlan(ddlConverter.convert((SqlDdl) sqlNode, ctx)));
+ return CompletableFuture.completedFuture(new DdlPlan(nextPlanId(),
ddlConverter.convert((SqlDdl) sqlNode, ctx)));
}
private CompletableFuture<QueryPlan> prepareExplain(ParsedResult
parsedResult, PlanningContext ctx) {
- return CompletableFuture.supplyAsync(() -> {
- IgnitePlanner planner = ctx.planner();
+ SqlNode parsedTree = parsedResult.parsedTree();
- SqlNode sqlNode = parsedResult.parsedTree();
+ assert single(parsedTree);
+ assert parsedTree instanceof SqlExplain :
parsedTree.getClass().getCanonicalName();
- assert single(sqlNode);
+ SqlNode explicandum = ((SqlExplain) parsedTree).getExplicandum();
- // Validate
- // We extract query subtree inside the validator.
- SqlNode explainNode = planner.validate(sqlNode);
- // Extract validated query.
- SqlNode validNode = ((SqlExplain) explainNode).getExplicandum();
+ SqlQueryType queryType = Commons.getQueryType(explicandum);
- // Convert to Relational operators graph
- IgniteRel igniteRel = optimize(validNode, planner);
+ if (queryType != SqlQueryType.QUERY && queryType != SqlQueryType.DML) {
+ return CompletableFuture.failedFuture(new SqlException(
+ Sql.STMT_PARSE_ERR, "Failed to parse query: Incorrect
syntax near the keyword " + queryType
+ ));
+ }
+
+ ParsedResult newParsedResult = new ParsedResultImpl(
+ queryType,
+ parsedResult.originalQuery(),
+ explicandum.toString(),
+ parsedResult.dynamicParamsCount(),
+ explicandum
+ );
+
+ CompletableFuture<QueryPlan> result;
+ switch (queryType) {
+ case QUERY:
+ result = prepareQuery(newParsedResult, ctx);
+ break;
+ case DML:
+ result = prepareDml(newParsedResult, ctx);
+ break;
+ default:
+ throw new AssertionError("should not get here");
+ }
- String plan = RelOptUtil.toString(igniteRel,
SqlExplainLevel.ALL_ATTRIBUTES);
+ return result.thenApply(plan -> {
+ assert plan instanceof MultiStepPlan : plan == null ? "<null>" :
plan.getClass().getCanonicalName();
- return new ExplainPlan(plan);
- }, planningPool);
+ return new ExplainPlan(nextPlanId(), (MultiStepPlan) plan);
+ });
}
private static boolean single(SqlNode sqlNode) {
@@ -269,14 +296,20 @@ public class PrepareServiceImpl implements PrepareService
{
IgniteRel igniteRel = optimize(validatedNode, planner);
- // Split query plan to query fragments.
- List<Fragment> fragments = new QuerySplitter().go(igniteRel);
+ // cluster keeps a lot of cached stuff that won't be used anymore.
+ // In order let GC collect that, let's reattach tree to an empty
cluster
+ // before storing tree in plan cache
+ IgniteRel clonedTree = Cloner.clone(igniteRel,
Commons.emptyCluster());
- return new MultiStepPlan(SqlQueryType.QUERY, fragments,
+ return new MultiStepPlan(nextPlanId(), SqlQueryType.QUERY,
clonedTree,
resultSetMetadata(validated.dataType(),
validated.origins(), validated.aliases()));
}, planningPool));
- return planFut.thenApply(QueryPlan::copy);
+ return planFut.thenApply(Function.identity());
+ }
+
+ private PlanId nextPlanId() {
+ return new PlanId(prepareServiceId, planIdGen.getAndIncrement());
}
private CompletableFuture<QueryPlan> prepareDml(ParsedResult parsedResult,
PlanningContext ctx) {
@@ -295,13 +328,15 @@ public class PrepareServiceImpl implements PrepareService
{
// Convert to Relational operators graph
IgniteRel igniteRel = optimize(validatedNode, planner);
- // Split query plan to query fragments.
- List<Fragment> fragments = new QuerySplitter().go(igniteRel);
+ // cluster keeps a lot of cached stuff that won't be used anymore.
+ // In order let GC collect that, let's reattach tree to an empty
cluster
+ // before storing tree in plan cache
+ IgniteRel clonedTree = Cloner.clone(igniteRel,
Commons.emptyCluster());
- return new MultiStepPlan(SqlQueryType.DML, fragments,
DML_METADATA);
+ return new MultiStepPlan(nextPlanId(), SqlQueryType.DML,
clonedTree, DML_METADATA);
}, planningPool));
- return planFut.thenApply(QueryPlan::copy);
+ return planFut.thenApply(Function.identity());
}
private static CacheKey createCacheKey(ParsedResult parsedResult,
PlanningContext ctx) {
@@ -315,7 +350,7 @@ public class PrepareServiceImpl implements PrepareService {
return new CacheKey(catalogVersion, ctx.schemaName(),
parsedResult.normalizedQuery(), distributed, paramTypes);
}
- private ResultSetMetadata resultSetMetadata(
+ private static ResultSetMetadata resultSetMetadata(
RelDataType rowType,
@Nullable List<List<String>> origins,
List<String> aliases
@@ -344,4 +379,56 @@ public class PrepareServiceImpl implements PrepareService {
}
);
}
+
+ private static class ParsedResultImpl implements ParsedResult {
+ private final SqlQueryType queryType;
+ private final String originalQuery;
+ private final String normalizedQuery;
+ private final int dynamicParamCount;
+ private final SqlNode parsedTree;
+
+ private ParsedResultImpl(
+ SqlQueryType queryType,
+ String originalQuery,
+ String normalizedQuery,
+ int dynamicParamCount,
+ SqlNode parsedTree
+ ) {
+ this.queryType = queryType;
+ this.originalQuery = originalQuery;
+ this.normalizedQuery = normalizedQuery;
+ this.dynamicParamCount = dynamicParamCount;
+ this.parsedTree = parsedTree;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SqlQueryType queryType() {
+ return queryType;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String originalQuery() {
+ return originalQuery;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String normalizedQuery() {
+ return normalizedQuery;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int dynamicParamsCount() {
+ return dynamicParamCount;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SqlNode parsedTree() {
+ return parsedTree;
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
index c26f2d75e6..95d752d1b2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.prepare;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.sql.ResultSetMetadata;
-import org.jetbrains.annotations.Nullable;
/**
* QueryPlan interface.
@@ -27,17 +26,17 @@ import org.jetbrains.annotations.Nullable;
*/
public interface QueryPlan {
/**
- * Get query type, or {@code null} if this is a fragment.
+ * Get a unique identifier of a plan.
*/
- @Nullable SqlQueryType type();
+ PlanId id();
/**
- * Get fields metadata.
+ * Get query type, or {@code null} if this is a fragment.
*/
- ResultSetMetadata metadata();
+ SqlQueryType type();
/**
- * Clones this plan.
+ * Get fields metadata.
*/
- QueryPlan copy();
+ ResultSetMetadata metadata();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableFunctionScan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableFunctionScan.java
index 0a7e244fc0..bf586f40b8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableFunctionScan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableFunctionScan.java
@@ -33,7 +33,7 @@ import org.apache.calcite.rel.metadata.RelColumnMapping;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.sql.engine.prepare.QuerySplitter;
+import org.apache.ignite.internal.sql.engine.exec.mapping.QuerySplitter;
/**
* Relational operator for table function scan.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteValues.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteValues.java
index 33f6924576..745986280a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteValues.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteValues.java
@@ -29,7 +29,7 @@ import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Values;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
-import org.apache.ignite.internal.sql.engine.prepare.QuerySplitter;
+import org.apache.ignite.internal.sql.engine.exec.mapping.QuerySplitter;
/**
* IgniteValues.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedSortAggregate.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedSortAggregate.java
index a39e382dc6..b99549b43e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedSortAggregate.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteColocatedSortAggregate.java
@@ -102,8 +102,6 @@ public class IgniteColocatedSortAggregate extends
IgniteColocatedAggregateBase i
/** {@inheritDoc} */
@Override
public RelCollation collation() {
- assert collation.equals(super.collation());
-
return collation;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteReduceSortAggregate.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteReduceSortAggregate.java
index cf60c766c1..357ba1c42d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteReduceSortAggregate.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/IgniteReduceSortAggregate.java
@@ -133,4 +133,10 @@ public class IgniteReduceSortAggregate extends
IgniteReduceAggregateBase impleme
0
);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public RelCollation collation() {
+ return collation;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Cloner.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Cloner.java
new file mode 100644
index 0000000000..e7d72e98aa
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Cloner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+
+/**
+ * Utility class to clone relational tree and optionally replace assigned
{@link RelOptCluster cluster}
+ * to another one.
+ */
+public class Cloner {
+ private final RelOptCluster cluster;
+
+ private Cloner(RelOptCluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /**
+ * Clones a given relational tree and reassigns copy to a given cluster.
+ *
+ * @param root Root of the tree to clone.
+ * @param cluster Cluster to attach copy to.
+ * @return Copy of the given tree.
+ */
+ public static IgniteRel clone(IgniteRel root, RelOptCluster cluster) {
+ Cloner c = new Cloner(cluster);
+
+ return c.visit(root);
+ }
+
+ private IgniteRel visit(IgniteRel rel) {
+ return rel.clone(cluster, Commons.transform(rel.getInputs(), rel0 ->
visit((IgniteRel) rel0)));
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 3791bcb681..323ae265e1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -84,6 +84,8 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.sql.engine.exec.exp.RexExecutorImpl;
import org.apache.ignite.internal.sql.engine.hint.IgniteHint;
+import org.apache.ignite.internal.sql.engine.metadata.IgniteMetadata;
+import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.sql.engine.prepare.IgniteConvertletTable;
import org.apache.ignite.internal.sql.engine.prepare.IgniteTypeCoercion;
@@ -273,12 +275,12 @@ public final class Commons {
* Standalone type factory.
*/
public static IgniteTypeFactory typeFactory() {
- return typeFactory(cluster());
+ return typeFactory(emptyCluster());
}
/** Row-expression builder. **/
public static RexBuilder rexBuilder() {
- return cluster().getRexBuilder();
+ return emptyCluster().getRexBuilder();
}
/**
@@ -750,10 +752,35 @@ public final class Commons {
};
}
- public static RelOptCluster cluster() {
+ /**
+ * Returns cluster that can be used only as a stub to keep query tree in
the cache.
+ *
+ * <p>Any attempt to invoke operation involving cluster on a tree attached
to this cluster
+ * will result in an error.
+ *
+ * @return A stub of a cluster.
+ */
+ public static RelOptCluster emptyCluster() {
return CLUSTER;
}
+ /**
+ * Returns cluster that may be used to acquire metadata from a relation
tree, but should not
+ * be used for planning.
+ *
+ * @return A new cluster.
+ */
+ public static RelOptCluster cluster() {
+ RelOptCluster emptyCluster = emptyCluster();
+
+ RelOptCluster cluster =
RelOptCluster.create(emptyCluster.getPlanner(), emptyCluster.getRexBuilder());
+
+ cluster.setMetadataProvider(IgniteMetadata.METADATA_PROVIDER);
+ cluster.setMetadataQuerySupplier(RelMetadataQueryEx::create);
+
+ return cluster;
+ }
+
/**
* Checks whether an implicit PK mode enabled or not.
*
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 9860a7d4bb..b2fbfd294d 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -755,7 +755,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
};
- var mappingService = new MappingServiceImpl(nodeName, targetProvider,
taskExecutor);
+ var mappingService = new MappingServiceImpl(nodeName, targetProvider,
EmptyCacheFactory.INSTANCE, 0, taskExecutor);
List<LogicalNode> logicalNodes = nodeNames.stream()
.map(name -> new LogicalNode(name, name,
NetworkAddress.from("127.0.0.1:10000")))
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
index 9ae7670630..46d1c20084 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImplTest.java
@@ -197,7 +197,7 @@ public class ExpressionFactoryImplTest extends
BaseIgniteAbstractTest {
RexNode val1 = rexBuilder.makeExactLiteral(new BigDecimal("1"));
RexNode val2 = rexBuilder.makeExactLiteral(new BigDecimal("2"));
- RelDataTypeSystem typeSystem =
Commons.cluster().getTypeFactory().getTypeSystem();
+ RelDataTypeSystem typeSystem =
Commons.emptyCluster().getTypeFactory().getTypeSystem();
RexLocalRef ref1 = rexBuilder.makeLocalRef(new
BasicSqlType(typeSystem, SqlTypeName.INTEGER), conditionSatisfyIdx ? 1 : 3);
RexLocalRef ref2 = rexBuilder.makeLocalRef(new
BasicSqlType(typeSystem, SqlTypeName.INTEGER), 2);
@@ -215,7 +215,7 @@ public class ExpressionFactoryImplTest extends
BaseIgniteAbstractTest {
.build();
// build bounds for two sequential columns also belongs to index
- List<SearchBounds> bounds =
RexUtils.buildSortedSearchBounds(Commons.cluster(),
+ List<SearchBounds> bounds =
RexUtils.buildSortedSearchBounds(Commons.emptyCluster(),
RelCollations.of(ImmutableIntList.of(1, 2)), andCondition,
rowType, ImmutableBitSet.of(0, 1, 2));
if (!conditionSatisfyIdx) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index eb002020e2..cdf1dad118 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -34,7 +34,6 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
-import static org.apache.ignite.internal.sql.engine.SqlQueryType.QUERY;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -47,11 +46,14 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -59,7 +61,35 @@ import org.mockito.Mockito;
/**
* Test class to verify {@link MappingServiceImpl}.
*/
+@SuppressWarnings("ThrowFromFinallyBlock")
public class MappingServiceImplTest extends BaseIgniteAbstractTest {
+ private static final MultiStepPlan PLAN;
+
+ static {
+ //@formatter:off
+ TestCluster cluster = TestBuilders.cluster()
+ .nodes("N1")
+ .addTable()
+ .name("T1")
+ .addKeyColumn("ID", NativeTypes.INT32)
+ .addColumn("VAL", NativeTypes.INT32)
+ .end()
+ .build();
+ //@formatter:on
+
+ cluster.start();
+
+ try {
+ PLAN = (MultiStepPlan) cluster.node("N1").prepare("SELECT * FROM
t1");
+ } finally {
+ try {
+ cluster.stop();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
@Test
public void serviceInitializationTest() {
String localNodeName = "NODE0";
@@ -67,8 +97,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
MappingServiceImpl mappingService =
createMappingService(localNodeName, List.of(localNodeName));
mappingService.onNodeJoined(Mockito.mock(LogicalNode.class), new
LogicalTopologySnapshot(1, logicalNodes(localNodeName)));
- CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
- .map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of())));
+ CompletableFuture<List<MappedFragment>> mappingFuture =
mappingService.map(PLAN);
assertThat(mappingFuture, willSucceedFast());
}
@@ -80,8 +109,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
MappingServiceImpl mappingService =
createMappingService(localNodeName, nodeNames);
- CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
- .map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of())));
+ CompletableFuture<List<MappedFragment>> mappingFuture =
mappingService.map(PLAN);
// Mapping should wait for service initialization.
assertFalse(mappingFuture.isDone());
@@ -94,7 +122,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
mappingService.onTopologyLeap(new LogicalTopologySnapshot(2,
logicalNodes("NODE", "NODE1", "NODE2")));
assertThat(mappingFuture, willSucceedFast());
- assertThat(mappingService.map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of()))), willSucceedFast());
+ assertThat(mappingService.map(PLAN), willSucceedFast());
}
@Test
@@ -104,8 +132,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
MappingServiceImpl mappingService =
createMappingService(localNodeName, nodeNames);
- CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
- .map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of())));
+ CompletableFuture<List<MappedFragment>> mappingFuture =
mappingService.map(PLAN);
// Mapping should wait for service initialization.
assertFalse(mappingFuture.isDone());
@@ -121,7 +148,7 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
new LogicalTopologySnapshot(2, logicalNodes("NODE", "NODE1",
"NODE2")));
assertThat(mappingFuture, willSucceedFast());
- assertThat(mappingService.map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of()))), willSucceedFast());
+ assertThat(mappingService.map(PLAN), willSucceedFast());
}
private static List<LogicalNode> logicalNodes(String... nodeNames) {
@@ -143,6 +170,6 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
}
};
- return new MappingServiceImpl(localNodeName, targetProvider,
Runnable::run);
+ return new MappingServiceImpl(localNodeName, targetProvider,
EmptyCacheFactory.INSTANCE, 0, Runnable::run);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 7c38d9c095..c94b2470b8 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -573,7 +573,7 @@ public class TestBuilders {
.map(name -> {
var systemViewManager = new
SystemViewManagerImpl(name, catalogManager);
var targetProvider = new
TestNodeExecutionTargetProvider(systemViewManager, owningNodesByTableName);
- var mappingService = new MappingServiceImpl(name,
targetProvider, Runnable::run);
+ var mappingService = new MappingServiceImpl(name,
targetProvider, EmptyCacheFactory.INSTANCE, 0, Runnable::run);
systemViewManager.register(() -> systemViews);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
index 9a40045c4a..ea06372cce 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
@@ -33,10 +33,10 @@ import
org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
-import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.systemview.api.SystemViews;
@@ -153,8 +153,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest
{
// Ensure the plan contains full table scan.
assertTrue(plan instanceof MultiStepPlan);
- Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
- assertTrue(fragment.root().getInput(0) instanceof IgniteTableScan);
+ assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof
IgniteTableScan);
}
@Test
@@ -175,8 +174,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest
{
// Ensure the plan contains full table scan.
assertTrue(plan instanceof MultiStepPlan);
- Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
- assertTrue(fragment.root().getInput(0) instanceof IgniteTableScan);
+ assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof
IgniteTableScan);
}
@Test
@@ -192,9 +190,8 @@ public class TestClusterTest extends BaseIgniteAbstractTest
{
// Ensure the plan uses index.
assertTrue(plan instanceof MultiStepPlan);
- Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
- assertTrue(fragment.root().getInput(0) instanceof IgniteIndexScan);
- assertEquals("T1_PK", ((IgniteIndexScan)
fragment.root().getInput(0)).indexName());
+ assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof
IgniteIndexScan);
+ assertEquals("T1_PK", ((IgniteIndexScan) lastNode(((MultiStepPlan)
plan).root())).indexName());
}
@Test
@@ -210,9 +207,16 @@ public class TestClusterTest extends
BaseIgniteAbstractTest {
// Ensure the plan uses index.
assertTrue(plan instanceof MultiStepPlan);
- Fragment fragment = ((MultiStepPlan) plan).fragments().get(1);
- assertTrue(fragment.root().getInput(0) instanceof IgniteIndexScan);
- assertEquals("SORTED_IDX", ((IgniteIndexScan)
fragment.root().getInput(0)).indexName());
+ assertTrue(lastNode(((MultiStepPlan) plan).root()) instanceof
IgniteIndexScan);
+ assertEquals("SORTED_IDX", ((IgniteIndexScan)
lastNode(((MultiStepPlan) plan).root())).indexName());
+ }
+
+ private static IgniteRel lastNode(IgniteRel root) {
+ while (!root.getInputs().isEmpty()) {
+ root = (IgniteRel) root.getInput(0);
+ }
+
+ return root;
}
@Test
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index edcd61e3b0..cab0e9e5de 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -74,17 +74,17 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
+import org.apache.ignite.internal.sql.engine.exec.mapping.IdGenerator;
+import org.apache.ignite.internal.sql.engine.exec.mapping.QuerySplitter;
import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.HashIndexBuilder;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.SortedIndexBuilder;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
-import org.apache.ignite.internal.sql.engine.prepare.Cloner;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
import org.apache.ignite.internal.sql.engine.prepare.PlannerHelper;
import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
-import org.apache.ignite.internal.sql.engine.prepare.QuerySplitter;
import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -644,9 +644,7 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
assertNotNull(rel);
assertFalse(schemas.isEmpty());
- rel = Cloner.clone(rel);
-
- List<Fragment> fragments = new QuerySplitter().go(rel);
+ List<Fragment> fragments = new QuerySplitter(new IdGenerator(0),
rel.getCluster()).split(rel);
List<String> serialized = new ArrayList<>(fragments.size());
for (Fragment fragment : fragments) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
index c38ee82484..9a4ded58dc 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MapReduceAggregatesTest.java
@@ -61,7 +61,7 @@ public class MapReduceAggregatesTest {
.add("f2", SqlTypeName.INTEGER)
.build();
- RelOptCluster cluster = Commons.cluster();
+ RelOptCluster cluster = Commons.emptyCluster();
RelTraitSet traitSet = RelTraitSet.createEmpty();
// Input to aggregate
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
new file mode 100644
index 0000000000..d0057bbec4
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare;
+
+import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
+import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
+import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to verify {@link PrepareServiceImpl}.
+ */
+@SuppressWarnings("DataFlowIssue")
+class PrepareServiceImplTest extends BaseIgniteAbstractTest {
+ private static final List<PrepareService> createdServices = new
ArrayList<>();
+
+ @AfterEach
+ void stopServices() throws Exception {
+ for (PrepareService createdService : createdServices) {
+ createdService.stop();
+ }
+
+ createdServices.clear();
+ }
+
+ @Test
+ void prepareServiceReturnsExistingPlanForExplain() {
+ PrepareService service = createPlannerService();
+
+ QueryPlan queryPlan = await(service.prepareAsync(
+ parse("SELECT * FROM t"),
+ createContext()
+ ));
+
+ QueryPlan explainPlan = await(service.prepareAsync(
+ parse("explain plan for select * from t"),
+ createContext()
+ ));
+
+ assertThat(explainPlan, instanceOf(ExplainPlan.class));
+
+ ExplainPlan plan = (ExplainPlan) explainPlan;
+
+ assertThat(plan.plan(), sameInstance(queryPlan));
+ }
+
+ @Test
+ void prepareServiceCachesPlanCreatedForExplain() {
+ PrepareService service = createPlannerService();
+
+ QueryPlan explainPlan = await(service.prepareAsync(
+ parse("explain plan for select * from t"),
+ createContext()
+ ));
+
+ QueryPlan queryPlan = await(service.prepareAsync(
+ parse("SELECT * FROM t"),
+ createContext()
+ ));
+
+ assertThat(explainPlan, instanceOf(ExplainPlan.class));
+
+ ExplainPlan plan = (ExplainPlan) explainPlan;
+
+ assertThat(plan.plan(), sameInstance(queryPlan));
+ }
+
+ private static ParsedResult parse(String query) {
+ return new ParserServiceImpl(0,
EmptyCacheFactory.INSTANCE).parse(query);
+ }
+
+ private static BaseQueryContext createContext() {
+ return BaseQueryContext.builder()
+ .queryId(UUID.randomUUID())
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(wrap(createSchema()))
+ .build()
+ )
+ .build();
+ }
+
+ private static SchemaPlus wrap(IgniteSchema schema) {
+ var schemaPlus = Frameworks.createRootSchema(false);
+
+ schemaPlus.add(schema.getName(), schema);
+
+ return schemaPlus.getSubSchema(schema.getName());
+ }
+
+ private static IgniteSchema createSchema() {
+ IgniteTable table = TestBuilders.table()
+ .name("T")
+ .addColumn("C", NativeTypes.INT32)
+ .distribution(IgniteDistributions.single())
+ .build();
+
+ return new IgniteSchema("PUBLIC", 0, List.of(table));
+ }
+
+ private static PrepareService createPlannerService() {
+ PrepareService service = new PrepareServiceImpl("test", 1_000,
CaffeineCacheFactory.INSTANCE,
+ mock(DdlSqlToCommandConverter.class), 5_000,
mock(MetricManager.class));
+
+ createdServices.add(service);
+
+ service.start();
+
+ return service;
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
index 5d90c02e19..ce4876c61e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/EmptyCacheFactory.java
@@ -56,7 +56,7 @@ public class EmptyCacheFactory implements CacheFactory {
@Override
public V get(K key, Function<? super K, ? extends V> mappingFunction) {
- return null;
+ return mappingFunction.apply(key);
}
@Override
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementCheckerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementCheckerTest.java
index 67567dcc87..6cc74a1ae7 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementCheckerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementCheckerTest.java
@@ -140,7 +140,7 @@ public class StatementCheckerTest extends
BaseIgniteAbstractTest {
RexLiteral lit = rexBuilder.makeLiteral("1");
- IgniteValues values = new IgniteValues(Commons.cluster(), rowType,
ImmutableList.<ImmutableList<RexLiteral>>builder()
+ IgniteValues values = new IgniteValues(Commons.emptyCluster(),
rowType, ImmutableList.<ImmutableList<RexLiteral>>builder()
.add(ImmutableList.of(lit))
.add(ImmutableList.of(lit)).build(),
RelTraitSet.createEmpty());