This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new c7bdc11  IGNITE-12871: Calcite integration. Broadcast to hash conversion without exchange. This closes #7638
c7bdc11 is described below

commit c7bdc119c0d2e085708877013a952bd9ccd915ff
Author: Igor Seliverstov <[email protected]>
AuthorDate: Tue Apr 14 15:51:01 2020 +0300

    IGNITE-12871: Calcite integration. Broadcast to hash conversion without exchange. This closes #7638
---
 .../query/calcite/exec/LogicalRelImplementor.java  | 37 +++++++++
 .../query/calcite/exec/PhysicalRelImplementor.java | 31 +++++++
 .../processors/query/calcite/prepare/Cloner.java   |  8 ++
 .../query/calcite/prepare/FragmentSplitter.java    | 83 +++++++++++++------
 .../processors/query/calcite/prepare/Splitter.java |  6 ++
 .../query/calcite/rel/IgniteRelVisitor.java        |  5 ++
 .../query/calcite/rel/IgniteTrimExchange.java      | 71 ++++++++++++++++
 .../calcite/serialize/PhysicalRelVisitor.java      |  5 ++
 .../calcite/serialize/ReceiverPhysicalRel.java     |  2 +
 .../calcite/serialize/RelToPhysicalConverter.java  | 18 +++++
 .../calcite/serialize/TableModifyPhysicalRel.java  |  2 +
 .../calcite/serialize/TrimExchangePhysicalRel.java | 94 ++++++++++++++++++++++
 .../query/calcite/trait/DistributionTraitDef.java  | 15 +++-
 .../query/calcite/trait/IgniteDistributions.java   |  2 +-
 14 files changed, 352 insertions(+), 27 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 68c55fc..c340d2e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -21,11 +21,14 @@ import java.util.List;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
@@ -54,9 +57,11 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 import 
org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -126,6 +131,20 @@ public class LogicalRelImplementor implements 
IgniteRelVisitor<Node<Object[]>> {
     }
 
     /** {@inheritDoc} */
+    @Override public Node<Object[]> visit(IgniteTrimExchange rel) {
+        RelTarget target = rel.target();
+
+        assert target != null && target.mapping() != null && 
!F.isEmpty(target.mapping().assignments());
+
+        Predicate<Object[]> predicate = partitionFilter(rel.distribution(), 
target.mapping().assignments().size());
+
+        FilterNode node = new FilterNode(ctx, predicate);
+        node.register(visit(rel.getInput()));
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
     @Override public Node<Object[]> visit(IgniteProject rel) {
         Function<Object[], Object[]> projection = 
expressionFactory.project(ctx, rel.getProjects(), rel.getInput().getRowType());
         ProjectNode node = new ProjectNode(ctx, projection);
@@ -243,6 +262,24 @@ public class LogicalRelImplementor implements 
IgniteRelVisitor<Node<Object[]>> {
     }
 
     /** */
+    private Predicate<Object[]> partitionFilter(IgniteDistribution distr, int 
partitions) {
+        assert distr.getType() == RelDistribution.Type.HASH_DISTRIBUTED;
+        assert !F.isEmpty(ctx.partitions());
+
+        boolean[] filter = new boolean[partitions];
+
+        for (int part : ctx.partitions())
+            filter[part] = true;
+
+        DistributionFunction function = distr.function();
+        ImmutableIntList keys = distr.getKeys();
+
+        ToIntFunction<Object> partFunction = 
function.partitionFunction(partitionService, partitions, keys);
+
+        return o -> filter[partFunction.applyAsInt(o)];
+    }
+
+    /** */
     public Node<Object[]> go(IgniteRel rel) {
         return visit(rel);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java
index 1760465..d1c2282 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PhysicalRelImplementor.java
@@ -18,10 +18,14 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.List;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
@@ -46,8 +50,10 @@ import 
org.apache.ignite.internal.processors.query.calcite.serialize.ReceiverPhy
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.SenderPhysicalRel;
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.TableModifyPhysicalRel;
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.TableScanPhysicalRel;
+import 
org.apache.ignite.internal.processors.query.calcite.serialize.TrimExchangePhysicalRel;
 import 
org.apache.ignite.internal.processors.query.calcite.serialize.ValuesPhysicalRel;
 import org.apache.ignite.internal.processors.query.calcite.trait.Destination;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -105,6 +111,16 @@ public class PhysicalRelImplementor implements 
PhysicalRelVisitor<Node<Object[]>
     }
 
     /** {@inheritDoc} */
+    @Override public Node<Object[]> visit(TrimExchangePhysicalRel rel) {
+        Predicate<Object[]> predicate = partitionFilter(rel.function(), 
rel.distributionKeys(), rel.partitions());
+
+        FilterNode node = new FilterNode(ctx, predicate);
+        node.register(visit(rel.input()));
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
     @Override public Node<Object[]> visit(ProjectPhysicalRel rel) {
         ProjectNode node = new ProjectNode(ctx, expressionFactory.project(ctx, 
rel.projects(), rel.rowType()));
         node.register(visit(rel.input()));
@@ -204,6 +220,21 @@ public class PhysicalRelImplementor implements 
PhysicalRelVisitor<Node<Object[]>
     }
 
     /** */
+    private Predicate<Object[]> partitionFilter(DistributionFunction function, 
ImmutableIntList keys, int partitions) {
+        assert function.type() == RelDistribution.Type.HASH_DISTRIBUTED;
+        assert !F.isEmpty(ctx.partitions());
+
+        boolean[] filter = new boolean[partitions];
+
+        for (int part : ctx.partitions())
+            filter[part] = true;
+
+        ToIntFunction<Object> partFunction = 
function.partitionFunction(partitionService, partitions, keys);
+
+        return o -> filter[partFunction.applyAsInt(o)];
+    }
+
+    /** */
     public Node<Object[]> go(PhysicalRel root) {
         return visit(root);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index 1edb424..238de49 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -95,6 +96,13 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTrimExchange rel) {
+        RelNode input = visit((IgniteRel) rel.getInput());
+
+        return new IgniteTrimExchange(cluster, rel.getTraitSet(), input);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteProject rel) {
         RelNode input = visit((IgniteRel) rel.getInput());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
index aae90d9..18655bf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -40,9 +40,11 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -84,6 +86,7 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
     @Override public IgniteRel visit(IgniteSender rel) {
         // a split may happen on BiRel inputs merge. A sender node cannot be a 
BiRel input.
         assert rel != cutPoint;
+        assert cutPoint != null;
 
         RelNode input = rel.getInput();
         RelNode newInput = visit((IgniteRel) input);
@@ -96,30 +99,25 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteAggregate rel) {
-        boolean split = rel == cutPoint;
+        boolean split = cutPoint(rel);
 
         RelNode input = rel.getInput();
         RelNode newInput = visit((IgniteRel) input);
 
         if (input != newInput) {
-            IgniteDistribution newDistr = IgniteDistributions.aggregate(mq,
-                newInput, rel.getGroupSet(), rel.getGroupSets(), 
rel.getAggCallList());
+            checkDistributionEqual(input, newInput);
 
-            assert newDistr != null;
-
-            RelTraitSet traits = rel.getTraitSet()
-                .replace(newDistr);
-
-            rel = (IgniteAggregate) rel.copy(traits, 
ImmutableList.of(newInput));
+            rel = (IgniteAggregate) rel.copy(rel.getTraitSet(), 
ImmutableList.of(newInput));
         }
 
-        return split ? split(rel, rel.getTraitSet()) : rel;
+        return split ? split(rel) : rel;
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteMapAggregate rel) {
         // a split may happen on BiRel inputs merge. An map aggregate node 
cannot be a BiRel input.
         assert rel != cutPoint;
+        assert cutPoint != null;
 
         RelNode input = rel.getInput();
         RelNode newInput = visit((IgniteRel) input);
@@ -143,7 +141,7 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
         // a split may happen on BiRel inputs merge. An reduce aggregate node 
doesn't have a
         // physical mapping (because it always goes after receiver node), so, 
its merge with
         // any input cannot cause the split.
-        assert rel != cutPoint;
+        assert cutPoint == null;
 
         if (U.assertionsEnabled()) {
             RelNode input = rel.getInput();
@@ -157,7 +155,7 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteFilter rel) {
-        boolean split = rel == cutPoint;
+        boolean split = cutPoint(rel);
 
         RelNode input = rel.getInput();
         IgniteRel newInput = visit((IgniteRel) input);
@@ -169,12 +167,32 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
             rel = (IgniteFilter) rel.copy(traits, ImmutableList.of(newInput));
         }
 
-        return split ? split(rel, rel.getTraitSet()) : rel;
+        return split ? split(rel) : rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTrimExchange rel) {
+        boolean split = cutPoint(rel);
+
+        // after split we need to remove the exchange, return an original
+        // distribution and propagate the distribution to a fragment root.
+        if (split)
+            return split((IgniteRel) rel.getInput());
+        else if (cutPoint == null)
+            return (IgniteRel) rel.getInput();
+
+        RelNode input = rel.getInput();
+        IgniteRel newInput = visit((IgniteRel) input);
+
+        if (input == newInput)
+            return rel;
+
+        return (IgniteRel) rel.copy(rel.getTraitSet(), F.asList(newInput));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteProject rel) {
-        boolean split = rel == cutPoint;
+        boolean split = cutPoint(rel);
 
         RelNode input = rel.getInput();
         RelNode newInput = visit((IgniteRel) input);
@@ -186,12 +204,15 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
             rel = (IgniteProject) rel.copy(traits, ImmutableList.of(newInput));
         }
 
-        return split ? split(rel, rel.getTraitSet()) : rel;
+        return split ? split(rel) : rel;
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteJoin rel) {
-        boolean split = rel == cutPoint;
+        if (cutPoint == null)
+            return rel; // no need to check deeper
+
+        boolean split = cutPoint(rel);
 
         RelNode left = rel.getLeft();
         RelNode right = rel.getRight();
@@ -208,32 +229,35 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
             rel = (IgniteJoin) rel.copy(rel.getTraitSet(), 
ImmutableList.of(newLeft, newRight));
         }
 
-        return split ? split(rel, rel.getTraitSet()) : rel;
+        return split ? split(rel) : rel;
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteTableModify rel) {
-        boolean split = rel == cutPoint;
+        boolean split = cutPoint(rel);
 
         RelNode input = rel.getInput();
         RelNode newInput = visit((IgniteRel) input);
 
-        if (input != newInput)
+        if (input != newInput) {
+            checkDistributionEqual(input, newInput);
+
             rel = (IgniteTableModify) rel.copy(rel.getTraitSet(), 
ImmutableList.of(newInput));
+        }
 
-        return split ? split(rel, rel.getTraitSet()) : rel;
+        return split ? split(rel) : rel;
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteTableScan rel) {
-        return rel == cutPoint ? split(rel, rel.getTraitSet()) : rel;
+        return cutPoint(rel) ? split(rel) : rel;
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteReceiver rel) {
         // a split may happen on BiRel inputs merge. A receiver doesn't have a
         // physical mapping, so, its merge with any input cannot cause the 
split.
-        assert rel != cutPoint;
+        assert cutPoint == null;
 
         return rel;
     }
@@ -242,7 +266,7 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
     @Override public IgniteRel visit(IgniteValues rel) {
         // a split may happen on BiRel inputs merge. A values node doesn't 
have a
         // physical mapping, so, its merge with any input cannot cause the 
split.
-        assert rel != cutPoint;
+        assert cutPoint == null;
 
         return rel;
     }
@@ -258,8 +282,19 @@ public class FragmentSplitter implements 
IgniteRelVisitor<IgniteRel> {
     }
 
     /** */
-    private IgniteRel split(IgniteRel input, RelTraitSet traits) {
+    public boolean cutPoint(RelNode rel) {
+        boolean res = rel == cutPoint;
+
+        if (res)
+            cutPoint = null;
+
+        return res;
+    }
+
+    /** */
+    private IgniteRel split(IgniteRel input) {
         RelOptCluster cluster = input.getCluster();
+        RelTraitSet traits = input.getTraitSet();
 
         Fragment fragment = new Fragment(new IgniteSender(cluster, traits, 
input));
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index cacaed6..1cbfc9e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 
 /**
@@ -66,6 +67,11 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> 
{
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTrimExchange rel) {
+        return visitChildren(rel);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteProject rel) {
         return visitChildren(rel);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 94b2a34..cc178ce 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -34,6 +34,11 @@ public interface IgniteRelVisitor<T> {
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
+    T visit(IgniteTrimExchange rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
     T visit(IgniteProject rel);
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
new file mode 100644
index 0000000..4126060
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.RelTargetAware;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+
+import static 
org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
+import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
+
+/**
+ *
+ */
+public class IgniteTrimExchange extends SingleRel implements IgniteRel, 
RelTargetAware {
+    /** */
+    private RelTarget target;
+
+    /** */
+    public IgniteTrimExchange(RelOptCluster cluster, RelTraitSet traits, 
RelNode input) {
+        super(cluster, traits, input);
+
+        assert 
input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE).getType() == 
BROADCAST_DISTRIBUTED;
+        assert traits.getTrait(DistributionTraitDef.INSTANCE).getType() == 
HASH_DISTRIBUTED;
+    }
+
+    /** */
+    @Override public RelNode copy(RelTraitSet traits, List<RelNode> inputs) {
+        RelNode input = sole(inputs);
+
+        assert 
input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE).getType() == 
BROADCAST_DISTRIBUTED;
+        assert traits.getTrait(DistributionTraitDef.INSTANCE).getType() == 
HASH_DISTRIBUTED;
+
+        return new IgniteTrimExchange(getCluster(), traits, input);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** */
+    public RelTarget target() {
+        return target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void target(RelTarget target) {
+        this.target = target;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java
index 5bbbd0e..028daa0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/PhysicalRelVisitor.java
@@ -34,6 +34,11 @@ public interface PhysicalRelVisitor<T> {
     /**
      * See {@link PhysicalRelVisitor#visit(PhysicalRel)}
      */
+    T visit(TrimExchangePhysicalRel rel);
+
+    /**
+     * See {@link PhysicalRelVisitor#visit(PhysicalRel)}
+     */
     T visit(ProjectPhysicalRel rel);
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java
index ffe0ba3..b2d4894 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/ReceiverPhysicalRel.java
@@ -125,6 +125,8 @@ public class ReceiverPhysicalRel implements PhysicalRel {
         for (int i = 0; i < sourcesSize; i++)
             sources.add((UUID) in.readObject());
 
+        this.sources = sources;
+
         int collationsSize = in.readInt();
 
         if (collationsSize == -1)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
index d73a744..0466219 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToPhysicalConverter.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.serialize;
 
 import com.google.common.collect.ImmutableList;
 import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
@@ -27,6 +28,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexToExpTran
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggCallExp;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.type.DataType;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.RelTarget;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
@@ -40,10 +42,13 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * Converts RelNode tree to physical rel tree.
@@ -84,6 +89,19 @@ public class RelToPhysicalConverter implements 
IgniteRelVisitor<PhysicalRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public PhysicalRel visit(IgniteTrimExchange rel) {
+        IgniteDistribution distr = rel.distribution();
+        RelTarget target = rel.target();
+
+        assert distr.getType() == RelDistribution.Type.HASH_DISTRIBUTED;
+        assert target != null && target.mapping() != null && 
!F.isEmpty(target.mapping().assignments());
+
+        int partitions = target.mapping().assignments().size();
+
+        return new TrimExchangePhysicalRel(distr.function(), distr.getKeys(), 
partitions, visit((IgniteRel) rel.getInput()));
+    }
+
+    /** {@inheritDoc} */
     @Override public PhysicalRel visit(IgniteProject rel) {
         return new 
ProjectPhysicalRel(DataType.fromType(rel.getInput().getRowType()),
             rexTranslator.translate(rel.getProjects()), visit((IgniteRel) 
rel.getInput()));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java
index 651ff7b..9ce72c3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TableModifyPhysicalRel.java
@@ -99,6 +99,8 @@ public class TableModifyPhysicalRel implements PhysicalRel {
         for (int i = 0; i < tableNameSize; i++)
             tableName.add(in.readUTF());
 
+        this.tableName = tableName;
+
         operation = TableModify.Operation.values()[in.readByte()];
 
         int columnsSize = in.readInt();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TrimExchangePhysicalRel.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TrimExchangePhysicalRel.java
new file mode 100644
index 0000000..d7cdc3a
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/TrimExchangePhysicalRel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionFunction;
+
+/**
+ * Physical node of a trim exchange: holds a distribution function, distribution keys, partitions count and input.
+ */
+public class TrimExchangePhysicalRel implements PhysicalRel {
+    /** Distribution function. */
+    private DistributionFunction function;
+
+    /** Distribution keys. */
+    private ImmutableIntList keys;
+
+    /** Partitions count. */
+    private int partitions;
+
+    /** Input node. */
+    private PhysicalRel input;
+
+    /** No-arg constructor, presumably used for deserialization; see {@link #readExternal(ObjectInput)}. */
+    public TrimExchangePhysicalRel() {
+    }
+
+    /** Creates a node from the given distribution function, distribution keys, partitions count and input node. */
+    public TrimExchangePhysicalRel(DistributionFunction function, 
ImmutableIntList keys, int partitions, PhysicalRel input) {
+        this.function = function;
+        this.keys = keys;
+        this.partitions = partitions;
+        this.input = input;
+    }
+
+    /** @return Distribution function. */
+    public DistributionFunction function() {
+        return function;
+    }
+
+    /** @return Distribution keys. */
+    public ImmutableIntList distributionKeys() {
+        return keys;
+    }
+
+    /** @return Partitions count. */
+    public int partitions() {
+        return partitions;
+    }
+
+    /** @return Input node. */
+    public PhysicalRel input() {
+        return input;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(PhysicalRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} Writes function, keys (as an int array), partitions and input, in that order. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(function);
+        out.writeObject(keys.toIntArray());
+        out.writeInt(partitions);
+        out.writeObject(input);
+    }
+
+    /** {@inheritDoc} Reads the fields in the same order {@link #writeExternal(ObjectOutput)} writes them. */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        function = (DistributionFunction) in.readObject();
+        keys = ImmutableIntList.of((int[]) in.readObject());
+        partitions = in.readInt();
+        input = (PhysicalRel) in.readObject();
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
index f94c0d9..b55c77d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTraitDef.java
@@ -21,8 +21,10 @@ import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.processors.query.calcite.rule.RuleUtils;
 
 /**
@@ -53,8 +55,17 @@ public class DistributionTraitDef extends 
RelTraitDef<IgniteDistribution> {
             return rel;
 
         RelTraitSet newTraits = rel.getTraitSet().replace(toDist);
-        RelNode input = RuleUtils.convert(rel, IgniteDistributions.any()); // 
erasing source distribution a bit reduces search space
-        RelNode newRel = planner.register(new IgniteExchange(rel.getCluster(), 
newTraits, input, toDist), rel);
+        RelNode newRel;
+
+        // special case: broadcast -> hash is served by IgniteTrimExchange instead of IgniteExchange
+        if (fromDist.getType() == RelDistribution.Type.BROADCAST_DISTRIBUTED
+            && toDist.getType() == RelDistribution.Type.HASH_DISTRIBUTED) {
+            newRel = planner.register(new IgniteTrimExchange(rel.getCluster(), 
newTraits, rel), rel);
+        }
+        else {
+            RelNode input = RuleUtils.convert(rel, IgniteDistributions.any()); 
// erasing source distribution a bit reduces search space
+            newRel = planner.register(new IgniteExchange(rel.getCluster(), 
newTraits, input, toDist), rel);
+        }
 
         if (!newRel.getTraitSet().equals(newTraits))
             newRel = planner.changeTraits(newRel, newTraits);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 4c0e492..1e4bb02 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -191,7 +191,7 @@ public class IgniteDistributions {
 
         IgniteDistribution out, in;
 
-        if (simpleAggregate(groupSet, groupSets)) {
+        if (simpleAggregate(groupSet, groupSets) && !groupSet.isEmpty()) {
             // re-hash by group keys
             in = hash(groupSet.asList());
             out = hash(ImmutableIntList.range(0, groupSet.cardinality()));

Reply via email to