This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0080ae950ff IGNITE-20428 SQL Calcite: fix query freezes when partitions are set. (#10935)
0080ae950ff is described below
commit 0080ae950ff735ddfdec2477589ab9b659427ded
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Mon Sep 18 14:38:31 2023 +0300
IGNITE-20428 SQL Calcite: fix query freezes when partitions are set. (#10935)
---
.../processors/query/calcite/RootQuery.java | 7 ++-
.../query/calcite/exec/ExecutionServiceImpl.java | 38 ++++--------
.../calcite/prepare/AbstractMultiStepPlan.java | 72 ++++++----------------
.../query/calcite/prepare/ExecutionPlan.java | 49 ++++++++++++++-
.../query/calcite/prepare/MappingQueryContext.java | 13 +++-
.../query/calcite/prepare/MultiStepPlan.java | 24 +-------
.../processors/query/calcite/util/Commons.java | 6 +-
.../QueryWithPartitionsIntegrationTest.java | 17 ++---
.../query/calcite/planner/PlannerTest.java | 11 ++--
9 files changed, 113 insertions(+), 124 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index cd6290e9495..8a1f1c20555 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -46,8 +46,9 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
-import
org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.running.TrackableQuery;
@@ -193,14 +194,14 @@ public class RootQuery<RowT> extends Query<RowT>
implements TrackableQuery {
/**
* Starts execution phase for the query and setup remote fragments.
*/
- public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT>
root) {
+ public void run(ExecutionContext<RowT> ctx, ExecutionPlan plan,
FieldsMetadata metadata, Node<RowT> root) {
synchronized (mux) {
if (state == QueryState.CLOSED)
throw queryCanceledException();
planningTime = U.currentTimeMillis() - startTs;
- RootNode<RowT> rootNode = new RootNode<>(ctx,
plan.fieldsMetadata().rowType(), this::tryClose);
+ RootNode<RowT> rootNode = new RootNode<>(ctx, metadata.rowType(),
this::tryClose);
rootNode.register(root);
addFragment(new
RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index e7d1a594b9b..38667c7c25a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -76,15 +76,14 @@ import
org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
import
org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import
org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
import org.apache.ignite.internal.processors.query.calcite.prepare.DdlPlan;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
import
org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadataImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
@@ -561,22 +560,11 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
) {
qry.mapping();
- MappingQueryContext mapCtx = Commons.mapContext(locNodeId,
topologyVersion(), qry.context().isLocal());
+ MappingQueryContext mapCtx = Commons.mapContext(locNodeId,
topologyVersion(), qry.context());
- plan.init(mappingSvc, mapCtx);
+ ExecutionPlan execPlan = plan.init(mappingSvc, mapCtx);
- List<Fragment> fragments = plan.fragments();
-
- if (!F.isEmpty(qry.context().partitions())) {
- fragments = Commons.transform(fragments, f -> {
- try {
- return f.filterByPartitions(qry.context().partitions());
- }
- catch (ColocationMappingException e) {
- throw new FragmentMappingException("Failed to calculate
physical distribution", f, f.root(), e);
- }
- });
- }
+ List<Fragment> fragments = execPlan.fragments();
// Local execution
Fragment fragment = F.first(fragments);
@@ -584,13 +572,13 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
if (U.assertionsEnabled()) {
assert fragment != null;
- FragmentMapping mapping = plan.mapping(fragment);
+ FragmentMapping mapping = execPlan.mapping(fragment);
assert mapping != null;
List<UUID> nodes = mapping.nodeIds();
- assert nodes != null && nodes.size() == 1 &&
F.first(nodes).equals(localNodeId())
+ assert nodes != null && (nodes.size() == 1 &&
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
: "nodes=" + nodes + ", localNode=" + localNodeId();
}
@@ -603,9 +591,9 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
FragmentDescription fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
- plan.mapping(fragment),
- plan.target(fragment),
- plan.remotes(fragment));
+ execPlan.mapping(fragment),
+ execPlan.target(fragment),
+ execPlan.remotes(fragment));
ExecutionContext<Row> ectx = new ExecutionContext<>(
qry.context(),
@@ -624,7 +612,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(),
mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());
- qry.run(ectx, plan, node);
+ qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
Map<UUID, Long> fragmentsPerNode = fragments.stream()
.skip(1)
@@ -636,9 +624,9 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
fragment = fragments.get(i);
fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
- plan.mapping(fragment),
- plan.target(fragment),
- plan.remotes(fragment));
+ execPlan.mapping(fragment),
+ execPlan.target(fragment),
+ execPlan.remotes(fragment));
Throwable ex = null;
byte[] parametersMarshalled = null;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 0184ba91f86..f6a8be350ed 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -17,18 +17,12 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
import
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
@@ -44,9 +38,6 @@ public abstract class AbstractMultiStepPlan extends
AbstractQueryPlan implements
/** */
protected final QueryTemplate queryTemplate;
- /** */
- protected ExecutionPlan executionPlan;
-
/** */
private final String textPlan;
@@ -66,11 +57,6 @@ public abstract class AbstractMultiStepPlan extends
AbstractQueryPlan implements
this.paramsMetadata = paramsMetadata;
}
- /** {@inheritDoc} */
- @Override public List<Fragment> fragments() {
- return Objects.requireNonNull(executionPlan).fragments();
- }
-
/** {@inheritDoc} */
@Override public FieldsMetadata fieldsMetadata() {
return fieldsMetadata;
@@ -82,47 +68,25 @@ public abstract class AbstractMultiStepPlan extends
AbstractQueryPlan implements
}
/** {@inheritDoc} */
- @Override public FragmentMapping mapping(Fragment fragment) {
- return fragment.mapping();
- }
-
- /** {@inheritDoc} */
- @Override public ColocationGroup target(Fragment fragment) {
- if (fragment.rootFragment())
- return null;
-
- IgniteSender sender = (IgniteSender)fragment.root();
- return
mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
- }
-
- /** {@inheritDoc} */
- @Override public Map<Long, List<UUID>> remotes(Fragment fragment) {
- List<IgniteReceiver> remotes = fragment.remotes();
+ @Override public ExecutionPlan init(MappingService mappingService,
MappingQueryContext ctx) {
+ ExecutionPlan executionPlan0 = queryTemplate.map(mappingService, ctx);
- if (F.isEmpty(remotes))
- return null;
+ if (!F.isEmpty(ctx.partitions()) &&
!F.isEmpty(executionPlan0.fragments())) {
+ List<Fragment> fragments = executionPlan0.fragments();
- HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());
+ fragments = Commons.transform(fragments, f -> {
+ try {
+ return f.filterByPartitions(ctx.partitions());
+ }
+ catch (ColocationMappingException e) {
+ throw new FragmentMappingException("Failed to calculate
physical distribution", f, f.root(), e);
+ }
+ });
- for (IgniteReceiver remote : remotes)
- res.put(remote.exchangeId(),
mapping(remote.sourceFragmentId()).nodeIds());
-
- return res;
- }
+ return new ExecutionPlan(executionPlan0.topologyVersion(),
fragments);
+ }
- /** {@inheritDoc} */
- @Override public void init(MappingService mappingService,
MappingQueryContext ctx) {
- executionPlan = queryTemplate.map(mappingService, ctx);
- }
-
- /** */
- private FragmentMapping mapping(long fragmentId) {
- return Objects.requireNonNull(executionPlan).fragments().stream()
- .filter(f -> f.fragmentId() == fragmentId)
- .findAny().orElseThrow(() -> new IllegalStateException("Cannot
find fragment with given ID. [" +
- "fragmentId=" + fragmentId + ", " +
- "fragments=" + fragments() + "]"))
- .mapping();
+ return executionPlan0;
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
index 75fc70d741b..5a25502ea27 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java
@@ -17,14 +17,23 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import com.google.common.collect.ImmutableList;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
*
*/
-class ExecutionPlan {
+public class ExecutionPlan {
/** */
private final AffinityTopologyVersion ver;
@@ -46,4 +55,42 @@ class ExecutionPlan {
public List<Fragment> fragments() {
return fragments;
}
+
+ /** */
+ public FragmentMapping mapping(Fragment fragment) {
+ return fragment.mapping();
+ }
+
+ /** */
+ public ColocationGroup target(Fragment fragment) {
+ if (fragment.rootFragment())
+ return null;
+
+ IgniteSender sender = (IgniteSender)fragment.root();
+ return
mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
+ }
+
+ /** */
+ public Map<Long, List<UUID>> remotes(Fragment fragment) {
+ List<IgniteReceiver> remotes = fragment.remotes();
+
+ if (F.isEmpty(remotes))
+ return null;
+
+ HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());
+
+ for (IgniteReceiver remote : remotes)
+ res.put(remote.exchangeId(),
mapping(remote.sourceFragmentId()).nodeIds());
+
+ return res;
+ }
+
+ /** */
+ private FragmentMapping mapping(long fragmentId) {
+ return fragments().stream()
+ .filter(f -> f.fragmentId() == fragmentId)
+ .findAny().orElseThrow(() -> new IllegalStateException("Cannot find fragment with given ID. [" +
+ "fragmentId=" + fragmentId + ", " + "fragments=" + fragments() + "]"))
+ .mapping();
+ }
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 2bdd1a0c913..f8b6f43128f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -41,16 +41,20 @@ public class MappingQueryContext {
/** */
private final boolean isLocal;
+ /** */
+ private final int[] parts;
+
/** */
public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer)
{
- this(locNodeId, topVer, false);
+ this(locNodeId, topVer, false, null);
}
/** */
- public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer,
boolean isLocal) {
+ public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer,
boolean isLocal, int[] parts) {
this.locNodeId = locNodeId;
this.topVer = topVer;
this.isLocal = isLocal;
+ this.parts = parts;
}
/** */
@@ -68,6 +72,11 @@ public class MappingQueryContext {
return isLocal;
}
+ /** */
+ public int[] partitions() {
+ return parts;
+ }
+
/** Creates a cluster. */
RelOptCluster cluster() {
if (cluster == null) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
index e738d9b92a0..692f713c939 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -17,22 +17,12 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
/**
* Regular query or DML
*/
public interface MultiStepPlan extends QueryPlan {
- /**
- * @return Query fragments.
- */
- List<Fragment> fragments();
-
/**
* @return Fields metadata.
*/
@@ -43,24 +33,12 @@ public interface MultiStepPlan extends QueryPlan {
*/
FieldsMetadata paramsMetadata();
- /**
- * @param fragment Fragment.
- * @return Mapping for a given fragment.
- */
- FragmentMapping mapping(Fragment fragment);
-
- /** */
- ColocationGroup target(Fragment fragment);
-
- /** */
- Map<Long, List<UUID>> remotes(Fragment fragment);
-
/**
* Inits query fragments.
*
* @param ctx Planner context.
*/
- void init(MappingService mappingService, MappingQueryContext ctx);
+ ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx);
/**
* @return Text representation of query plan
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index fcafc98b301..4476ebc3793 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -444,11 +444,11 @@ public final class Commons {
/** */
public static MappingQueryContext mapContext(UUID locNodeId,
AffinityTopologyVersion topVer) {
- return mapContext(locNodeId, topVer, false);
+ return new MappingQueryContext(locNodeId, topVer, false, null);
}
/** */
- public static MappingQueryContext mapContext(UUID locNodeId,
AffinityTopologyVersion topVer, boolean isLocal) {
- return new MappingQueryContext(locNodeId, topVer, isLocal);
+ public static MappingQueryContext mapContext(UUID locNodeId,
AffinityTopologyVersion topVer, BaseQueryContext ctx) {
+ return new MappingQueryContext(locNodeId, topVer, ctx.isLocal(),
ctx.partitions());
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
index 16655187dc8..660ee91dc3e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import org.apache.calcite.util.Pair;
import org.apache.ignite.IgniteCache;
@@ -53,12 +52,16 @@ public class QueryWithPartitionsIntegrationTest extends
AbstractBasicIntegration
public boolean local;
/** */
- @Parameterized.Parameters(name = "local = {0}")
+ @Parameterized.Parameter(1)
+ public int partSz;
+
+ /** */
+ @Parameterized.Parameters(name = "local = {0}, partSz = {1}")
public static List<Object[]> parameters() {
- return ImmutableList.of(
- new Object[]{true},
- new Object[]{false}
- );
+ return Stream.of(true, false)
+ .flatMap(isLocal -> Stream.of(1, 2, 5, 10, 20)
+ .map(i -> new Object[]{isLocal, i}))
+ .collect(Collectors.toList());
}
/** {@inheritDoc} */
@@ -67,7 +70,7 @@ public class QueryWithPartitionsIntegrationTest extends
AbstractBasicIntegration
List<Integer> parts0 = IntStream.range(0,
1024).boxed().collect(Collectors.toList());
Collections.shuffle(parts0);
- parts = Ints.toArray(parts0.subList(0, 20));
+ parts = Ints.toArray(parts0.subList(0, partSz));
log.info("Running tests with parts=" + Arrays.toString(parts));
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index aa4cd3e95f2..597869d237c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -52,6 +52,7 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import
org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
import
org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
@@ -314,7 +315,7 @@ public class PlannerTest extends AbstractPlannerTest {
IgniteRel phys = physicalPlan(ctx);
- MultiStepPlan plan = splitPlan(phys);
+ ExecutionPlan plan = splitPlan(phys);
List<Fragment> fragments = plan.fragments();
assertEquals(2, fragments.size());
@@ -359,16 +360,14 @@ public class PlannerTest extends AbstractPlannerTest {
}
/** */
- private MultiStepPlan splitPlan(IgniteRel phys) {
+ private ExecutionPlan splitPlan(IgniteRel phys) {
assertNotNull(phys);
MultiStepPlan plan = new MultiStepQueryPlan(null, null, new
QueryTemplate(new Splitter().go(phys)), null, null);
assertNotNull(plan);
- plan.init(this::intermediateMapping,
Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
-
- return plan;
+ return plan.init(this::intermediateMapping,
Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE));
}
/**
@@ -378,7 +377,7 @@ public class PlannerTest extends AbstractPlannerTest {
BaseQueryContext qctx,
PlanningContext ctx,
TestIoManager mgr,
- MultiStepPlan plan,
+ ExecutionPlan plan,
Fragment fragment,
UUID qryId,
UUID nodeId