Repository: calcite Updated Branches: refs/heads/master 2918b8fe5 -> c0f912ddf
[CALCITE-2127] In Interpreter, allow a node to have more than one consumer Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/7b85d445 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/7b85d445 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/7b85d445 Branch: refs/heads/master Commit: 7b85d445462799926474945c6ab33b1cb8ce33cc Parents: 2918b8f Author: Julian Hyde <[email protected]> Authored: Mon Jan 8 23:21:54 2018 -0800 Committer: Julian Hyde <[email protected]> Committed: Tue Jan 9 23:01:18 2018 -0800 ---------------------------------------------------------------------- .../enumerable/EnumerableInterpretable.java | 10 +- .../calcite/interpreter/AbstractSingleNode.java | 6 +- .../calcite/interpreter/AggregateNode.java | 6 +- .../apache/calcite/interpreter/Bindables.java | 16 +- .../apache/calcite/interpreter/Compiler.java | 70 +++++ .../apache/calcite/interpreter/FilterNode.java | 8 +- .../calcite/interpreter/InterpretableRel.java | 6 +- .../apache/calcite/interpreter/Interpreter.java | 289 ++++++++++++------- .../apache/calcite/interpreter/JoinNode.java | 14 +- .../org/apache/calcite/interpreter/Nodes.java | 32 +- .../apache/calcite/interpreter/ProjectNode.java | 8 +- .../apache/calcite/interpreter/SortNode.java | 4 +- .../calcite/interpreter/TableScanNode.java | 64 ++-- .../apache/calcite/interpreter/UnionNode.java | 6 +- .../apache/calcite/interpreter/ValuesNode.java | 12 +- .../apache/calcite/interpreter/WindowNode.java | 4 +- .../calcite/adapter/druid/DruidQuery.java | 2 +- 17 files changed, 346 insertions(+), 211 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java index ef5af1e..17447bc 100644 --- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java +++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableInterpretable.java @@ -18,9 +18,9 @@ package org.apache.calcite.adapter.enumerable; import org.apache.calcite.DataContext; import org.apache.calcite.avatica.Helper; +import org.apache.calcite.interpreter.Compiler; import org.apache.calcite.interpreter.InterpretableConvention; import org.apache.calcite.interpreter.InterpretableRel; -import org.apache.calcite.interpreter.Interpreter; import org.apache.calcite.interpreter.Node; import org.apache.calcite.interpreter.Row; import org.apache.calcite.interpreter.Sink; @@ -79,7 +79,7 @@ public class EnumerableInterpretable extends ConverterImpl final ArrayBindable arrayBindable = box(bindable); final Enumerable<Object[]> enumerable = arrayBindable.bind(implementor.dataContext); - return new EnumerableNode(enumerable, implementor.interpreter, this); + return new EnumerableNode(enumerable, implementor.compiler, this); } public static Bindable toBindable(Map<String, Object> parameters, @@ -186,10 +186,10 @@ public class EnumerableInterpretable extends ConverterImpl private final Enumerable<Object[]> enumerable; private final Sink sink; - EnumerableNode(Enumerable<Object[]> enumerable, - Interpreter interpreter, EnumerableInterpretable rel) { + EnumerableNode(Enumerable<Object[]> enumerable, Compiler compiler, + EnumerableInterpretable rel) { this.enumerable = enumerable; - this.sink = interpreter.sink(rel); + this.sink = compiler.sink(rel); } public void run() throws InterruptedException { http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java index d6174ba..0734305 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/AbstractSingleNode.java @@ -28,10 +28,10 @@ abstract class AbstractSingleNode<T extends SingleRel> implements Node { protected final Sink sink; protected final T rel; - AbstractSingleNode(Interpreter interpreter, T rel) { + AbstractSingleNode(Compiler compiler, T rel) { this.rel = rel; - this.source = interpreter.source(rel, 0); - this.sink = interpreter.sink(rel); + this.source = compiler.source(rel, 0); + this.sink = compiler.sink(rel); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java index 47b933b..e2d1adc 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/AggregateNode.java @@ -63,9 +63,9 @@ public class AggregateNode extends AbstractSingleNode<Aggregate> { private final ImmutableList<AccumulatorFactory> accumulatorFactories; private final DataContext dataContext; - public AggregateNode(Interpreter interpreter, Aggregate rel) { - super(interpreter, rel); - this.dataContext = interpreter.getDataContext(); + public AggregateNode(Compiler compiler, Aggregate rel) { + super(compiler, rel); + this.dataContext = compiler.getDataContext(); ImmutableBitSet union = ImmutableBitSet.of(); http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Bindables.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java index c5287eb..a26325e 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java +++ b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java @@ -321,7 +321,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new FilterNode(implementor.interpreter, this); + return new FilterNode(implementor.compiler, this); } } @@ -378,7 +378,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new ProjectNode(implementor.interpreter, this); + return new ProjectNode(implementor.compiler, this); } } @@ -434,7 +434,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new SortNode(implementor.interpreter, this); + return new SortNode(implementor.compiler, this); } } @@ -504,7 +504,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new JoinNode(implementor.interpreter, this); + return new JoinNode(implementor.compiler, this); } } @@ -556,7 +556,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new UnionNode(implementor.interpreter, this); + return new UnionNode(implementor.compiler, this); } } @@ -582,7 +582,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new ValuesNode(implementor.interpreter, this); + return new ValuesNode(implementor.compiler, this); } } @@ -660,7 +660,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new AggregateNode(implementor.interpreter, this); + return new AggregateNode(implementor.compiler, this); } } @@ -722,7 +722,7 @@ public class Bindables { } public Node implement(InterpreterImplementor implementor) { - return new WindowNode(implementor.interpreter, this); + return new WindowNode(implementor.compiler, this); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Compiler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/Compiler.java b/core/src/main/java/org/apache/calcite/interpreter/Compiler.java new file mode 100644 index 0000000..cdd5c5e --- /dev/null +++ b/core/src/main/java/org/apache/calcite/interpreter/Compiler.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.interpreter; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; + +import java.util.List; + +/** + * Context while converting a tree of {@link RelNode} to a program + * that can be run by an {@link Interpreter}. + */ +public interface Compiler { + + /** Compiles an expression to an executable form. */ + Scalar compile(List<RexNode> nodes, RelDataType inputRowType); + + RelDataType combinedRowType(List<RelNode> inputs); + + Source source(RelNode rel, int ordinal); + + /** + * Creates a Sink for a relational expression to write into. + * + * <p>This method is generally called from the constructor of a {@link Node}. + * But a constructor could instead call + * {@link #enumerable(RelNode, Enumerable)}. + * + * @param rel Relational expression + * @return Sink + */ + Sink sink(RelNode rel); + + /** Tells the interpreter that a given relational expression wishes to + * give its output as an enumerable. + * + * <p>This is as opposed to the norm, where a relational expression calls + * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that + * sink. + * + * @param rel Relational expression + * @param rowEnumerable Contents of relational expression + */ + void enumerable(RelNode rel, Enumerable<Row> rowEnumerable); + + DataContext getDataContext(); + + Context createContext(); + +} + +// End Compiler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java index 7d4ab3d..4f3fb7f 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/FilterNode.java @@ -28,12 +28,12 @@ public class FilterNode extends AbstractSingleNode<Filter> { private final Scalar condition; private final Context context; - public FilterNode(Interpreter interpreter, Filter rel) { - super(interpreter, rel); + public FilterNode(Compiler compiler, Filter rel) { + super(compiler, rel); this.condition = - interpreter.compile(ImmutableList.of(rel.getCondition()), + compiler.compile(ImmutableList.of(rel.getCondition()), rel.getRowType()); - this.context = interpreter.createContext(); + this.context = compiler.createContext(); } public void run() throws InterruptedException { http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java index 340299e..42276ac 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java +++ b/core/src/main/java/org/apache/calcite/interpreter/InterpretableRel.java @@ -35,17 +35,17 @@ public interface InterpretableRel extends RelNode { /** Context when a {@link RelNode} is being converted to an interpreter * {@link Node}. */ class InterpreterImplementor { - public final Interpreter interpreter; + public final Compiler compiler; public final Map<String, Object> internalParameters = Maps.newLinkedHashMap(); public final CalcitePrepare.SparkHandler spark; public final DataContext dataContext; public final Map<RelNode, List<Sink>> relSinks = Maps.newHashMap(); - public InterpreterImplementor(Interpreter interpreter, + public InterpreterImplementor(Compiler compiler, CalcitePrepare.SparkHandler spark, DataContext dataContext) { - this.interpreter = interpreter; + this.compiler = compiler; this.spark = spark; this.dataContext = dataContext; } http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java index 9360294..6096630 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java +++ b/core/src/main/java/org/apache/calcite/interpreter/Interpreter.java @@ -21,7 +21,9 @@ import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.linq4j.Ord; import org.apache.calcite.linq4j.TransformedEnumerator; +import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.hep.HepPlanner; import org.apache.calcite.plan.hep.HepProgram; import org.apache.calcite.plan.hep.HepProgramBuilder; @@ -38,16 +40,27 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.util.Pair; import org.apache.calcite.util.ReflectUtil; import org.apache.calcite.util.ReflectiveVisitDispatcher; import org.apache.calcite.util.ReflectiveVisitor; +import org.apache.calcite.util.Util; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import java.math.BigDecimal; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -62,20 +75,19 @@ import java.util.NoSuchElementException; */ public class Interpreter extends AbstractEnumerable<Object[]> implements AutoCloseable { - final Map<RelNode, NodeInfo> nodes = Maps.newLinkedHashMap(); + private final Map<RelNode, NodeInfo> nodes; private final DataContext dataContext; private final RelNode rootRel; - private final Map<RelNode, List<RelNode>> relInputs = Maps.newHashMap(); - protected final ScalarCompiler scalarCompiler; /** Creates an Interpreter. */ public Interpreter(DataContext dataContext, RelNode rootRel) { this.dataContext = Preconditions.checkNotNull(dataContext); - this.scalarCompiler = - new JaninoRexCompiler(rootRel.getCluster().getRexBuilder()); final RelNode rel = optimize(rootRel); - final Compiler compiler = new Nodes.CoreCompiler(this); - this.rootRel = compiler.visitRoot(rel); + final CompilerImpl compiler = + new Nodes.CoreCompiler(this, rootRel.getCluster()); + Pair<RelNode, Map<RelNode, NodeInfo>> pair = compiler.visitRoot(rel); + this.rootRel = pair.left; + this.nodes = ImmutableMap.copyOf(pair.right); } private RelNode optimize(RelNode rootRel) { @@ -98,7 +110,8 @@ public class Interpreter extends AbstractEnumerable<Object[]> if (nodeInfo.rowEnumerable != null) { rows = nodeInfo.rowEnumerable.enumerator(); } else { - final ArrayDeque<Row> queue = ((ListSink) nodeInfo.sink).list; + final ArrayDeque<Row> queue = + Iterables.getOnlyElement(nodeInfo.sinks.values()).list; rows = Linq4j.iterableEnumerator(queue); } @@ -124,23 +137,6 @@ public class Interpreter extends AbstractEnumerable<Object[]> public void close() { } - /** Compiles an expression to an executable form. */ - public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) { - if (inputRowType == null) { - inputRowType = dataContext.getTypeFactory().builder().build(); - } - return scalarCompiler.compile(nodes, inputRowType); - } - - RelDataType combinedRowType(List<RelNode> inputs) { - final RelDataTypeFactory.Builder builder = - dataContext.getTypeFactory().builder(); - for (RelNode input : inputs) { - builder.addAll(input.getRowType().getFieldList()); - } - return builder.build(); - } - /** Not used. */ private class FooCompiler implements ScalarCompiler { public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) { @@ -249,85 +245,16 @@ public class Interpreter extends AbstractEnumerable<Object[]> } } - public Source source(RelNode rel, int ordinal) { - final RelNode input = getInput(rel, ordinal); - final NodeInfo nodeInfo = nodes.get(input); - if (nodeInfo == null) { - throw new AssertionError("should be registered: " + rel); - } - if (nodeInfo.rowEnumerable != null) { - return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator()); - } - Sink sink = nodeInfo.sink; - if (sink instanceof ListSink) { - return new ListSource((ListSink) nodeInfo.sink); - } - throw new IllegalStateException( - "Got a sink " + sink + " to which there is no match source type!"); - } - - private RelNode getInput(RelNode rel, int ordinal) { - final List<RelNode> inputs = relInputs.get(rel); - if (inputs != null) { - return inputs.get(ordinal); - } - return rel.getInput(ordinal); - } - - /** - * Creates a Sink for a relational expression to write into. - * - * <p>This method is generally called from the constructor of a {@link Node}. - * But a constructor could instead call - * {@link #enumerable(RelNode, Enumerable)}. - * - * @param rel Relational expression - * @return Sink - */ - public Sink sink(RelNode rel) { - final ArrayDeque<Row> queue = new ArrayDeque<>(1); - final Sink sink = new ListSink(queue); - NodeInfo nodeInfo = new NodeInfo(rel, sink, null); - nodes.put(rel, nodeInfo); - return sink; - } - - /** Tells the interpreter that a given relational expression wishes to - * give its output as an enumerable. - * - * <p>This is as opposed to the norm, where a relational expression calls - * {@link #sink(RelNode)}, then its {@link Node#run()} method writes into that - * sink. - * - * @param rel Relational expression - * @param rowEnumerable Contents of relational expression - */ - public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) { - NodeInfo nodeInfo = new NodeInfo(rel, null, rowEnumerable); - nodes.put(rel, nodeInfo); - } - - public Context createContext() { - return new Context(dataContext); - } - - public DataContext getDataContext() { - return dataContext; - } - /** Information about a node registered in the data flow graph. */ private static class NodeInfo { final RelNode rel; - final Sink sink; + final Map<Edge, ListSink> sinks = new LinkedHashMap<>(); final Enumerable<Row> rowEnumerable; Node node; - NodeInfo(RelNode rel, Sink sink, Enumerable<Row> rowEnumerable) { + NodeInfo(RelNode rel, Enumerable<Row> rowEnumerable) { this.rel = rel; - this.sink = sink; this.rowEnumerable = rowEnumerable; - Preconditions.checkArgument((sink == null) != (rowEnumerable == null), - "one or the other"); } } @@ -386,15 +313,20 @@ public class Interpreter extends AbstractEnumerable<Object[]> /** Implementation of {@link Source} using a {@link java.util.ArrayDeque}. */ private static class ListSource implements Source { private final ArrayDeque<Row> list; + private Iterator<Row> iterator = null; - ListSource(ListSink sink) { - this.list = sink.list; + ListSource(ArrayDeque<Row> list) { + this.list = list; } public Row receive() { try { - return list.remove(); + if (iterator == null) { + iterator = list.iterator(); + } + return iterator.next(); } catch (NoSuchElementException e) { + iterator = null; return null; } } @@ -404,6 +336,35 @@ public class Interpreter extends AbstractEnumerable<Object[]> } } + /** Implementation of {@link Sink} using a {@link java.util.ArrayDeque}. */ + private static class DuplicatingSink implements Sink { + private List<ArrayDeque<Row>> queues; + + private DuplicatingSink(List<ArrayDeque<Row>> queues) { + this.queues = ImmutableList.copyOf(queues); + } + + public void send(Row row) throws InterruptedException { + for (ArrayDeque<Row> queue : queues) { + queue.add(row); + } + } + + public void end() throws InterruptedException { + } + + @SuppressWarnings("deprecation") + @Override public void setSourceEnumerable(Enumerable<Row> enumerable) + throws InterruptedException { + // just copy over the source into the local list + final Enumerator<Row> enumerator = enumerable.enumerator(); + while (enumerator.moveNext()) { + this.send(enumerator.current()); + } + enumerator.close(); + } + } + /** * Walks over a tree of {@link org.apache.calcite.rel.RelNode} and, for each, * creates a {@link org.apache.calcite.interpreter.Node} that can be @@ -417,25 +378,32 @@ public class Interpreter extends AbstractEnumerable<Object[]> * "visit" methods in this or a sub-class, and they will be found and called * via reflection. */ - public static class Compiler extends RelVisitor implements ReflectiveVisitor { - private final ReflectiveVisitDispatcher<Compiler, RelNode> dispatcher = - ReflectUtil.createDispatcher(Compiler.class, RelNode.class); + static class CompilerImpl extends RelVisitor + implements Compiler, ReflectiveVisitor { + final ScalarCompiler scalarCompiler; + private final ReflectiveVisitDispatcher<CompilerImpl, RelNode> dispatcher = + ReflectUtil.createDispatcher(CompilerImpl.class, RelNode.class); protected final Interpreter interpreter; protected RelNode rootRel; protected RelNode rel; protected Node node; + final Map<RelNode, NodeInfo> nodes = new LinkedHashMap<>(); + final Map<RelNode, List<RelNode>> relInputs = new HashMap<>(); + final Multimap<RelNode, Edge> outEdges = LinkedHashMultimap.create(); private static final String REWRITE_METHOD_NAME = "rewrite"; private static final String VISIT_METHOD_NAME = "visit"; - Compiler(Interpreter interpreter) { + CompilerImpl(Interpreter interpreter, RelOptCluster cluster) { this.interpreter = interpreter; + this.scalarCompiler = new JaninoRexCompiler(cluster.getRexBuilder()); } - public RelNode visitRoot(RelNode p) { + /** Visits the tree, starting from the root {@code p}. */ + Pair<RelNode, Map<RelNode, NodeInfo>> visitRoot(RelNode p) { rootRel = p; visit(p, 0, null); - return rootRel; + return Pair.of(rootRel, nodes); } @Override public void visit(RelNode p, int ordinal, RelNode parent) { @@ -454,10 +422,10 @@ public class Interpreter extends AbstractEnumerable<Object[]> } p = rel; if (parent != null) { - List<RelNode> inputs = interpreter.relInputs.get(parent); + List<RelNode> inputs = relInputs.get(parent); if (inputs == null) { inputs = Lists.newArrayList(parent.getInputs()); - interpreter.relInputs.put(parent, inputs); + relInputs.put(parent, inputs); } inputs.set(ordinal, p); } else { @@ -466,7 +434,10 @@ public class Interpreter extends AbstractEnumerable<Object[]> } // rewrite children first (from left to right) - final List<RelNode> inputs = interpreter.relInputs.get(p); + final List<RelNode> inputs = relInputs.get(p); + for (Ord<RelNode> input : Ord.zip(Util.first(inputs, p.getInputs()))) { + outEdges.put(input.e, new Edge(p, input.i)); + } if (inputs != null) { for (int i = 0; i < inputs.size(); i++) { RelNode input = inputs.get(i); @@ -482,17 +453,22 @@ public class Interpreter extends AbstractEnumerable<Object[]> if (p instanceof InterpretableRel) { InterpretableRel interpretableRel = (InterpretableRel) p; node = interpretableRel.implement( - new InterpretableRel.InterpreterImplementor(interpreter, null, - null)); + new InterpretableRel.InterpreterImplementor(this, null, null)); } else { // Probably need to add a visit(XxxRel) method to CoreCompiler. throw new AssertionError("interpreter: no implementation for " + p.getClass()); } } - final NodeInfo nodeInfo = interpreter.nodes.get(p); + final NodeInfo nodeInfo = nodes.get(p); assert nodeInfo != null; nodeInfo.node = node; + if (inputs != null) { + for (int i = 0; i < inputs.size(); i++) { + final RelNode input = inputs.get(i); + visit(input, i, p); + } + } } /** Fallback rewrite method. @@ -502,6 +478,95 @@ public class Interpreter extends AbstractEnumerable<Object[]> * rewrite. */ public void rewrite(RelNode r) { } + + public Scalar compile(List<RexNode> nodes, RelDataType inputRowType) { + if (inputRowType == null) { + inputRowType = interpreter.dataContext.getTypeFactory().builder() + .build(); + } + return scalarCompiler.compile(nodes, inputRowType); + } + + public RelDataType combinedRowType(List<RelNode> inputs) { + final RelDataTypeFactory.Builder builder = + interpreter.dataContext.getTypeFactory().builder(); + for (RelNode input : inputs) { + builder.addAll(input.getRowType().getFieldList()); + } + return builder.build(); + } + + public Source source(RelNode rel, int ordinal) { + final RelNode input = getInput(rel, ordinal); + final Edge edge = new Edge(rel, ordinal); + final Collection<Edge> edges = outEdges.get(input); + final NodeInfo nodeInfo = nodes.get(input); + if (nodeInfo == null) { + throw new AssertionError("should be registered: " + rel); + } + if (nodeInfo.rowEnumerable != null) { + return new EnumeratorSource(nodeInfo.rowEnumerable.enumerator()); + } + assert nodeInfo.sinks.size() == edges.size(); + final ListSink sink = nodeInfo.sinks.get(edge); + if (sink != null) { + return new ListSource(sink.list); + } + throw new IllegalStateException( + "Got a sink " + sink + " to which there is no match source type!"); + } + + private RelNode getInput(RelNode rel, int ordinal) { + final List<RelNode> inputs = relInputs.get(rel); + if (inputs != null) { + return inputs.get(ordinal); + } + return rel.getInput(ordinal); + } + + public Sink sink(RelNode rel) { + final Collection<Edge> edges = outEdges.get(rel); + final Collection<Edge> edges2 = edges.isEmpty() + ? ImmutableList.of(new Edge(null, 0)) + : edges; + NodeInfo nodeInfo = nodes.get(rel); + if (nodeInfo == null) { + nodeInfo = new NodeInfo(rel, null); + nodes.put(rel, nodeInfo); + for (Edge edge : edges2) { + nodeInfo.sinks.put(edge, new ListSink(new ArrayDeque<Row>())); + } + } + if (edges.size() == 1) { + return Iterables.getOnlyElement(nodeInfo.sinks.values()); + } else { + final List<ArrayDeque<Row>> queues = new ArrayList<>(); + for (ListSink sink : nodeInfo.sinks.values()) { + queues.add(sink.list); + } + return new DuplicatingSink(queues); + } + } + + public void enumerable(RelNode rel, Enumerable<Row> rowEnumerable) { + NodeInfo nodeInfo = new NodeInfo(rel, rowEnumerable); + nodes.put(rel, nodeInfo); + } + + public Context createContext() { + return new Context(getDataContext()); + } + + public DataContext getDataContext() { + return interpreter.dataContext; + } + } + + /** Edge between a {@link RelNode} and one of its inputs. */ + static class Edge extends Pair<RelNode, Integer> { + Edge(RelNode parent, int ordinal) { + super(parent, ordinal); + } } /** Converts a list of expressions to a scalar that can compute their http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java index 6e2ae15..349e26f 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/JoinNode.java @@ -35,14 +35,14 @@ public class JoinNode implements Node { private final Scalar condition; private final Context context; - public JoinNode(Interpreter interpreter, Join rel) { - this.leftSource = interpreter.source(rel, 0); - this.rightSource = interpreter.source(rel, 1); - this.sink = interpreter.sink(rel); - this.condition = interpreter.compile(ImmutableList.of(rel.getCondition()), - interpreter.combinedRowType(rel.getInputs())); + public JoinNode(Compiler compiler, Join rel) { + this.leftSource = compiler.source(rel, 0); + this.rightSource = compiler.source(rel, 1); + this.sink = compiler.sink(rel); + this.condition = compiler.compile(ImmutableList.of(rel.getCondition()), + compiler.combinedRowType(rel.getInputs())); this.rel = rel; - this.context = interpreter.createContext(); + this.context = compiler.createContext(); } http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/Nodes.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java index 19f397a..b6f1b08 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/Nodes.java +++ b/core/src/main/java/org/apache/calcite/interpreter/Nodes.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.interpreter; +import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Join; @@ -35,54 +36,53 @@ import com.google.common.collect.ImmutableList; */ public class Nodes { /** Extension to - * {@link org.apache.calcite.interpreter.Interpreter.Compiler} + * {@link Interpreter.CompilerImpl} * that knows how to handle the core logical * {@link org.apache.calcite.rel.RelNode}s. */ - public static class CoreCompiler extends Interpreter.Compiler { - CoreCompiler(Interpreter interpreter) { - super(interpreter); + public static class CoreCompiler extends Interpreter.CompilerImpl { + CoreCompiler(Interpreter interpreter, RelOptCluster cluster) { + super(interpreter, cluster); } public void visit(Aggregate agg) { - node = new AggregateNode(interpreter, agg); + node = new AggregateNode(this, agg); } public void visit(Filter filter) { - node = new FilterNode(interpreter, filter); + node = new FilterNode(this, filter); } public void visit(Project project) { - node = new ProjectNode(interpreter, project); + node = new ProjectNode(this, project); } public void visit(Values value) { - node = new ValuesNode(interpreter, value); + node = new ValuesNode(this, value); } public void visit(TableScan scan) { - node = TableScanNode.create(interpreter, scan, - ImmutableList.<RexNode>of(), null); + final ImmutableList<RexNode> filters = ImmutableList.of(); + node = TableScanNode.create(this, scan, filters, null); } public void visit(Bindables.BindableTableScan scan) { - node = TableScanNode.create(interpreter, scan, scan.filters, - scan.projects); + node = TableScanNode.create(this, scan, scan.filters, scan.projects); } public void visit(Sort sort) { - node = new SortNode(interpreter, sort); + node = new SortNode(this, sort); } public void visit(Union union) { - node = new UnionNode(interpreter, union); + node = new UnionNode(this, union); } public void visit(Join join) { - node = new JoinNode(interpreter, join); + node = new JoinNode(this, join); } public void visit(Window window) { - node = new WindowNode(interpreter, window); + node = new WindowNode(this, window); } } } http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java index 4c280b7..2503b1cd 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/ProjectNode.java @@ -27,12 +27,12 @@ public class ProjectNode extends AbstractSingleNode<Project> { private final Context context; private final int projectCount; - public ProjectNode(Interpreter interpreter, Project rel) { - super(interpreter, rel); + public ProjectNode(Compiler compiler, Project rel) { + super(compiler, rel); this.projectCount = rel.getProjects().size(); - this.scalar = interpreter.compile(rel.getProjects(), + this.scalar = compiler.compile(rel.getProjects(), rel.getInput().getRowType()); - this.context = interpreter.createContext(); + this.context = compiler.createContext(); } public void run() throws InterruptedException { http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/SortNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java index 5eaaaf6..dff97a6 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/SortNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/SortNode.java @@ -34,8 +34,8 @@ import java.util.List; * {@link org.apache.calcite.rel.core.Sort}. */ public class SortNode extends AbstractSingleNode<Sort> { - public SortNode(Interpreter interpreter, Sort rel) { - super(interpreter, rel); + public SortNode(Compiler compiler, Sort rel) { + super(compiler, rel); } public void run() throws InterruptedException { http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java index 9cec9e6..9c8124e 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/TableScanNode.java @@ -59,12 +59,12 @@ import static org.apache.calcite.util.Static.RESOURCE; * {@link org.apache.calcite.rel.core.TableScan}. */ public class TableScanNode implements Node { - private TableScanNode(Interpreter interpreter, TableScan rel, + private TableScanNode(Compiler compiler, TableScan rel, Enumerable<Row> enumerable) { - interpreter.enumerable(rel, enumerable); + compiler.enumerable(rel, enumerable); } - public void run() throws InterruptedException { + public void run() { // nothing to do } @@ -73,56 +73,56 @@ public class TableScanNode implements Node { * <p>Tries various table SPIs, and negotiates with the table which filters * and projects it can implement. Adds to the Enumerable implementations of * any filters and projects that cannot be implemented by the table. */ - static TableScanNode create(Interpreter interpreter, TableScan rel, + static TableScanNode create(Compiler compiler, TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects) { final RelOptTable relOptTable = rel.getTable(); final ProjectableFilterableTable pfTable = relOptTable.unwrap(ProjectableFilterableTable.class); if (pfTable != null) { - return createProjectableFilterable(interpreter, rel, filters, projects, + return createProjectableFilterable(compiler, rel, filters, projects, pfTable); } final FilterableTable filterableTable = relOptTable.unwrap(FilterableTable.class); if (filterableTable != null) { - return createFilterable(interpreter, rel, filters, projects, + return createFilterable(compiler, rel, filters, projects, filterableTable); } final ScannableTable scannableTable = relOptTable.unwrap(ScannableTable.class); if (scannableTable != null) { - return createScannable(interpreter, rel, filters, projects, + return createScannable(compiler, rel, filters, projects, scannableTable); } //noinspection unchecked final Enumerable<Row> enumerable = relOptTable.unwrap(Enumerable.class); if (enumerable != null) { - return createEnumerable(interpreter, rel, enumerable, null, filters, + return createEnumerable(compiler, rel, enumerable, null, filters, projects); } final QueryableTable queryableTable = relOptTable.unwrap(QueryableTable.class); if (queryableTable != null) { - return createQueryable(interpreter, rel, filters, projects, + return createQueryable(compiler, rel, filters, projects, queryableTable); } throw new AssertionError("cannot convert table " + relOptTable + " to enumerable"); } - private static TableScanNode createScannable(Interpreter interpreter, - TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects, + private static TableScanNode createScannable(Compiler compiler, TableScan rel, + ImmutableList<RexNode> filters, ImmutableIntList projects, ScannableTable scannableTable) { final Enumerable<Row> rowEnumerable = - Enumerables.toRow(scannableTable.scan(interpreter.getDataContext())); - return createEnumerable(interpreter, rel, rowEnumerable, null, filters, + Enumerables.toRow(scannableTable.scan(compiler.getDataContext())); + return createEnumerable(compiler, rel, rowEnumerable, null, filters, projects); } - private static TableScanNode createQueryable(Interpreter interpreter, + private static TableScanNode createQueryable(Compiler compiler, TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects, QueryableTable queryableTable) { - final DataContext root = interpreter.getDataContext(); + final DataContext root = compiler.getDataContext(); final RelOptTable relOptTable = rel.getTable(); final Type elementType = queryableTable.getElementType(); SchemaPlus schema = root.getRootSchema(); @@ -163,14 +163,14 @@ public class TableScanNode implements Node { rowEnumerable = Schemas.queryable(root, Row.class, relOptTable.getQualifiedName()); } - return createEnumerable(interpreter, rel, rowEnumerable, null, filters, + return createEnumerable(compiler, rel, rowEnumerable, null, filters, projects); } - private static TableScanNode createFilterable(Interpreter interpreter, + private static TableScanNode createFilterable(Compiler compiler, TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects, FilterableTable filterableTable) { - final DataContext root = interpreter.getDataContext(); + final DataContext root = compiler.getDataContext(); final List<RexNode> mutableFilters = Lists.newArrayList(filters); final Enumerable<Object[]> enumerable = filterableTable.scan(root, mutableFilters); @@ -180,14 +180,14 @@ public class TableScanNode implements Node { } } final Enumerable<Row> rowEnumerable = Enumerables.toRow(enumerable); - return createEnumerable(interpreter, rel, rowEnumerable, null, + return createEnumerable(compiler, rel, rowEnumerable, null, mutableFilters, projects); } - private static TableScanNode createProjectableFilterable( - Interpreter interpreter, TableScan rel, ImmutableList<RexNode> filters, - ImmutableIntList projects, ProjectableFilterableTable pfTable) { - final DataContext root = interpreter.getDataContext(); + private static TableScanNode createProjectableFilterable(Compiler compiler, + TableScan rel, ImmutableList<RexNode> filters, ImmutableIntList projects, + ProjectableFilterableTable pfTable) { + final DataContext root = compiler.getDataContext(); final ImmutableIntList originalProjects = projects; for (;;) { final List<RexNode> mutableFilters = Lists.newArrayList(filters); @@ -234,19 +234,20 @@ public class TableScanNode implements Node { // project the leading columns. rejectedProjects = ImmutableIntList.identity(originalProjects.size()); } - return createEnumerable(interpreter, rel, rowEnumerable, projects, + return createEnumerable(compiler, rel, rowEnumerable, projects, mutableFilters, rejectedProjects); } } - private static TableScanNode createEnumerable( - Interpreter interpreter, TableScan rel, - Enumerable<Row> enumerable, final ImmutableIntList acceptedProjects, - List<RexNode> rejectedFilters, final ImmutableIntList rejectedProjects) { + private static TableScanNode createEnumerable(Compiler compiler, + TableScan rel, Enumerable<Row> enumerable, + final ImmutableIntList acceptedProjects, List<RexNode> rejectedFilters, + final ImmutableIntList rejectedProjects) { if (!rejectedFilters.isEmpty()) { final RexNode filter = RexUtil.composeConjunction(rel.getCluster().getRexBuilder(), rejectedFilters, false); + assert filter != null; // Re-map filter for the projects that have been applied already final RexNode filter2; final RelDataType inputRowType; @@ -267,9 +268,8 @@ public class TableScanNode implements Node { inputRowType = builder.build(); } final Scalar condition = - interpreter.compile( - ImmutableList.of(filter2), inputRowType); - final Context context = interpreter.createContext(); + compiler.compile(ImmutableList.of(filter2), inputRowType); + final Context context = compiler.createContext(); enumerable = enumerable.where( new Predicate1<Row>() { @Override public boolean apply(Row row) { @@ -292,7 +292,7 @@ public class TableScanNode implements Node { } }); } - return new TableScanNode(interpreter, rel, enumerable); + return new TableScanNode(compiler, rel, enumerable); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java index 701dbc7..0f6fe4f 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/UnionNode.java @@ -32,13 +32,13 @@ public class UnionNode implements Node { private final Sink sink; private final Union rel; - public UnionNode(Interpreter interpreter, Union rel) { + public UnionNode(Compiler compiler, Union rel) { ImmutableList.Builder<Source> builder = ImmutableList.builder(); for (int i = 0; i < rel.getInputs().size(); i++) { - builder.add(interpreter.source(rel, i)); + builder.add(compiler.source(rel, i)); } this.sources = builder.build(); - this.sink = interpreter.sink(rel); + this.sink = compiler.sink(rel); this.rel = rel; } http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java index 008d6f0..1c6f255 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/ValuesNode.java @@ -34,21 +34,21 @@ public class ValuesNode implements Node { private final int fieldCount; private final ImmutableList<Row> rows; - public ValuesNode(Interpreter interpreter, Values rel) { - this.sink = interpreter.sink(rel); + public ValuesNode(Compiler compiler, Values rel) { + this.sink = compiler.sink(rel); this.fieldCount = rel.getRowType().getFieldCount(); - this.rows = createRows(interpreter, rel.getTuples()); + this.rows = createRows(compiler, rel.getTuples()); } - private ImmutableList<Row> createRows(Interpreter interpreter, + private ImmutableList<Row> createRows(Compiler compiler, ImmutableList<ImmutableList<RexLiteral>> tuples) { final List<RexNode> nodes = Lists.newArrayList(); for (ImmutableList<RexLiteral> tuple : tuples) { nodes.addAll(tuple); } - final Scalar scalar = interpreter.compile(nodes, null); + final Scalar scalar = compiler.compile(nodes, null); final Object[] values = new Object[nodes.size()]; - final Context context = interpreter.createContext(); + final Context context = compiler.createContext(); scalar.execute(context, values); final ImmutableList.Builder<Row> rows = ImmutableList.builder(); Object[] subValues = new Object[fieldCount]; http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java index a03608c..eff4bea 100644 --- a/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java +++ b/core/src/main/java/org/apache/calcite/interpreter/WindowNode.java @@ -23,8 +23,8 @@ import org.apache.calcite.rel.core.Window; * {@link org.apache.calcite.rel.core.Window}. */ public class WindowNode extends AbstractSingleNode<Window> { - WindowNode(Interpreter interpreter, Window rel) { - super(interpreter, rel); + WindowNode(Compiler compiler, Window rel) { + super(compiler, rel); } public void run() throws InterruptedException { http://git-wip-us.apache.org/repos/asf/calcite/blob/7b85d445/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java index 2469841..8d3ea64 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java @@ -426,7 +426,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { } @Override public Node implement(InterpreterImplementor implementor) { - return new DruidQueryNode(implementor.interpreter, this); + return new DruidQueryNode(implementor.compiler, this); } public QuerySpec getQuerySpec() {
