This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new c7bdc11 IGNITE-12871: Calcite integration. Broadcast to hash
conversion without exchange. This closes #7638
c7bdc11 is described below
commit c7bdc119c0d2e085708877013a952bd9ccd915ff
Author: Igor Seliverstov <[email protected]>
AuthorDate: Tue Apr 14 15:51:01 2020 +0300
IGNITE-12871: Calcite integration. Broadcast to hash conversion without
exchange. This closes #7638
---
.../query/calcite/exec/LogicalRelImplementor.java | 37 +++++++++
.../query/calcite/exec/PhysicalRelImplementor.java | 31 +++++++
.../processors/query/calcite/prepare/Cloner.java | 8 ++
.../query/calcite/prepare/FragmentSplitter.java | 83 +++++++++++++------
.../processors/query/calcite/prepare/Splitter.java | 6 ++
.../query/calcite/rel/IgniteRelVisitor.java | 5 ++
.../query/calcite/rel/IgniteTrimExchange.java | 71 ++++++++++++++++
.../calcite/serialize/PhysicalRelVisitor.java | 5 ++
.../calcite/serialize/ReceiverPhysicalRel.java | 2 +
.../calcite/serialize/RelToPhysicalConverter.java | 18 +++++
.../calcite/serialize/TableModifyPhysicalRel.java | 2 +
.../calcite/serialize/TrimExchangePhysicalRel.java | 94 ++++++++++++++++++++++
.../query/calcite/trait/DistributionTraitDef.java | 15 +++-
.../query/calcite/trait/IgniteDistributions.java | 2 +-
14 files changed, 352 insertions(+), 27 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 68c55fc..c340d2e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -21,11 +21,14 @@ import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
@@ -54,9 +57,11 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -126,6 +131,20 @@ public class LogicalRelImplementor implements
IgniteRelVisitor<Node<Object[]>> {
}
/** {@inheritDoc} */
+ @Override public Node<Object[]> visit(IgniteTrimExchange rel) {
+ RelTarget target = rel.target();
+
+ assert target != null && target.mapping() != null &&
!F.isEmpty(target.mapping().assignments());
+
+ Predicate<Object[]> predicate = partitionFilter(rel.distribution(),
target.mapping().assignments().size());
+
+ FilterNode node = new FilterNode(ctx, predicate);
+ node.register(visit(rel.getInput()));
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
@Override public Node<Object[]> visit(IgniteProject rel) {
Function<Object[], Object[]> projection =
expressionFactory.project(ctx, rel.getProjects(), rel.getInput().getRowType());
ProjectNode node = new ProjectNode(ctx, projection);
@@ -243,6 +262,24 @@ public class LogicalRelImplementor implements
IgniteRelVisitor<Node<Object[]>> {
}
/** */
+ private Predicate<Object[]> partitionFilter(IgniteDistribution distr, int
partitions) {
+ assert distr.getType() == RelDistribution.Type.HASH_DISTRIBUTED;
+ assert !F.isEmpty(ctx.partitions());
+
+ boolean[] filter = new boolean[partitions];
+
+ for (int part : ctx.partitions())
+ filter[part] = true;
+
+ DistributionFunction function = distr.function();
+ ImmutableIntList keys = distr.getKeys();
+
+ ToIntFunction<Object> partFunction =
function.partitionFunction(partitionService, partitions, keys);
+
+ return o -> filter[partFunction.applyAsInt(o)];
+ }
+
+ /** */
public Node<Object[]> go(IgniteRel rel) {
return visit(rel);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java
index 1760465..d1c2282 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java
@@ -18,10 +18,14 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.List;
+import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
@@ -46,8 +50,10 @@ import
org.apache.ignite.internal.processors.query.calcite.serialize.ReceiverPhy
import
org.apache.ignite.internal.processors.query.calcite.serialize.SenderPhysicalRel;
import
org.apache.ignite.internal.processors.query.calcite.serialize.TableModifyPhysicalRel;
import
org.apache.ignite.internal.processors.query.calcite.serialize.TableScanPhysicalRel;
+import
org.apache.ignite.internal.processors.query.calcite.serialize.TrimExchangePhysicalRel;
import
org.apache.ignite.internal.processors.query.calcite.serialize.ValuesPhysicalRel;
import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.util.typedef.F;
@@ -105,6 +111,16 @@ public class PhysicalRelImplementor implements
PhysicalRelVisitor<Node<Object[]>
}
/** {@inheritDoc} */
+ @Override public Node<Object[]> visit(TrimExchangePhysicalRel rel) {
+ Predicate<Object[]> predicate = partitionFilter(rel.function(),
rel.distributionKeys(), rel.partitions());
+
+ FilterNode node = new FilterNode(ctx, predicate);
+ node.register(visit(rel.input()));
+
+ return node;
+ }
+
+ /** {@inheritDoc} */
@Override public Node<Object[]> visit(ProjectPhysicalRel rel) {
ProjectNode node = new ProjectNode(ctx, expressionFactory.project(ctx,
rel.projects(), rel.rowType()));
node.register(visit(rel.input()));
@@ -204,6 +220,21 @@ public class PhysicalRelImplementor implements
PhysicalRelVisitor<Node<Object[]>
}
/** */
+ private Predicate<Object[]> partitionFilter(DistributionFunction function,
ImmutableIntList keys, int partitions) {
+ assert function.type() == RelDistribution.Type.HASH_DISTRIBUTED;
+ assert !F.isEmpty(ctx.partitions());
+
+ boolean[] filter = new boolean[partitions];
+
+ for (int part : ctx.partitions())
+ filter[part] = true;
+
+ ToIntFunction<Object> partFunction =
function.partitionFunction(partitionService, partitions, keys);
+
+ return o -> filter[partFunction.applyAsInt(o)];
+ }
+
+ /** */
public Node<Object[]> go(PhysicalRel root) {
return visit(root);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 1edb424..238de49 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import org.apache.ignite.internal.util.typedef.F;
@@ -95,6 +96,13 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTrimExchange rel) {
+ RelNode input = visit((IgniteRel) rel.getInput());
+
+ return new IgniteTrimExchange(cluster, rel.getTraitSet(), input);
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteProject rel) {
RelNode input = visit((IgniteRel) rel.getInput());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
index aae90d9..18655bf 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -40,9 +40,11 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -84,6 +86,7 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
@Override public IgniteRel visit(IgniteSender rel) {
// a split may happen on BiRel inputs merge. A sender node cannot be a
BiRel input.
assert rel != cutPoint;
+ assert cutPoint != null;
RelNode input = rel.getInput();
RelNode newInput = visit((IgniteRel) input);
@@ -96,30 +99,25 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteAggregate rel) {
- boolean split = rel == cutPoint;
+ boolean split = cutPoint(rel);
RelNode input = rel.getInput();
RelNode newInput = visit((IgniteRel) input);
if (input != newInput) {
- IgniteDistribution newDistr = IgniteDistributions.aggregate(mq,
- newInput, rel.getGroupSet(), rel.getGroupSets(),
rel.getAggCallList());
+ checkDistributionEqual(input, newInput);
- assert newDistr != null;
-
- RelTraitSet traits = rel.getTraitSet()
- .replace(newDistr);
-
- rel = (IgniteAggregate) rel.copy(traits,
ImmutableList.of(newInput));
+ rel = (IgniteAggregate) rel.copy(rel.getTraitSet(),
ImmutableList.of(newInput));
}
- return split ? split(rel, rel.getTraitSet()) : rel;
+ return split ? split(rel) : rel;
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteMapAggregate rel) {
// a split may happen on BiRel inputs merge. An map aggregate node
cannot be a BiRel input.
assert rel != cutPoint;
+ assert cutPoint != null;
RelNode input = rel.getInput();
RelNode newInput = visit((IgniteRel) input);
@@ -143,7 +141,7 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
// a split may happen on BiRel inputs merge. An reduce aggregate node
doesn't have a
// physical mapping (because it always goes after receiver node), so,
its merge with
// any input cannot cause the split.
- assert rel != cutPoint;
+ assert cutPoint == null;
if (U.assertionsEnabled()) {
RelNode input = rel.getInput();
@@ -157,7 +155,7 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteFilter rel) {
- boolean split = rel == cutPoint;
+ boolean split = cutPoint(rel);
RelNode input = rel.getInput();
IgniteRel newInput = visit((IgniteRel) input);
@@ -169,12 +167,32 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
rel = (IgniteFilter) rel.copy(traits, ImmutableList.of(newInput));
}
- return split ? split(rel, rel.getTraitSet()) : rel;
+ return split ? split(rel) : rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTrimExchange rel) {
+ boolean split = cutPoint(rel);
+
+ // after split we need to remove the exchange, return an original
+ // distribution and propagate the distribution to a fragment root.
+ if (split)
+ return split((IgniteRel) rel.getInput());
+ else if (cutPoint == null)
+ return (IgniteRel) rel.getInput();
+
+ RelNode input = rel.getInput();
+ IgniteRel newInput = visit((IgniteRel) input);
+
+ if (input == newInput)
+ return rel;
+
+ return (IgniteRel) rel.copy(rel.getTraitSet(), F.asList(newInput));
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteProject rel) {
- boolean split = rel == cutPoint;
+ boolean split = cutPoint(rel);
RelNode input = rel.getInput();
RelNode newInput = visit((IgniteRel) input);
@@ -186,12 +204,15 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
rel = (IgniteProject) rel.copy(traits, ImmutableList.of(newInput));
}
- return split ? split(rel, rel.getTraitSet()) : rel;
+ return split ? split(rel) : rel;
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteJoin rel) {
- boolean split = rel == cutPoint;
+ if (cutPoint == null)
+ return rel; // no need to check deeper
+
+ boolean split = cutPoint(rel);
RelNode left = rel.getLeft();
RelNode right = rel.getRight();
@@ -208,32 +229,35 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
rel = (IgniteJoin) rel.copy(rel.getTraitSet(),
ImmutableList.of(newLeft, newRight));
}
- return split ? split(rel, rel.getTraitSet()) : rel;
+ return split ? split(rel) : rel;
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteTableModify rel) {
- boolean split = rel == cutPoint;
+ boolean split = cutPoint(rel);
RelNode input = rel.getInput();
RelNode newInput = visit((IgniteRel) input);
- if (input != newInput)
+ if (input != newInput) {
+ checkDistributionEqual(input, newInput);
+
rel = (IgniteTableModify) rel.copy(rel.getTraitSet(),
ImmutableList.of(newInput));
+ }
- return split ? split(rel, rel.getTraitSet()) : rel;
+ return split ? split(rel) : rel;
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteTableScan rel) {
- return rel == cutPoint ? split(rel, rel.getTraitSet()) : rel;
+ return cutPoint(rel) ? split(rel) : rel;
}
/** {@inheritDoc} */
@Override public IgniteRel visit(IgniteReceiver rel) {
// a split may happen on BiRel inputs merge. A receiver doesn't have a
// physical mapping, so, its merge with any input cannot cause the
split.
- assert rel != cutPoint;
+ assert cutPoint == null;
return rel;
}
@@ -242,7 +266,7 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
@Override public IgniteRel visit(IgniteValues rel) {
// a split may happen on BiRel inputs merge. A values node doesn't
have a
// physical mapping, so, its merge with any input cannot cause the
split.
- assert rel != cutPoint;
+ assert cutPoint == null;
return rel;
}
@@ -258,8 +282,19 @@ public class FragmentSplitter implements
IgniteRelVisitor<IgniteRel> {
}
/** */
- private IgniteRel split(IgniteRel input, RelTraitSet traits) {
+ public boolean cutPoint(RelNode rel) {
+ boolean res = rel == cutPoint;
+
+ if (res)
+ cutPoint = null;
+
+ return res;
+ }
+
+ /** */
+ private IgniteRel split(IgniteRel input) {
RelOptCluster cluster = input.getCluster();
+ RelTraitSet traits = input.getTraitSet();
Fragment fragment = new Fragment(new IgniteSender(cluster, traits,
input));
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index cacaed6..1cbfc9e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -35,6 +35,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
/**
@@ -66,6 +67,11 @@ public class Splitter implements IgniteRelVisitor<IgniteRel>
{
}
/** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTrimExchange rel) {
+ return visitChildren(rel);
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRel visit(IgniteProject rel) {
return visitChildren(rel);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 94b2a34..cc178ce 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -34,6 +34,11 @@ public interface IgniteRelVisitor<T> {
/**
* See {@link IgniteRelVisitor#visit(IgniteRel)}
*/
+ T visit(IgniteTrimExchange rel);
+
+ /**
+ * See {@link IgniteRelVisitor#visit(IgniteRel)}
+ */
T visit(IgniteProject rel);
/**
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
new file mode 100644
index 0000000..4126060
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.RelTargetAware;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+
+import static
org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+
+/**
+ *
+ */
+public class IgniteTrimExchange extends SingleRel implements IgniteRel,
RelTargetAware {
+ /** */
+ private RelTarget target;
+
+ /** */
+ public IgniteTrimExchange(RelOptCluster cluster, RelTraitSet traits,
RelNode input) {
+ super(cluster, traits, input);
+
+ assert
input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE).getType() ==
BROADCAST_DISTRIBUTED;
+ assert traits.getTrait(DistributionTraitDef.INSTANCE).getType() ==
HASH_DISTRIBUTED;
+ }
+
+ /** */
+ @Override public RelNode copy(RelTraitSet traits, List<RelNode> inputs) {
+ RelNode input = sole(inputs);
+
+ assert
input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE).getType() ==
BROADCAST_DISTRIBUTED;
+ assert traits.getTrait(DistributionTraitDef.INSTANCE).getType() ==
HASH_DISTRIBUTED;
+
+ return new IgniteTrimExchange(getCluster(), traits, input);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** */
+ public RelTarget target() {
+ return target;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void target(RelTarget target) {
+ this.target = target;
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java
index 5bbbd0e..028daa0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java
@@ -34,6 +34,11 @@ public interface PhysicalRelVisitor<T> {
/**
* See {@link PhysicalRelVisitor#visit(PhysicalRel)}
*/
+ T visit(TrimExchangePhysicalRel rel);
+
+ /**
+ * See {@link PhysicalRelVisitor#visit(PhysicalRel)}
+ */
T visit(ProjectPhysicalRel rel);
/**
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java
index ffe0ba3..b2d4894 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java
@@ -125,6 +125,8 @@ public class ReceiverPhysicalRel implements PhysicalRel {
for (int i = 0; i < sourcesSize; i++)
sources.add((UUID) in.readObject());
+ this.sources = sources;
+
int collationsSize = in.readInt();
if (collationsSize == -1)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
index d73a744..0466219 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.processors.query.calcite.serialize;
import com.google.common.collect.ImmutableList;
import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
@@ -27,6 +28,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexToExpTran
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggCallExp;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.type.DataType;
import
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -40,10 +42,13 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
/**
* Converts RelNode tree to physical rel tree.
@@ -84,6 +89,19 @@ public class RelToPhysicalConverter implements
IgniteRelVisitor<PhysicalRel> {
}
/** {@inheritDoc} */
+ @Override public PhysicalRel visit(IgniteTrimExchange rel) {
+ IgniteDistribution distr = rel.distribution();
+ RelTarget target = rel.target();
+
+ assert distr.getType() == RelDistribution.Type.HASH_DISTRIBUTED;
+ assert target != null && target.mapping() != null &&
!F.isEmpty(target.mapping().assignments());
+
+ int partitions = target.mapping().assignments().size();
+
+ return new TrimExchangePhysicalRel(distr.function(), distr.getKeys(),
partitions, visit((IgniteRel) rel.getInput()));
+ }
+
+ /** {@inheritDoc} */
@Override public PhysicalRel visit(IgniteProject rel) {
return new
ProjectPhysicalRel(DataType.fromType(rel.getInput().getRowType()),
rexTranslator.translate(rel.getProjects()), visit((IgniteRel)
rel.getInput()));
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java
index 651ff7b..9ce72c3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java
@@ -99,6 +99,8 @@ public class TableModifyPhysicalRel implements PhysicalRel {
for (int i = 0; i < tableNameSize; i++)
tableName.add(in.readUTF());
+ this.tableName = tableName;
+
operation = TableModify.Operation.values()[in.readByte()];
int columnsSize = in.readInt();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TrimExchangePhysicalRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TrimExchangePhysicalRel.java
new file mode 100644
index 0000000..d7cdc3a
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TrimExchangePhysicalRel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.calcite.util.ImmutableIntList;
+import
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+
+/**
+ *
+ */
+public class TrimExchangePhysicalRel implements PhysicalRel {
+ /** */
+ private DistributionFunction function;
+
+ /** */
+ private ImmutableIntList keys;
+
+ /** */
+ private int partitions;
+
+ /** */
+ private PhysicalRel input;
+
+ /** */
+ public TrimExchangePhysicalRel() {
+ }
+
+ /** */
+ public TrimExchangePhysicalRel(DistributionFunction function,
ImmutableIntList keys, int partitions, PhysicalRel input) {
+ this.function = function;
+ this.keys = keys;
+ this.partitions = partitions;
+ this.input = input;
+ }
+
+ /** */
+ public DistributionFunction function() {
+ return function;
+ }
+
+ /** */
+ public ImmutableIntList distributionKeys() {
+ return keys;
+ }
+
+ /** */
+ public int partitions() {
+ return partitions;
+ }
+
+ /** */
+ public PhysicalRel input() {
+ return input;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T accept(PhysicalRelVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(function);
+ out.writeObject(keys.toIntArray());
+ out.writeInt(partitions);
+ out.writeObject(input);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ function = (DistributionFunction) in.readObject();
+ keys = ImmutableIntList.of((int[]) in.readObject());
+ partitions = in.readInt();
+ input = (PhysicalRel) in.readObject();
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index f94c0d9..b55c77d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -21,8 +21,10 @@ import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
import org.apache.ignite.internal.processors.query.calcite.rule.RuleUtils;
/**
@@ -53,8 +55,17 @@ public class DistributionTraitDef extends
RelTraitDef<IgniteDistribution> {
return rel;
RelTraitSet newTraits = rel.getTraitSet().replace(toDist);
- RelNode input = RuleUtils.convert(rel, IgniteDistributions.any()); //
erasing source distribution a bit reduces search space
- RelNode newRel = planner.register(new IgniteExchange(rel.getCluster(),
newTraits, input, toDist), rel);
+ RelNode newRel;
+
+ // special case
+ if (fromDist.getType() == RelDistribution.Type.BROADCAST_DISTRIBUTED
+ && toDist.getType() == RelDistribution.Type.HASH_DISTRIBUTED) {
+ newRel = planner.register(new IgniteTrimExchange(rel.getCluster(),
newTraits, rel), rel);
+ }
+ else {
+ RelNode input = RuleUtils.convert(rel, IgniteDistributions.any());
// erasing source distribution a bit reduces search space
+ newRel = planner.register(new IgniteExchange(rel.getCluster(),
newTraits, input, toDist), rel);
+ }
if (!newRel.getTraitSet().equals(newTraits))
newRel = planner.changeTraits(newRel, newTraits);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 4c0e492..1e4bb02 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -191,7 +191,7 @@ public class IgniteDistributions {
IgniteDistribution out, in;
- if (simpleAggregate(groupSet, groupSets)) {
+ if (simpleAggregate(groupSet, groupSets) && !groupSet.isEmpty()) {
// re-hash by group keys
in = hash(groupSet.asList());
out = hash(ImmutableIntList.range(0, groupSet.cardinality()));