This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 255e289 Turn implementor to visitor
255e289 is described below
commit 255e289a4e1802de162afb0eb90827d38691af73
Author: Igor Seliverstov <[email protected]>
AuthorDate: Thu Dec 12 16:38:21 2019 +0300
Turn implementor to visitor
---
.../query/calcite/cluster/MappingServiceImpl.java | 6 +-
.../query/calcite/exchange/ExchangeProcessor.java | 7 +-
.../{ImplementorImpl.java => Implementor.java} | 26 +++----
.../query/calcite/rel/IgniteExchange.java | 4 +-
.../processors/query/calcite/rel/IgniteFilter.java | 4 +-
.../processors/query/calcite/rel/IgniteJoin.java | 4 +-
.../query/calcite/rel/IgniteProject.java | 4 +-
.../query/calcite/rel/IgniteReceiver.java | 4 +-
.../processors/query/calcite/rel/IgniteRel.java | 2 +-
.../query/calcite/rel/IgniteRelShuttle.java | 76 --------------------
.../{Implementor.java => IgniteRelVisitor.java} | 22 +++---
.../processors/query/calcite/rel/IgniteSender.java | 4 +-
.../query/calcite/rel/IgniteTableScan.java | 4 +-
.../serialize/relation/RelToGraphConverter.java | 27 +++----
.../query/calcite/splitter/Splitter.java | 83 +++++++++++++++++-----
.../processors/query/calcite/util/Commons.java | 9 +++
.../query/calcite/CalciteQueryProcessorTest.java | 55 +++++++-------
.../query/calcite/exchange/OutboxTest.java | 18 ++++-
18 files changed, 178 insertions(+), 181 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
index 66c7a74..3e91219 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
@@ -55,12 +55,12 @@ public class MappingServiceImpl implements MappingService {
}
@Override public NodesMapping distributed(int cacheId,
AffinityTopologyVersion topVer) {
- GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+ GridCacheContext<?,?> cctx =
ctx.cache().context().cacheContext(cacheId);
return cctx.isReplicated() ? replicatedLocation(cctx, topVer) :
partitionedLocation(cctx, topVer);
}
- private NodesMapping partitionedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
+ private NodesMapping partitionedLocation(GridCacheContext<?,?> cctx,
AffinityTopologyVersion topVer) {
byte flags = NodesMapping.HAS_PARTITIONED_CACHES;
List<List<ClusterNode>> assignments =
cctx.affinity().assignments(topVer);
@@ -95,7 +95,7 @@ public class MappingServiceImpl implements MappingService {
return new NodesMapping(null, res, flags);
}
- private NodesMapping replicatedLocation(GridCacheContext cctx,
AffinityTopologyVersion topVer) {
+ private NodesMapping replicatedLocation(GridCacheContext<?,?> cctx,
AffinityTopologyVersion topVer) {
byte flags = NodesMapping.HAS_REPLICATED_CACHES;
if (cctx.config().getNodeFilter() != null)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
index 692b4fa..6a6810a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
@@ -24,7 +24,10 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
*
*/
public interface ExchangeProcessor {
- void register(Outbox outbox);
- void unregister(Outbox outbox);
+ <T> Outbox<T> register(Outbox<T> outbox);
+ <T> void unregister(Outbox<T> outbox);
+ Inbox register(Inbox inbox);
+ void unregister(Inbox inbox);
void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int
batchId, List<?> rows);
+ void acknowledge(GridCacheVersion queryId, long exchangeId, UUID nodeId,
int batchId);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
similarity index 87%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
index 2a65887..cc8ed64 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
@@ -37,9 +37,9 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
import
org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
@@ -51,13 +51,13 @@ import static
org.apache.ignite.internal.processors.query.calcite.prepare.Contex
/**
*
*/
-public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<IgniteRel, Node<Object[]>> {
+public class Implementor implements IgniteRelVisitor<Node<Object[]>>,
RelOp<IgniteRel, Node<Object[]>> {
private final PlannerContext ctx;
private final DataContext root;
private final ScalarFactory factory;
private Deque<Sink<Object[]>> stack;
- public ImplementorImpl(DataContext root) {
+ public Implementor(DataContext root) {
this.root = root;
ctx = PLANNER_CONTEXT.get(root);
@@ -65,7 +65,7 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
stack = new ArrayDeque<>();
}
- @Override public Node<Object[]> implement(IgniteSender rel) {
+ @Override public Node<Object[]> visit(IgniteSender rel) {
assert stack.isEmpty();
GridCacheVersion id = QUERY_ID.get(root);
@@ -85,7 +85,7 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
return res;
}
- @Override public Node<Object[]> implement(IgniteFilter rel) {
+ @Override public Node<Object[]> visit(IgniteFilter rel) {
assert !stack.isEmpty();
FilterNode res = new FilterNode(stack.pop(),
factory.filterPredicate(root, rel.getCondition(), rel.getRowType()));
@@ -97,7 +97,7 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
return res;
}
- @Override public Node<Object[]> implement(IgniteProject rel) {
+ @Override public Node<Object[]> visit(IgniteProject rel) {
assert !stack.isEmpty();
ProjectNode res = new ProjectNode(stack.pop(),
factory.projectExpression(root, rel.getProjects(),
rel.getInput().getRowType()));
@@ -109,7 +109,7 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
return res;
}
- @Override public Node<Object[]> implement(IgniteJoin rel) {
+ @Override public Node<Object[]> visit(IgniteJoin rel) {
assert !stack.isEmpty();
JoinNode res = new JoinNode(stack.pop(), factory.joinExpression(root,
rel.getCondition(), rel.getLeft().getRowType(), rel.getRight().getRowType()));
@@ -122,7 +122,7 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
return res;
}
- @Override public Node<Object[]> implement(IgniteTableScan rel) {
+ @Override public Node<Object[]> visit(IgniteTableScan rel) {
assert !stack.isEmpty();
Iterable<Object[]> source =
rel.getTable().unwrap(ScannableTable.class).scan(root);
@@ -130,15 +130,15 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
return new ScanNode(stack.pop(), source);
}
- @Override public Node<Object[]> implement(IgniteReceiver rel) {
+ @Override public Node<Object[]> visit(IgniteReceiver rel) {
throw new AssertionError(); // TODO
}
- @Override public Node<Object[]> implement(IgniteExchange rel) {
+ @Override public Node<Object[]> visit(IgniteExchange rel) {
throw new AssertionError();
}
- @Override public Node<Object[]> implement(IgniteRel other) {
+ @Override public Node<Object[]> visit(IgniteRel other) {
throw new AssertionError();
}
@@ -146,7 +146,7 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
if (rel.getConvention() != IgniteConvention.INSTANCE)
throw new IllegalStateException("INTERPRETABLE is required.");
- return ((IgniteRel) rel).implement(this);
+ return ((IgniteRel) rel).accept(this);
}
private List<Source> sources(List<RelNode> rels) {
@@ -161,7 +161,7 @@ public class ImplementorImpl implements
Implementor<Node<Object[]>>, RelOp<Ignit
@Override public Node<Object[]> go(IgniteRel rel) {
if (rel instanceof IgniteSender)
- return implement((IgniteSender) rel);
+ return visit((IgniteSender) rel);
ConsumerNode res = new ConsumerNode();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index f932020..9d01fbb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -34,7 +34,7 @@ public class IgniteExchange extends Exchange implements
IgniteRel {
return new IgniteExchange(getCluster(), traitSet, newInput,
newDistribution);
}
- @Override public <T> T implement(Implementor<T> implementor) {
- return implementor.implement(this);
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 2a0594e..c408769 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -34,7 +34,7 @@ public class IgniteFilter extends Filter implements IgniteRel
{
return new IgniteFilter(getCluster(), traitSet, input, condition);
}
- @Override public <T> T implement(Implementor<T> implementor) {
- return implementor.implement(this);
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
index 344f4a2..b1ad44b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
@@ -37,7 +37,7 @@ public class IgniteJoin extends Join implements IgniteRel {
return new IgniteJoin(getCluster(), traitSet, left, right, condition,
variablesSet, joinType);
}
- @Override public <T> T implement(Implementor<T> implementor) {
- return implementor.implement(this);
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 03fe6ca..42d2af2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -36,7 +36,7 @@ public class IgniteProject extends Project implements
IgniteRel {
return new IgniteProject(getCluster(), traitSet, input, projects,
rowType);
}
- @Override public <T> T implement(Implementor<T> implementor) {
- return implementor.implement(this);
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 60d5477..545c6e8 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -43,8 +43,8 @@ public class IgniteReceiver extends AbstractRelNode
implements IgniteRel {
return new IgniteReceiver(getCluster(), traitSet, rowType, source);
}
- @Override public <T> T implement(Implementor<T> implementor) {
- return implementor.implement(this);
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
}
public RelSource source() {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index b5d876f..8e1567a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -22,5 +22,5 @@ import org.apache.calcite.rel.RelNode;
*
*/
public interface IgniteRel extends RelNode {
- <T> T implement(Implementor<T> implementor);
+ <T> T accept(IgniteRelVisitor<T> visitor);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
deleted file mode 100644
index 7d3efc9..0000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttleImpl;
-
-/**
- *
- */
-public class IgniteRelShuttle extends RelShuttleImpl {
- public RelNode visit(IgniteExchange rel) {
- return visitChild(rel, 0, rel.getInput());
- }
-
- public RelNode visit(IgniteFilter rel) {
- return visitChild(rel, 0, rel.getInput());
- }
-
- public RelNode visit(IgniteProject rel) {
- return visitChild(rel, 0, rel.getInput());
- }
-
- public RelNode visit(IgniteReceiver rel) {
- return rel;
- }
-
- public RelNode visit(IgniteSender rel) {
- return visitChild(rel, 0, rel.getInput());
- }
-
- public RelNode visit(IgniteTableScan rel) {
- return rel;
- }
-
- public RelNode visit(IgniteJoin rel) {
- return visitChildren(rel);
- }
-
- @Override public RelNode visit(RelNode rel) {
- if (rel instanceof IgniteExchange)
- return visit((IgniteExchange)rel);
- if (rel instanceof IgniteFilter)
- return visit((IgniteFilter)rel);
- if (rel instanceof IgniteProject)
- return visit((IgniteProject)rel);
- if (rel instanceof IgniteReceiver)
- return visit((IgniteReceiver)rel);
- if (rel instanceof IgniteSender)
- return visit((IgniteSender)rel);
- if (rel instanceof IgniteTableScan)
- return visit((IgniteTableScan)rel);
- if (rel instanceof IgniteJoin)
- return visit((IgniteJoin)rel);
-
- return visitOther(rel);
- }
-
- protected RelNode visitOther(RelNode rel) {
- return super.visit(rel);
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
similarity index 63%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 50da38c..6c6ffa1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -19,24 +19,20 @@ package
org.apache.ignite.internal.processors.query.calcite.rel;
/**
*
*/
-public interface Implementor<T> extends RelOp<IgniteRel, T> {
- T implement(IgniteSender rel);
+public interface IgniteRelVisitor<T> {
+ T visit(IgniteSender rel);
- T implement(IgniteFilter rel);
+ T visit(IgniteFilter rel);
- T implement(IgniteProject rel);
+ T visit(IgniteProject rel);
- T implement(IgniteJoin rel);
+ T visit(IgniteJoin rel);
- T implement(IgniteTableScan rel);
+ T visit(IgniteTableScan rel);
- T implement(IgniteReceiver rel);
+ T visit(IgniteReceiver rel);
- T implement(IgniteExchange rel);
+ T visit(IgniteExchange rel);
- T implement(IgniteRel other);
-
- @Override default T go(IgniteRel rel) {
- return rel.implement(this);
- }
+ T visit(IgniteRel other);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index e50b7bf..4763c38 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -43,8 +43,8 @@ public class IgniteSender extends SingleRel implements
IgniteRel {
return new IgniteSender(getCluster(), traitSet, sole(inputs), target);
}
- @Override public <T> T implement(Implementor<T> implementor) {
- return implementor.implement(this);
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
}
public RelTarget target() {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 5f01899..59af858 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -35,7 +35,7 @@ public class IgniteTableScan extends TableScan implements
IgniteRel {
return this;
}
- @Override public <T> T implement(Implementor<T> implementor) {
- return implementor.implement(this);
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
index d935255..56d77aa 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
@@ -26,6 +26,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
@@ -52,44 +53,44 @@ public class RelToGraphConverter implements
RelOp<IgniteRel, RelGraph> {
}
}
- private final class Implementor implements
org.apache.ignite.internal.processors.query.calcite.rel.Implementor<Item> {
- @Override public Item implement(IgniteFilter rel) {
+ private final class ItemTranslator implements IgniteRelVisitor<Item> {
+ @Override public Item visit(IgniteFilter rel) {
return new Item(graph.addNode(curParent, FilterNode.create(rel,
rexTranslator)), Commons.cast(rel.getInputs()));
}
- @Override public Item implement(IgniteJoin rel) {
+ @Override public Item visit(IgniteJoin rel) {
return new Item(graph.addNode(curParent, JoinNode.create(rel,
rexTranslator)), Commons.cast(rel.getInputs()));
}
- @Override public Item implement(IgniteProject rel) {
+ @Override public Item visit(IgniteProject rel) {
return new Item(graph.addNode(curParent, ProjectNode.create(rel,
rexTranslator)), Commons.cast(rel.getInputs()));
}
- @Override public Item implement(IgniteTableScan rel) {
+ @Override public Item visit(IgniteTableScan rel) {
return new Item(graph.addNode(curParent,
TableScanNode.create(rel)), Commons.cast(rel.getInputs()));
}
- @Override public Item implement(IgniteReceiver rel) {
+ @Override public Item visit(IgniteReceiver rel) {
return new Item(graph.addNode(curParent,
ReceiverNode.create(rel)), Collections.emptyList());
}
- @Override public Item implement(IgniteSender rel) {
+ @Override public Item visit(IgniteSender rel) {
return new Item(graph.addNode(curParent, SenderNode.create(rel)),
Commons.cast(rel.getInputs()));
}
- @Override public Item implement(IgniteExchange rel) {
- throw new UnsupportedOperationException();
+ @Override public Item visit(IgniteRel rel) {
+ return rel.accept(this);
}
- @Override public Item implement(IgniteRel other) {
- throw new AssertionError();
+ @Override public Item visit(IgniteExchange rel) {
+ throw new AssertionError("Unexpected node: " + rel);
}
}
@Override public RelGraph go(IgniteRel root) {
graph = new RelGraph();
- Implementor implementor = new Implementor();
+ ItemTranslator itemTranslator = new ItemTranslator();
Deque<Item> stack = new ArrayDeque<>();
stack.push(new Item(-1, F.asList(root)));
@@ -99,7 +100,7 @@ public class RelToGraphConverter implements RelOp<IgniteRel,
RelGraph> {
curParent = item.parentId;
for (IgniteRel child : item.children) {
- stack.push(implementor.go(child));
+ stack.push(itemTranslator.visit(child));
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index f2ea67a..8dd3678 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -19,53 +19,104 @@ package
org.apache.ignite.internal.processors.query.calcite.splitter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelShuttle;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
+
+import static
org.apache.ignite.internal.processors.query.calcite.util.Commons.igniteRel;
/**
*
*/
-public class Splitter extends IgniteRelShuttle {
+public class Splitter implements IgniteRelVisitor<IgniteRel>, RelOp<IgniteRel,
QueryPlan> {
private List<Fragment> fragments;
- public QueryPlan go(IgniteRel root) {
+ @Override public QueryPlan go(IgniteRel root) {
fragments = new ArrayList<>();
- fragments.add(new Fragment(root.accept(this)));
+ fragments.add(new Fragment(visit(root)));
Collections.reverse(fragments);
return new QueryPlan(fragments);
}
- @Override public RelNode visit(IgniteExchange rel) {
- RelNode input = rel.getInput();
- RelOptCluster cluster = input.getCluster();
- RelTraitSet inputTraits = input.getTraitSet();
- RelTraitSet outputTraits = rel.getTraitSet();
+ @Override public IgniteRel visit(IgniteExchange rel) {
+ RelOptCluster cluster = rel.getCluster();
+ RelTraitSet outTraits = rel.getTraitSet();
+
+ IgniteRel input = visit(igniteRel(rel.getInput()));
+ RelTraitSet inTraits = input.getTraitSet();
+
+ Fragment fragment = new Fragment(new IgniteSender(cluster, inTraits,
input));
- IgniteSender sender = new IgniteSender(cluster, inputTraits,
visit(input));
- Fragment fragment = new Fragment(sender);
fragments.add(fragment);
- return new IgniteReceiver(cluster, outputTraits, sender.getRowType(),
fragment);
+ return new IgniteReceiver(cluster, outTraits, input.getRowType(),
fragment);
+ }
+
+ @Override public IgniteRel visit(IgniteFilter rel) {
+ return visitChild(rel);
}
- @Override public RelNode visit(IgniteReceiver rel) {
+ @Override public IgniteRel visit(IgniteProject rel) {
+ return visitChild(rel);
+ }
+
+ @Override public IgniteRel visit(IgniteJoin rel) {
+ return visitChildren(rel);
+ }
+
+ @Override public IgniteRel visit(IgniteTableScan rel) {
+ return rel;
+ }
+
+ @Override public IgniteRel visit(IgniteRel rel) {
+ return rel.accept(this);
+ }
+
+ @Override public IgniteRel visit(IgniteReceiver rel) {
throw new AssertionError("An attempt to split an already split task.");
}
- @Override public RelNode visit(IgniteSender rel) {
+ @Override public IgniteRel visit(IgniteSender rel) {
throw new AssertionError("An attempt to split an already split task.");
}
- @Override protected RelNode visitOther(RelNode rel) {
- throw new AssertionError("Unexpected node: " + rel);
+ private IgniteRel visitChildren(IgniteRel rel) {
+ for (Ord<RelNode> input : Ord.zip(rel.getInputs()))
+ visitChild(rel, input.i, igniteRel(input.e));
+
+ return rel;
+ }
+
+ /**
+ * Visits a single child of a parent.
+ */
+ private <T extends SingleRel & IgniteRel> IgniteRel visitChild(T rel) {
+ visitChild(rel, 0, igniteRel(rel.getInput()));
+
+ return rel;
+ }
+
+ /**
+ * Visits a particular child of a parent.
+ */
+ private void visitChild(IgniteRel parent, int i, IgniteRel child) {
+ IgniteRel child2 = visit(child);
+ if (child2 != child)
+ parent.replaceInput(i, child2);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 49d6091..1eb5c76 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -35,6 +35,8 @@ import
org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryContext;
import
org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.NotNull;
@@ -133,4 +135,11 @@ public final class Commons {
public static PlannerContext plannerContext(Context ctx) {
return Objects.requireNonNull(ctx.unwrap(PlannerContext.class));
}
+
+ public static IgniteRel igniteRel(RelNode rel) {
+ if (rel.getConvention() != IgniteConvention.INSTANCE)
+ throw new AssertionError("Unexpected node: " + rel);
+
+ return (IgniteRel) rel;
+ }
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index ee4c22e..f0afa1a 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -40,7 +40,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.calcite.exec.ConsumerNode;
-import
org.apache.ignite.internal.processors.query.calcite.exec.ImplementorImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.Implementor;
import org.apache.ignite.internal.processors.query.calcite.exec.Node;
import
org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -52,8 +52,6 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import
org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
@@ -65,7 +63,6 @@ import
org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -75,6 +72,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static
org.apache.ignite.internal.processors.query.calcite.util.Commons.igniteRel;
+
/**
*
*/
@@ -178,7 +177,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(query);
@@ -217,7 +216,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(query);
@@ -254,7 +253,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(query);
@@ -293,7 +292,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(query);
@@ -340,7 +339,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(query);
@@ -400,7 +399,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -433,7 +432,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
plan.init(ctx);
- RelGraph graph = new RelToGraphConverter().go((IgniteRel)
plan.fragments().get(1).root());
+ RelGraph graph = new
RelToGraphConverter().go(igniteRel(plan.fragments().get(1).root()));
convertedBytes = new JdkMarshaller().marshal(graph);
@@ -481,7 +480,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -511,7 +510,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
assertNotNull(plan);
@@ -543,7 +542,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -569,9 +568,9 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
Map<String, Object> params =
ctx.query().params(F.asMap(ContextValue.QUERY_ID.valueName(), new
GridCacheVersion()));
- Implementor<Node<Object[]>> implementor = new ImplementorImpl(new
DataContextImpl(params, ctx));
+ Implementor implementor = new Implementor(new
DataContextImpl(params, ctx));
- Node<Object[]> exec = implementor.go((IgniteRel) phys);
+ Node<Object[]> exec = implementor.go(igniteRel(phys));
assertNotNull(exec);
@@ -634,7 +633,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -664,7 +663,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
assertNotNull(plan);
@@ -725,7 +724,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -755,7 +754,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
assertNotNull(plan);
@@ -816,7 +815,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -846,7 +845,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
assertNotNull(plan);
@@ -900,7 +899,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -930,7 +929,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
assertNotNull(plan);
@@ -991,7 +990,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -1021,7 +1020,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
assertNotNull(plan);
@@ -1082,7 +1081,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
assertNotNull(planner);
- Query query = Commons.plannerContext(ctx).query();
+ Query query = ctx.query();
assertNotNull(planner);
@@ -1112,7 +1111,7 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
assertNotNull(relRoot);
- QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+ QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
assertNotNull(plan);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
index a08142c..61c8a7d 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -107,11 +107,13 @@ public class OutboxTest extends GridCommonAbstractTest {
private List<?> lastBatch;
- @Override public void register(Outbox outbox) {
+ @Override public <T> Outbox<T> register(Outbox<T> outbox) {
registered = true;
+
+ return outbox;
}
- @Override public void unregister(Outbox outbox) {
+ @Override public <T> void unregister(Outbox<T> outbox) {
unregistered = true;
}
@@ -120,6 +122,18 @@ public class OutboxTest extends GridCommonAbstractTest {
lastBatch = rows;
}
+
+ @Override public Inbox register(Inbox inbox) {
+ throw new AssertionError();
+ }
+
+ @Override public void unregister(Inbox inbox) {
+ throw new AssertionError();
+ }
+
+ @Override public void acknowledge(GridCacheVersion queryId, long
exchangeId, UUID nodeId, int batchId) {
+ throw new AssertionError();
+ }
}
private static class TestSource implements Source {