This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new 8cdb219 query execution 8cdb219 is described below commit 8cdb219ecb96bd09c14388f0678e79faf4916c04 Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Wed Dec 4 18:54:37 2019 +0300 query execution --- .../processors/query/calcite/exchange/Outbox.java | 2 +- .../query/calcite/exec/AbstractNode.java | 2 +- .../query/calcite/exec/Interpretable.java | 510 +++++++++++++++++++++ .../processors/query/calcite/exec/Node.java | 2 +- .../query/calcite/exec/ScalarFactory.java | 121 ----- .../processors/query/calcite/exec/ScanNode.java | 12 +- .../query/calcite/metadata/FragmentInfo.java | 10 +- .../calcite/metadata/IgniteMdFragmentInfo.java | 4 +- .../query/calcite/metadata/RelMetadataQueryEx.java | 8 +- .../ContextValue.java} | 30 +- .../query/calcite/prepare/DataContextImpl.java | 26 +- .../rel/{Receiver.java => IgniteReceiver.java} | 4 +- .../calcite/rel/{Sender.java => IgniteSender.java} | 16 +- .../query/calcite/rule/PlannerPhase.java | 8 + .../query/calcite/schema/IgniteTable.java | 9 +- .../calcite/serialize/relation/ReceiverNode.java | 6 +- .../serialize/relation/RelToGraphConverter.java | 8 +- .../calcite/serialize/relation/SenderNode.java | 6 +- .../query/calcite/splitter/Fragment.java | 14 +- .../query/calcite/splitter/QueryPlan.java | 8 +- .../query/calcite/splitter/Splitter.java | 12 +- .../query/calcite/util/IgniteRelShuttle.java | 16 +- .../query/calcite/util/RelImplementor.java | 8 +- .../{ScanIterator.java => TableScanIterator.java} | 4 +- .../query/calcite/CalciteQueryProcessorTest.java | 144 +++++- .../query/calcite/exec/ExecutionTest.java | 4 +- 26 files changed, 765 insertions(+), 229 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java index b1cf688..9099b46 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java @@ -45,7 +45,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T> private ExchangeProcessor srvc; - protected Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets, DestinationFunction function) { + public Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets, DestinationFunction function) { super(Sink.noOp()); this.queryId = queryId; this.exchangeId = exchangeId; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java index e832da4..66ba475 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java @@ -22,7 +22,7 @@ import java.util.List; /** * */ -public abstract class AbstractNode<T> implements Node<T>, Source { +public abstract class AbstractNode<T> implements Node<T> { protected final Sink<T> target; protected List<Source> sources; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java new file mode 100644 index 0000000..04108d4 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java @@ -0,0 +1,510 @@ +/* + * Copyright 2019 GridGain Systems, Inc. and Contributors. + * + * Licensed under the GridGain Community Edition License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.calcite.DataContext; +import org.apache.calcite.interpreter.Context; +import org.apache.calcite.interpreter.JaninoRexCompiler; +import org.apache.calcite.interpreter.Scalar; +import org.apache.calcite.interpreter.Util; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.ScannableTable; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.calcite.exchange.Outbox; +import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; +import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; +import org.apache.ignite.internal.processors.query.calcite.splitter.Target; +import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction; +import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory; +import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.PLANNER_CONTEXT; +import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.QUERY_ID; + +/** + * + */ +public class Interpretable { + public static final Convention INTERPRETABLE = new Convention.Impl("INTERPRETABLE", InterRel.class) {}; + + public static final List<RelOptRule> RULES = ImmutableList.of( + new TableScanConverter(), + new JoinConverter(), + new ProjectConverter(), + new FilterConverter(), + new SenderConverter(), + new ReceiverConverter() + ); + + public interface InterRel extends RelNode { + <T> Node<T> implement(Implementor<T> implementor); + } + + public static class TableScanConverter extends ConverterRule { + public TableScanConverter() { + super(IgniteTableScan.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "TableScanConverter"); + } + + @Override public RelNode convert(RelNode rel) { + IgniteTableScan scan = (IgniteTableScan) rel; + + RelTraitSet traitSet = scan.getTraitSet().replace(INTERPRETABLE); + return new ScanRel(rel.getCluster(), traitSet, scan.getTable()); + } + } + + public static class JoinConverter extends ConverterRule { + public JoinConverter() { + super(IgniteJoin.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "JoinConverter"); + } + + @Override public RelNode convert(RelNode rel) { + IgniteJoin join = (IgniteJoin) rel; + + RelNode left = convert(join.getLeft(), INTERPRETABLE); + RelNode right = convert(join.getRight(), INTERPRETABLE); + + RelTraitSet traitSet = join.getTraitSet().replace(INTERPRETABLE); + + return new JoinRel(rel.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType()); + } + } + + public static class ProjectConverter extends ConverterRule { + public ProjectConverter() { + super(IgniteProject.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ProjectConverter"); + } + + @Override public RelNode convert(RelNode rel) { + IgniteProject project = (IgniteProject) rel; + RelTraitSet traitSet = project.getTraitSet().replace(INTERPRETABLE); + RelNode input = convert(project.getInput(), INTERPRETABLE); + + return new ProjectRel(rel.getCluster(), traitSet, input, project.getProjects(), project.getRowType()); + } + } + + public static class FilterConverter extends ConverterRule { + public FilterConverter() { + super(IgniteFilter.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "FilterConverter"); + } + + @Override public RelNode convert(RelNode rel) { + IgniteFilter filter = (IgniteFilter) rel; + RelTraitSet traitSet = filter.getTraitSet().replace(INTERPRETABLE); + RelNode input = convert(filter.getInput(), INTERPRETABLE); + + return new FilterRel(rel.getCluster(), traitSet, input, filter.getCondition()); + } + } + + public static class SenderConverter extends ConverterRule { + public SenderConverter() { + super(IgniteSender.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "SenderConverter"); + } + + @Override public RelNode convert(RelNode rel) { + IgniteSender sender = (IgniteSender) rel; + RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE); + RelNode input = convert(sender.getInput(), INTERPRETABLE); + + return new SenderRel(rel.getCluster(), traitSet, input, sender.target()); + } + } + + public static class ReceiverConverter extends ConverterRule { + public ReceiverConverter() { + super(IgniteReceiver.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ReceiverConverter"); + } + + @Override public RelNode convert(RelNode rel) { + IgniteReceiver sender = (IgniteReceiver) rel; + RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE); + + return new ReceiverRel(rel.getCluster(), traitSet, sender.source()); + } + } + + public static class ReceiverRel extends AbstractRelNode implements InterRel { + private final org.apache.ignite.internal.processors.query.calcite.splitter.Source source; + + protected ReceiverRel(RelOptCluster cluster, RelTraitSet traits, org.apache.ignite.internal.processors.query.calcite.splitter.Source source) { + super(cluster, traits); + + this.source = source; + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new ReceiverRel(getCluster(), traitSet, source); + } + + @Override public <T> Node<T> implement(Implementor<T> implementor) { + return implementor.implement(this); + } + } + + public static class SenderRel extends SingleRel implements InterRel { + private final Target target; + + protected SenderRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, @NotNull Target target) { + super(cluster, traits, input); + this.target = target; + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new SenderRel(getCluster(), traitSet, sole(inputs), target); + } + + @Override public <T> Node<T> implement(Implementor<T> implementor) { + return implementor.implement(this); + } + } + + public static class FilterRel extends Filter implements InterRel { + protected FilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) { + super(cluster, traits, child, condition); + } + + @Override public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return new FilterRel(getCluster(), traitSet, input, condition); + } + + @Override public <T> Node<T> implement(Implementor<T> implementor) { + return implementor.implement(this); + } + } + + public static class ProjectRel extends Project implements InterRel { + protected ProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + @Override public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) { + return new ProjectRel(getCluster(), traitSet, input, projects, rowType); + } + + @Override public <T> Node<T> implement(Implementor<T> implementor) { + return implementor.implement(this); + } + } + + public static class JoinRel extends Join implements InterRel { + protected JoinRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { + super(cluster, traitSet, left, right, condition, variablesSet, joinType); + } + + @Override public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { + return new JoinRel(getCluster(), traitSet, left, right, condition, variablesSet, joinType); + } + + @Override public <T> Node<T> implement(Implementor<T> implementor) { + return implementor.implement(this); + } + } + + public static class ScanRel extends TableScan implements InterRel { + protected ScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { + super(cluster, traitSet, table); + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return this; + } + + @Override public <T> Node<T> implement(Implementor<T> implementor) { + return implementor.implement(this); + } + } + + public static class Implementor<T> { + private final PlannerContext ctx; + private final DataContext root; + private final ScalarFactory factory; + private Deque<Sink<T>> stack; + + public Implementor(DataContext root) { + this.root = root; + + ctx = PLANNER_CONTEXT.get(root); + factory = new ScalarFactory(new RexBuilder(ctx.typeFactory())); + stack = new ArrayDeque<>(); + } + + public Node<T> implement(SenderRel rel) { + assert stack.isEmpty(); + + GridCacheVersion id = QUERY_ID.get(root); + long exchangeId = rel.target.exchangeId(); + NodesMapping mapping = rel.target.mapping(); + List<UUID> targets = mapping.nodes(); + DistributionTrait distribution = rel.target.distribution(); + DestinationFunctionFactory destFactory = distribution.destinationFunctionFactory(); + DestinationFunction function = destFactory.create(ctx, mapping, distribution.keys()); + + Outbox<T> res = new Outbox<>(id, exchangeId, targets, function); + + stack.push(res.sink()); + + res.source(source(rel.getInput())); + + return res; + } + + public Node<T> implement(FilterRel rel) { + assert !stack.isEmpty(); + + FilterNode res = new FilterNode((Sink<Object[]>) stack.pop(), factory.filterPredicate(root, rel.getCondition(), rel.getRowType())); + + stack.push((Sink<T>) res.sink()); + + res.source(source(rel.getInput())); + + return (Node<T>) res; + } + + public Node<T> implement(ProjectRel rel) { + assert !stack.isEmpty(); + + ProjectNode res = new ProjectNode((Sink<Object[]>) stack.pop(), factory.projectExpression(root, rel.getProjects(), rel.getInput().getRowType())); + + stack.push((Sink<T>) res.sink()); + + res.source(source(rel.getInput())); + + return (Node<T>) res; + } + + public Node<T> implement(JoinRel rel) { + assert !stack.isEmpty(); + + JoinNode res = new JoinNode((Sink<Object[]>) stack.pop(), factory.joinExpression(root, rel.getCondition(), rel.getLeft().getRowType(), rel.getRight().getRowType())); + + stack.push((Sink<T>) res.sink(1)); + stack.push((Sink<T>) res.sink(0)); + + res.sources(sources(rel.getInputs())); + + return (Node<T>) res; + } + + public Node<T> implement(ScanRel rel) { + assert !stack.isEmpty(); + + Iterable<Object[]> source = rel.getTable().unwrap(ScannableTable.class).scan(root); + + return (Node<T>) new ScanNode((Sink<Object[]>)stack.pop(), source); + } + + public Node<T> implement(ReceiverRel rel) { + throw new AssertionError(); // TODO + } + + private Source source(RelNode rel) { + if (rel.getConvention() != INTERPRETABLE) + throw new IllegalStateException("INTERPRETABLE is required."); + + return ((InterRel)rel).implement(this); + } + + private List<Source> sources(List<RelNode> rels) { + ArrayList<Source> res = new ArrayList<>(rels.size()); + + for (RelNode rel : rels) { + res.add(source(rel)); + } + + return res; + } + + public Node<T> go(RelNode rel) { + if (rel.getConvention() != INTERPRETABLE) + throw new IllegalStateException("INTERPRETABLE is required."); + + if (rel instanceof SenderRel) + return implement((SenderRel)rel); + + ConsumerNode res = new ConsumerNode(); + + stack.push((Sink<T>) res.sink()); + + res.source(source(rel)); + + return (Node<T>) res; + } + } + + /** + * + */ + private static class ScalarFactory { + private final JaninoRexCompiler rexCompiler; + private final RexBuilder builder; + + private ScalarFactory(RexBuilder builder) { + rexCompiler = new JaninoRexCompiler(builder); + this.builder = builder; + } + + public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) { + System.out.println("filterPredicate for" + filter); + + Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType); + Context ctx = Util.createContext(root); + + return new FilterPredicate<>(ctx, scalar); + } + + public <T> Function<T, T> projectExpression(DataContext root, List<RexNode> projects, RelDataType rowType) { + System.out.println("joinExpression for" + projects); + + Scalar scalar = rexCompiler.compile(projects, rowType); + Context ctx = Util.createContext(root); + int count = projects.size(); + + return new ProjectExpression<>(ctx, scalar, count); + } + + public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) { + System.out.println("joinExpression for" + expression); + + RelDataType rowType = combinedType(leftType, rightType); + + Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType); + Context ctx = Util.createContext(root); + ctx.values = new Object[rowType.getFieldCount()]; + + return new JoinExpression<>(ctx, scalar); + } + + private RelDataType combinedType(RelDataType... types) { + RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory()); + + for (RelDataType type : types) + typeBuilder.addAll(type.getFieldList()); + + return typeBuilder.build(); + } + + private static class FilterPredicate<T> implements Predicate<T> { + private final Context ctx; + private final Scalar scalar; + private final Object[] vals; + + private FilterPredicate(Context ctx, Scalar scalar) { + this.ctx = ctx; + this.scalar = scalar; + + vals = new Object[1]; + } + + @Override public boolean test(T r) { + ctx.values = (Object[]) r; + scalar.execute(ctx, vals); + return (Boolean) vals[0]; + } + } + + private static class JoinExpression<T> implements BiFunction<T, T, T> { + private final Object[] vals; + private final Context ctx; + private final Scalar scalar; + + private Object[] left0; + + private JoinExpression(Context ctx, Scalar scalar) { + this.ctx = ctx; + this.scalar = scalar; + + vals = new Object[1]; + } + + @Override public T apply(T left, T right) { + if (left0 != left) { + left0 = (Object[]) left; + System.arraycopy(left0, 0, ctx.values, 0, left0.length); + } + + Object[] right0 = (Object[]) right; + System.arraycopy(right0, 0, ctx.values, left0.length, right0.length); + + scalar.execute(ctx, vals); + + if ((Boolean) vals[0]) + return (T) Arrays.copyOf(ctx.values, ctx.values.length); + + return null; + } + } + + private static class ProjectExpression<T> implements Function<T, T> { + private final Context ctx; + private final Scalar scalar; + private final int count; + + private ProjectExpression(Context ctx, Scalar scalar, int count) { + this.ctx = ctx; + this.scalar = scalar; + this.count = count; + } + + @Override public T apply(T r) { + ctx.values = (Object[]) r; + Object[] res = new Object[count]; + scalar.execute(ctx, res); + + return (T) res; + } + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java index 71266ff..2472e8e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java @@ -21,7 +21,7 @@ import java.util.List; /** * */ -public interface Node<T> { +public interface Node<T> extends Source { Sink<T> sink(int idx); void sources(List<Source> sources); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java deleted file mode 100644 index 62847ac..0000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2019 GridGain Systems, Inc. and Contributors. - * - * Licensed under the GridGain Community Edition License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.exec; - -import com.google.common.collect.ImmutableList; -import java.util.Arrays; -import java.util.function.BiFunction; -import java.util.function.Predicate; -import org.apache.calcite.DataContext; -import org.apache.calcite.interpreter.Context; -import org.apache.calcite.interpreter.JaninoRexCompiler; -import org.apache.calcite.interpreter.Scalar; -import org.apache.calcite.interpreter.Util; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; - -/** - * - */ -public class ScalarFactory { - private final JaninoRexCompiler rexCompiler; - private final RexBuilder builder; - - public ScalarFactory(RexBuilder builder) { - rexCompiler = new JaninoRexCompiler(builder); - this.builder = builder; - } - - public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) { - Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType); - Context ctx = Util.createContext(root); - - return new FilterPredicate<>(ctx, scalar); - } - - public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) { - RelDataType rowType = combinedType(leftType, rightType); - - Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType); - Context ctx = Util.createContext(root); - ctx.values = new Object[rowType.getFieldCount()]; - - return new JoinExpression<>(ctx, scalar); - } - - private RelDataType combinedType(RelDataType... types) { - RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory()); - - for (RelDataType type : types) - typeBuilder.addAll(type.getFieldList()); - - return typeBuilder.build(); - } - - private static class FilterPredicate<T> implements Predicate<T> { - private final Context ctx; - private final Scalar scalar; - private final Object[] vals; - - private FilterPredicate(Context ctx, Scalar scalar) { - this.ctx = ctx; - this.scalar = scalar; - - vals = new Object[1]; - } - - @Override public boolean test(T r) { - ctx.values = (Object[]) r; - scalar.execute(ctx, vals); - return (Boolean) vals[0]; - } - } - - private static class JoinExpression<T> implements BiFunction<T, T, T> { - private final Object[] vals; - private final Context ctx; - private final Scalar scalar; - - private Object[] left0; - - private JoinExpression(Context ctx, Scalar scalar) { - this.ctx = ctx; - this.scalar = scalar; - - vals = new Object[1]; - } - - @Override public T apply(T left, T right) { - if (left0 != left) { - left0 = (Object[]) left; - System.arraycopy(left0, 0, ctx.values, 0, left0.length); - } - - Object[] right0 = (Object[]) right; - System.arraycopy(right0, 0, ctx.values, left0.length, right0.length); - - scalar.execute(ctx, vals); - - if ((Boolean) vals[0]) - return (T) Arrays.copyOf(ctx.values, ctx.values.length); - - return null; - } - } -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java index 19edc8f..37c128a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java @@ -22,18 +22,19 @@ import java.util.List; /** * */ -public class ScanNode implements SingleNode<Object[]>, Source{ +public class ScanNode implements SingleNode<Object[]> { private static final Object[] END = new Object[0]; /** */ private final Sink<Object[]> target; - private final Iterator<Object[]> it; + private final Iterable<Object[]> source; + private Iterator<Object[]> it; private Object[] row; - protected ScanNode(Sink<Object[]> target, Iterator<Object[]> it) { + protected ScanNode(Sink<Object[]> target, Iterable<Object[]> source) { this.target = target; - this.it = it; + this.source = source; } @Override public void signal() { @@ -45,6 +46,9 @@ public class ScanNode implements SingleNode<Object[]>, Source{ row = null; + if (it == null) + it = source.iterator(); + while (it.hasNext()) { row = it.next(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java index 3ace003..b282ea7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.metadata; import com.google.common.collect.ImmutableList; import org.apache.calcite.util.Pair; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.splitter.Source; /** @@ -26,9 +26,9 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.Source; */ public class FragmentInfo { private final NodesMapping mapping; - private final ImmutableList<Pair<Receiver, Source>> sources; + private final ImmutableList<Pair<IgniteReceiver, Source>> sources; - public FragmentInfo(Pair<Receiver, Source> source) { + public FragmentInfo(Pair<IgniteReceiver, Source> source) { this(ImmutableList.of(source), null); } @@ -36,7 +36,7 @@ public class FragmentInfo { this(null, mapping); } - public FragmentInfo(ImmutableList<Pair<Receiver, Source>> sources, NodesMapping mapping) { + public FragmentInfo(ImmutableList<Pair<IgniteReceiver, Source>> sources, NodesMapping mapping) { this.sources = sources; this.mapping = mapping; } @@ -45,7 +45,7 @@ public class FragmentInfo { return mapping; } - public ImmutableList<Pair<Receiver, Source>> sources() { + public ImmutableList<Pair<IgniteReceiver, Source>> sources() { return sources; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java index 9f02f0c..503b424 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java @@ -29,8 +29,8 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.Pair; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; import org.apache.ignite.internal.processors.query.calcite.util.Edge; import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod; @@ -93,7 +93,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> { return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause); } - public FragmentInfo getFragmentInfo(Receiver rel, RelMetadataQuery mq) { + public FragmentInfo getFragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) { return new FragmentInfo(Pair.of(rel, rel.source())); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java index 0a76b4d..c93d04b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java @@ -24,9 +24,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.jetbrains.annotations.NotNull; @@ -44,8 +44,8 @@ public class RelMetadataQueryEx extends RelMetadataQuery { IgniteJoin.class, IgniteProject.class, IgniteTableScan.class, - Receiver.class, - Sender.class + IgniteReceiver.class, + IgniteSender.class )); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ContextValue.java similarity index 52% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ContextValue.java index e832da4..0fe3dc5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ContextValue.java @@ -14,31 +14,31 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.calcite.exec; +package org.apache.ignite.internal.processors.query.calcite.prepare; -import java.util.Collections; -import java.util.List; +import org.apache.calcite.DataContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** * */ -public abstract class AbstractNode<T> implements Node<T>, Source { - protected final Sink<T> target; - protected List<Source> sources; +public enum ContextValue { + QUERY_ID("_query_id", GridCacheVersion.class), + PLANNER_CONTEXT("_planner_context", PlannerContext.class); - protected AbstractNode(Sink<T> target) { - this.target = target; - } + private final String valueName; + private final Class type; - @Override public void sources(List<Source> sources) { - this.sources = Collections.unmodifiableList(sources); + ContextValue(String valueName, Class type) { + this.valueName = valueName; + this.type = type; } - public void signal(int idx) { - sources.get(idx).signal(); + public String valueName() { + return valueName; } - @Override public void signal() { - sources.forEach(Source::signal); + public <T> T get(DataContext ctx) { + return (T) type.cast(ctx.get(valueName)); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java index 06aaa024..3c2d14c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java @@ -26,15 +26,9 @@ import org.apache.calcite.schema.SchemaPlus; /** * */ -class DataContextImpl implements DataContext { +public class DataContextImpl implements DataContext { /** */ - private final JavaTypeFactory typeFactory; - - /** */ - private final SchemaPlus schema; - - /** */ - private final QueryProvider queryProvider; + private final PlannerContext ctx; /** */ private final Map<String, Object> params; @@ -43,31 +37,31 @@ class DataContextImpl implements DataContext { * @param params Parameters. * @param ctx Query context. */ - DataContextImpl(Map<String, Object> params, PlannerContext ctx) { - typeFactory = ctx.typeFactory(); - schema = ctx.schema(); - queryProvider = ctx.queryProvider(); - + public DataContextImpl(Map<String, Object> params, PlannerContext ctx) { this.params = params; + this.ctx = ctx; } /** {@inheritDoc} */ @Override public SchemaPlus getRootSchema() { - return schema; + return ctx.schema(); } /** {@inheritDoc} */ @Override public JavaTypeFactory getTypeFactory() { - return typeFactory; + return ctx.typeFactory(); } /** {@inheritDoc} */ @Override public QueryProvider getQueryProvider() { - return queryProvider; + return ctx.queryProvider(); } /** {@inheritDoc} */ @Override public Object get(String name) { + if (ContextValue.PLANNER_CONTEXT.valueName().equals(name)) + return ctx; + return params.get(name); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java similarity index 90% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java index 0287236..98cf908 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java @@ -28,14 +28,14 @@ import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; /** * */ -public final class Receiver extends AbstractRelNode implements IgniteRel { +public final class IgniteReceiver extends AbstractRelNode implements IgniteRel { private final Source source; /** * @param cluster Cluster this relational expression belongs to * @param traits Trait set. */ - public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Source source) { + public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Source source) { super(cluster, traits); this.rowType = rowType; this.source = source; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java similarity index 79% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java index fb1f277..e89712a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java @@ -16,6 +16,7 @@ package org.apache.ignite.internal.processors.query.calcite.rel; +import java.util.List; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -25,12 +26,11 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDist import org.apache.ignite.internal.processors.query.calcite.splitter.Target; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef; import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; -import org.jetbrains.annotations.NotNull; /** * */ -public final class Sender extends SingleRel implements IgniteRel { +public final class IgniteSender extends SingleRel implements IgniteRel { private Target target; /** @@ -39,16 +39,20 @@ public final class Sender extends SingleRel implements IgniteRel { * @param traits Trait set. * @param input Input relational expression */ - public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) { + public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input) { super(cluster, traits, input); } - private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input, @NotNull Target target) { + private IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input, Target target) { super(cluster, traits, input); this.target = target; } + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new IgniteSender(getCluster(), traitSet, sole(inputs), target); + } + /** {@inheritDoc} */ @Override public <T> T implement(RelImplementor<T> implementor) { return implementor.implement(this); @@ -62,7 +66,7 @@ public final class Sender extends SingleRel implements IgniteRel { return target; } - public static Sender create(RelNode input, Target target) { + public static IgniteSender create(RelNode input, Target target) { RelOptCluster cluster = input.getCluster(); RelMetadataQuery mq = cluster.getMetadataQuery(); @@ -70,6 +74,6 @@ public final class Sender extends SingleRel implements IgniteRel { .replace(IgniteRel.IGNITE_CONVENTION) .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.distribution(input, mq)); - return new Sender(cluster, traits, input, target); + return new IgniteSender(cluster, traits, input, target); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java index 24938a1..a243f52 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.rule; import org.apache.calcite.tools.RuleSet; import org.apache.calcite.tools.RuleSets; +import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable; import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; /** @@ -37,6 +38,13 @@ public enum PlannerPhase { @Override public RuleSet getRules(PlannerContext ctx) { return RuleSets.ofList(IgniteRules.logicalRules(ctx)); } + }, + + /** */ + PHYSICAL("Execution tree building") { + @Override public RuleSet getRules(PlannerContext ctx) { + return RuleSets.ofList(Interpretable.RULES); + } }; public final String description; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java index 3007a85..14b8191 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -17,12 +17,15 @@ package org.apache.ignite.internal.processors.query.calcite.schema; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo; @@ -36,7 +39,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.internal.CU; /** */ -public class IgniteTable extends AbstractTable implements TranslatableTable { +public class IgniteTable extends AbstractTable implements TranslatableTable, ScannableTable { private final String tableName; private final String cacheName; private final RowType rowType; @@ -82,4 +85,8 @@ public class IgniteTable extends AbstractTable implements TranslatableTable { public FragmentInfo fragmentInfo(PlannerContext ctx) { return new FragmentInfo(ctx.mapForCache(CU.cacheId(cacheName), ctx.topologyVersion())); } + + @Override public Enumerable<Object[]> scan(DataContext root) { + throw new AssertionError(); // TODO + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java index d1a00cd..24534ec 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation; import java.util.List; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType; import org.apache.ignite.internal.processors.query.calcite.splitter.Source; import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl; @@ -38,14 +38,14 @@ public class ReceiverNode extends RelGraphNode { this.source = source; } - public static ReceiverNode create(Receiver rel) { + public static ReceiverNode create(IgniteReceiver rel) { Source source = new SourceImpl(rel.source().exchangeId(), rel.source().mapping()); return new ReceiverNode(rel.getTraitSet(), DataType.fromType(rel.getRowType()), source); } @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) { - return new Receiver(ctx.getCluster(), + return new IgniteReceiver(ctx.getCluster(), traitSet.toTraitSet(ctx.getCluster()), dataType.toRelDataType(ctx.getTypeFactory()), source); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java index c3fb982..6e534d2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java @@ -24,10 +24,10 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor; @@ -69,11 +69,11 @@ public class RelToGraphConverter { return new Item(graph.addNode(curParent, TableScanNode.create(rel)), Commons.cast(rel.getInputs())); } - @Override public Item implement(Receiver rel) { + @Override public Item implement(IgniteReceiver rel) { return new Item(graph.addNode(curParent, ReceiverNode.create(rel)), Collections.emptyList()); } - @Override public Item implement(Sender rel) { + @Override public Item implement(IgniteSender rel) { return new Item(graph.addNode(curParent, SenderNode.create(rel)), Commons.cast(rel.getInputs())); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java index 94671dd..5164eed 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation; import java.util.List; import org.apache.calcite.rel.RelNode; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.splitter.Target; import org.apache.ignite.internal.util.typedef.F; @@ -32,11 +32,11 @@ public class SenderNode extends RelGraphNode { this.target = target; } - public static SenderNode create(Sender rel) { + public static SenderNode create(IgniteSender rel) { return new SenderNode(rel.target()); } @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) { - return Sender.create(F.first(children), target); + return IgniteSender.create(F.first(children), target); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java index 45c4591..e523f50 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java @@ -25,8 +25,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait; import org.apache.ignite.internal.util.typedef.F; @@ -55,11 +55,11 @@ public class Fragment implements Source { else mapping = info.mapping().deduplicate(); - ImmutableList<Pair<Receiver, Source>> sources = info.sources(); + ImmutableList<Pair<IgniteReceiver, Source>> sources = info.sources(); if (!F.isEmpty(sources)) { - for (Pair<Receiver, Source> input : sources) { - Receiver receiver = input.left; + for (Pair<IgniteReceiver, Source> input : sources) { + IgniteReceiver receiver = input.left; Source source = input.right; source.init(mapping, receiver.distribution(), ctx, mq); @@ -74,7 +74,7 @@ public class Fragment implements Source { @Override public void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) { assert remote(); - ((Sender) root).init(new TargetImpl(exchangeId, mapping, distribution)); + ((IgniteSender) root).init(new TargetImpl(exchangeId, mapping, distribution)); init(ctx, mq); } @@ -88,6 +88,6 @@ public class Fragment implements Source { } private boolean remote() { - return root instanceof Sender; + return root instanceof IgniteSender; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java index b0369bd..fd0c75f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java @@ -24,8 +24,8 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException; import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.util.Edge; import org.apache.ignite.internal.util.typedef.F; @@ -62,11 +62,11 @@ public class QueryPlan { RelOptCluster cluster = child.getCluster(); RelTraitSet traitSet = child.getTraitSet(); - Sender sender = new Sender(cluster, traitSet, child); + IgniteSender sender = new IgniteSender(cluster, traitSet, child); Fragment fragment = new Fragment(sender); fragments.add(fragment); - parent.replaceInput(edge.childIdx(), new Receiver(cluster, traitSet, sender.getRowType(), fragment)); + parent.replaceInput(edge.childIdx(), new IgniteReceiver(cluster, traitSet, sender.getRowType(), fragment)); } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java index c15c46a..53ae9df 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java @@ -23,9 +23,9 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle; /** @@ -50,18 +50,18 @@ public class Splitter extends IgniteRelShuttle { RelTraitSet inputTraits = input.getTraitSet(); RelTraitSet outputTraits = rel.getTraitSet(); - Sender sender = new Sender(cluster, inputTraits, visit(input)); + IgniteSender sender = new IgniteSender(cluster, inputTraits, visit(input)); Fragment fragment = new Fragment(sender); fragments.add(fragment); - return new Receiver(cluster, outputTraits, sender.getRowType(), fragment); + return new IgniteReceiver(cluster, outputTraits, sender.getRowType(), fragment); } - @Override public RelNode visit(Receiver rel) { + @Override public RelNode visit(IgniteReceiver rel) { throw new AssertionError("An attempt to split an already split task."); } - @Override public RelNode visit(Sender rel) { + @Override public RelNode visit(IgniteSender rel) { throw new AssertionError("An attempt to split an already split task."); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java index eb1129a..9734885 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java @@ -22,9 +22,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; /** * @@ -42,11 +42,11 @@ public class IgniteRelShuttle extends RelShuttleImpl { return visitChild(rel, 0, rel.getInput()); } - public RelNode visit(Receiver rel) { + public RelNode visit(IgniteReceiver rel) { return rel; } - public RelNode visit(Sender rel) { + public RelNode visit(IgniteSender rel) { return visitChild(rel, 0, rel.getInput()); } @@ -65,10 +65,10 @@ public class IgniteRelShuttle extends RelShuttleImpl { return visit((IgniteFilter)rel); if (rel instanceof IgniteProject) return visit((IgniteProject)rel); - if (rel instanceof Receiver) - return visit((Receiver)rel); - if (rel instanceof Sender) - return visit((Sender)rel); + if (rel instanceof IgniteReceiver) + return visit((IgniteReceiver)rel); + if (rel instanceof IgniteSender) + return visit((IgniteSender)rel); if (rel instanceof IgniteTableScan) return visit((IgniteTableScan)rel); if (rel instanceof IgniteJoin) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java index 58ba064..bcbe604 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java @@ -20,10 +20,10 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; -import org.apache.ignite.internal.processors.query.calcite.rel.Receiver; -import org.apache.ignite.internal.processors.query.calcite.rel.Sender; /** * @@ -34,7 +34,7 @@ public interface RelImplementor<T> { T implement(IgniteJoin rel); T implement(IgniteProject rel); T implement(IgniteTableScan rel); - T implement(Receiver rel); - T implement(Sender rel); + T implement(IgniteReceiver rel); + T implement(IgniteSender rel); T implement(IgniteRel other); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScanIterator.java similarity index 95% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScanIterator.java index c8d8a16..f603fcc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScanIterator.java @@ -34,7 +34,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFIN /** * */ -public class ScanIterator<T> extends GridCloseableIteratorAdapter<T> { +public class TableScanIterator<T> extends GridCloseableIteratorAdapter<T> { private final int cacheId; private final Iterator<GridDhtLocalPartition> parts; private final Function<CacheDataRow, T> typeWrapper; @@ -54,7 +54,7 @@ public class ScanIterator<T> extends GridCloseableIteratorAdapter<T> { */ private T next; - public ScanIterator(int cacheId, Iterator<GridDhtLocalPartition> parts, Function<CacheDataRow, T> typeWrapper, + public TableScanIterator(int cacheId, Iterator<GridDhtLocalPartition> parts, Function<CacheDataRow, T> typeWrapper, Predicate<CacheDataRow> typeFilter) { this.cacheId = cacheId; this.parts = parts; diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 644633b..a5159d3 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -21,7 +21,11 @@ package org.apache.ignite.internal.processors.query.calcite; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.UUID; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.ConventionTraitDef; @@ -34,9 +38,15 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.calcite.exec.ConsumerNode; +import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable; +import org.apache.ignite.internal.processors.query.calcite.exec.Node; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService; +import org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue; +import org.apache.ignite.internal.processors.query.calcite.prepare.DataContextImpl; import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext; import org.apache.ignite.internal.processors.query.calcite.prepare.Query; @@ -56,17 +66,22 @@ import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; import org.apache.ignite.internal.processors.query.calcite.type.RowType; import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.testframework.junits.GridTestKernalContext; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import static org.apache.ignite.internal.processors.query.calcite.exec.Interpretable.INTERPRETABLE; + /** * */ -//@WithSystemProperty(key = "calcite.debug", value = "true") +@WithSystemProperty(key = "calcite.debug", value = "true") public class CalciteQueryProcessorTest extends GridCommonAbstractTest { private static GridTestKernalContext kernalContext; @@ -89,30 +104,55 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { .field("name", String.class) .field("projectId", Integer.class) .field("cityId", Integer.class) - .build())); + .build()){ + @Override public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(Arrays.asList( + new Object[]{0, null, 0, "Igor", 0, 1}, + new Object[]{1, null, 1, "Roman", 0, 0} + )); + } + }); publicSchema.addTable(new IgniteTable("Project", "Project", RowType.builder() .keyField("id", Integer.class, true) .field("name", String.class) .field("ver", Integer.class) - .build())); - - + .build()){ + @Override public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(Arrays.asList( + new Object[]{0, null, 0, "Calcite", 1}, + new Object[]{1, null, 1, "Ignite", 1} + )); + } + }); publicSchema.addTable(new IgniteTable("Country", "Country", RowType.builder() .keyField("id", Integer.class, true) .field("name", String.class) .field("countryCode", Integer.class) - .build())); + .build()){ + @Override public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(Arrays.<Object[]>asList( + new Object[]{0, null, 0, "Russia", 7} + )); + } + }); publicSchema.addTable(new IgniteTable("City", "City", RowType.builder() .keyField("id", Integer.class, true) .field("name", String.class) .field("countryId", Integer.class) - .build())); + .build()){ + @Override public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(Arrays.asList( + new Object[]{0, null, 0, "Moscow", 0}, + new Object[]{1, null, 1, "Saint Petersburg", 0} + )); + } + }); schema = Frameworks .createRootSchema(false) @@ -487,6 +527,96 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest { } @Test + public void testPhysicalPlan() throws Exception { + String sql = "SELECT d.id, d.name, d.projectId, p.name0, p.ver0 " + + "FROM PUBLIC.Developer d JOIN (" + + "SELECT pp.id as id0, pp.name as name0, pp.ver as ver0 FROM PUBLIC.Project pp" + + ") p " + + "ON d.projectId = p.id0 " + + "WHERE (d.projectId + 1) > ?"; + + PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{-10}, this::context); + + assertNotNull(ctx); + + RelTraitDef[] traitDefs = { + DistributionTraitDef.INSTANCE, + ConventionTraitDef.INSTANCE + }; + + try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){ + assertNotNull(planner); + + Query query = Commons.plannerContext(ctx).query(); + + assertNotNull(planner); + + // Parse + SqlNode sqlNode = planner.parse(query.sql()); + + // Validate + sqlNode = planner.validate(sqlNode); + + // Convert to Relational operators graph + RelRoot relRoot = planner.rel(sqlNode); + + RelNode rel = relRoot.rel; + + // Transformation chain + rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet()); + + RelTraitSet desired = rel.getCluster().traitSet() + .replace(IgniteRel.IGNITE_CONVENTION) + .replace(IgniteDistributions.single()) + .simplify(); + + rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired); + + assertNotNull(relRoot); + + QueryPlan plan = new Splitter().go((IgniteRel) rel); + + assertNotNull(plan); + + plan.init(ctx); + + assertNotNull(plan); + + assertTrue(plan.fragments().size() == 2); + + desired = rel.getCluster().traitSetOf(INTERPRETABLE); + + RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, plan.fragments().get(1).root(), desired); + + assertNotNull(phys); + + Map<String, Object> params = ctx.query().params(F.asMap(ContextValue.QUERY_ID.valueName(), new GridCacheVersion())); + + Interpretable.Implementor<Object[]> implementor = new Interpretable.Implementor<>(new DataContextImpl(params, ctx)); + + Node<Object[]> exec = implementor.go(phys.getInput(0)); + + assertNotNull(exec); + + assertTrue(exec instanceof ConsumerNode); + + ConsumerNode consumer = (ConsumerNode) exec; + + assertTrue(consumer.hasNext()); + + ArrayList<Object[]> res = new ArrayList<>(); + + while (consumer.hasNext()) + res.add(consumer.next()); + + assertFalse(res.isEmpty()); + + Assert.assertArrayEquals(new Object[]{0, "Igor", 0, "Calcite", 1}, res.get(0)); + Assert.assertArrayEquals(new Object[]{1, "Roman", 0, "Calcite", 1}, res.get(1)); + } + } + + @Test public void testSplitterCollocatedReplicatedReplicated() throws Exception { String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " + "FROM PUBLIC.Developer d JOIN (" + diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java index 744280a..dacf823 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java @@ -50,14 +50,14 @@ public class ExecutionTest extends GridCommonAbstractTest { new Object[]{1, "Roman", "Kondakov"}, new Object[]{2, "Ivan", "Pavlukhin"}, new Object[]{3, "Alexey", "Goncharuk"} - ).iterator()); + )); ScanNode projects = new ScanNode(join.sink(1), Arrays.asList( new Object[]{0, 2, "Calcite"}, new Object[]{1, 1, "SQL"}, new Object[]{2, 2, "Ignite"}, new Object[]{3, 0, "Core"} - ).iterator()); + )); join.sources(Arrays.asList(persons, projects));