This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new e1274e2  refactoring
e1274e2 is described below

commit e1274e2f1d2cd311f5fa46d51df10f6f3d7b9b4b
Author: Igor Seliverstov <[email protected]>
AuthorDate: Fri Nov 15 17:36:10 2019 +0300

    refactoring
---
 .../calcite/metadata/IgniteMdDistribution.java     |  78 +++++++++++++--
 .../query/calcite/metadata/RelMetadataQueryEx.java |   4 +-
 .../query/calcite/prepare/IgnitePlanner.java       |  13 +++
 .../processors/query/calcite/rel/IgniteFilter.java |  19 +++-
 .../rel/{IgniteHashJoin.java => IgniteJoin.java}   |   8 +-
 .../query/calcite/rel/IgniteProject.java           |  25 ++++-
 .../query/calcite/rule/IgniteFilterRule.java       |   7 +-
 ...IgniteHashJoinRule.java => IgniteJoinRule.java} |  40 ++++----
 .../query/calcite/rule/IgniteProjectRule.java      |   4 +-
 .../processors/query/calcite/rule/IgniteRules.java |   2 +-
 .../query/calcite/schema/CalciteSchemaHolder.java  |   3 +-
 .../query/calcite/schema/IgniteSchema.java         |   9 +-
 .../{util/Implementor.java => serialize/Node.java} |   6 +-
 .../calcite/serialize/RelToNodeConverter.java      | 109 +++++++++++++++++++++
 .../SerializationContext.java}                     |  14 ++-
 .../query/calcite/util/IgniteRelShuttle.java       |   8 +-
 .../processors/query/calcite/util/Implementor.java |  14 +++
 .../query/calcite/CalciteQueryProcessorTest.java   |  88 +++++++++++++++--
 parent/pom.xml                                     |   2 +-
 19 files changed, 382 insertions(+), 71 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
index d58902a..7c0491d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdDistribution.java
@@ -25,6 +25,8 @@ import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.metadata.MetadataDef;
 import org.apache.calcite.rel.metadata.MetadataHandler;
@@ -45,6 +47,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.BROADCAST;
 import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.HASH;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionType.SINGLE;
 
 /**
  *
@@ -71,7 +74,7 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
     }
 
     public DistributionTrait getDistributionTrait(Join join, RelMetadataQuery 
mq) {
-        return join(mq, join.getLeft(), join.getRight(), join.getCondition());
+        return join(mq, join.getLeft(), join.getRight(), 
join.analyzeCondition(), join.getJoinType());
     }
 
     public DistributionTrait getDistributionTrait(RelSubset rel, 
RelMetadataQuery mq) {
@@ -82,7 +85,7 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
         return rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);
     }
 
-    public static DistributionTrait project(RelMetadataQuery mq, RelNode 
input, List<RexNode> projects) {
+    public static DistributionTrait project(RelMetadataQuery mq, RelNode 
input, List<? extends RexNode> projects) {
         DistributionTrait trait = distribution(input, mq);
 
         if (trait.type() == HASH) {
@@ -93,7 +96,7 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
 
             Map<Integer, Integer> m = new HashMap<>(projects.size());
 
-            for (Ord<RexNode> node : Ord.zip(projects)) {
+            for (Ord<? extends RexNode> node : Ord.zip(projects)) {
                 if (node.e instanceof RexInputRef)
                     m.put( ((RexSlot) node.e).getIndex(), node.i);
                 else if (node.e.isA(SqlKind.CAST)) {
@@ -125,10 +128,71 @@ public class IgniteMdDistribution implements 
MetadataHandler<IgniteMetadata.Dist
         return distribution(input, mq);
     }
 
-    public static DistributionTrait join(RelMetadataQuery mq, RelNode left, 
RelNode right, RexNode condition) {
-        DistributionTrait leftDist = distribution(left, mq);
-
-        return leftDist.type() != BROADCAST ? leftDist : distribution(right, 
mq);
+    public static DistributionTrait join(RelMetadataQuery mq, RelNode left, 
RelNode right, JoinInfo joinInfo, JoinRelType joinType) {
+        /*
+         * Distributions table:
+         *
+         * ===============INNER JOIN==============
+         * hash + hash = hash
+         * broadcast + hash = hash
+         * hash + broadcast = hash
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         * ===============LEFT JOIN===============
+         * hash + hash = hash
+         * hash + broadcast = hash
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         * ===============RIGHT JOIN==============
+         * hash + hash = hash
+         * broadcast + hash = hash
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         * ===========FULL JOIN/CROSS JOIN========
+         * broadcast + broadcast = broadcast
+         * single + single = single
+         *
+         *
+         * Other combinations are impossible. TODO: add assertions.
+         */
+
+        DistributionTrait leftDistr = distribution(left, mq);
+        DistributionTrait rightDistr;
+
+        switch (joinType) {
+            case FULL:
+            case LEFT:
+                return leftDistr;
+            case INNER:
+                rightDistr = distribution(right, mq);
+
+                if (joinInfo.keys().isEmpty()
+                    || (leftDistr.type() == HASH || leftDistr.type() == SINGLE)
+                    || (leftDistr.type() == BROADCAST && rightDistr.type() == 
BROADCAST))
+                    return leftDistr;
+
+                if (rightDistr == null)
+                    rightDistr = distribution(right, mq);
+
+                assert rightDistr.type() == HASH;
+
+                return IgniteDistributions.hash(joinInfo.leftKeys, 
rightDistr.destinationFunctionFactory());
+            case RIGHT:
+                rightDistr = distribution(right, mq);
+
+                if (leftDistr.type() == SINGLE
+                    || (leftDistr.type() == HASH && rightDistr.type() == HASH)
+                    || (leftDistr.type() == BROADCAST && rightDistr.type() == 
BROADCAST))
+                    return leftDistr;
+                assert rightDistr.type() == HASH;
+
+                return IgniteDistributions.hash(joinInfo.leftKeys, 
rightDistr.destinationFunctionFactory());
+            default:
+                throw new UnsupportedOperationException();
+        }
     }
 
     public static DistributionTrait distribution(RelNode rel, RelMetadataQuery 
mq) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 16d662d..713900d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -22,7 +22,7 @@ import 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
@@ -41,7 +41,7 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
         PROVIDER.register(Arrays.asList(
             IgniteExchange.class,
             IgniteFilter.class,
-            IgniteHashJoin.class,
+            IgniteJoin.class,
             IgniteProject.class,
             IgniteTableScan.class,
             Receiver.class,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index f568e63..1ae2a33 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -72,6 +72,7 @@ import org.apache.calcite.util.Pair;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
+import org.apache.ignite.internal.processors.query.calcite.serialize.Node;
 
 /**
  *
@@ -218,6 +219,18 @@ public class IgnitePlanner implements Planner, 
RelOptTable.ViewExpander {
         return root;
     }
 
+    public RelNode convert(Node node) {
+        ready();
+
+        return null;
+    }
+
+    public Node convert(RelNode node) {
+        ready();
+
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public RelRoot expandView(RelDataType rowType, String 
queryString, List<String> schemaPath, List<String> viewPath) {
         ready();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 6ddafb8..9e261d4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import com.google.common.collect.ImmutableSet;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.calcite.plan.RelOptCluster;
@@ -24,9 +25,9 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
@@ -59,10 +60,20 @@ public final class IgniteFilter extends Filter implements 
IgniteRel {
   }
 
   public static IgniteFilter create(Filter filter, RelNode input) {
-    RelTraitSet traits = filter.getTraitSet()
+    RexNode condition = filter.getCondition();
+    Set<CorrelationId> variablesSet = filter.getVariablesSet();
+
+    return create(input, condition, variablesSet);
+  }
+
+  public static IgniteFilter create(RelNode input, RexNode condition, 
Set<CorrelationId> variablesSet) {
+    RelOptCluster cluster = input.getCluster();
+    RelMetadataQuery mq = cluster.getMetadataQuery();
+
+    RelTraitSet traits = cluster.traitSet()
         .replace(IgniteRel.IGNITE_CONVENTION)
-        .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.filter(RelMetadataQueryEx.instance(), input, 
filter.getCondition()));
+        .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.filter(mq, input, condition));
 
-    return new IgniteFilter(filter.getCluster(), traits, input, 
filter.getCondition(), filter.getVariablesSet());
+    return new IgniteFilter(cluster, traits, input, condition, 
ImmutableSet.copyOf(variablesSet));
   }
 }
\ No newline at end of file
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
similarity index 88%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
index dd3c725..ad60afd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
@@ -27,10 +27,10 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
-public final class IgniteHashJoin extends Join implements IgniteRel {
+public final class IgniteJoin extends Join implements IgniteRel {
   private final boolean semiJoinDone;
 
-  public IgniteHashJoin(
+  public IgniteJoin(
       RelOptCluster cluster,
       RelTraitSet traitSet,
       RelNode left,
@@ -43,9 +43,9 @@ public final class IgniteHashJoin extends Join implements 
IgniteRel {
     this.semiJoinDone = semiJoinDone;
   }
 
-  @Override public IgniteHashJoin copy(RelTraitSet traitSet, RexNode 
conditionExpr,
+  @Override public IgniteJoin copy(RelTraitSet traitSet, RexNode conditionExpr,
       RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) 
{
-    return new IgniteHashJoin(getCluster(), traitSet, left, right, 
conditionExpr, variablesSet, joinType, semiJoinDone);
+    return new IgniteJoin(getCluster(), traitSet, left, right, conditionExpr, 
variablesSet, joinType, semiJoinDone);
   }
 
   /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index e8ce5c1..85c7029 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -21,10 +21,12 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
@@ -48,11 +50,26 @@ public final class IgniteProject extends Project implements 
IgniteRel {
     return implementor.implement(this);
   }
 
+  /** Creates an IgniteProject, deriving the row type from the projects and field names. */
+  public static IgniteProject create(final RelNode input, List<? extends 
RexNode> projects, List<String> fieldNames) {
+    final RelOptCluster cluster = input.getCluster();
+    final RelDataType rowType =
+        RexUtil.createStructType(cluster.getTypeFactory(), projects,
+            fieldNames, SqlValidatorUtil.F_SUGGESTER);
+    return create(input, projects, rowType);
+  }
+
   public static IgniteProject create(Project project, RelNode input) {
-    RelTraitSet traits = project.getTraitSet()
+    return create(input, project.getProjects(), project.getRowType());
+  }
+
+  public static IgniteProject create(RelNode input, List<? extends RexNode> 
projects, RelDataType rowType) {
+    RelOptCluster cluster = input.getCluster();
+    RelMetadataQuery mq = cluster.getMetadataQuery();
+    RelTraitSet traits = cluster.traitSet()
         .replace(IgniteRel.IGNITE_CONVENTION)
-        .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.project(RelMetadataQueryEx.instance(), input, 
project.getProjects()));
+        .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.project(mq, input, projects));
 
-    return new IgniteProject(project.getCluster(), traits, input, 
project.getProjects(), project.getRowType());
+    return new IgniteProject(cluster, traits, input, projects, rowType);
   }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
index 3c3a5eb..4d82c98 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteFilterRule.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.internal.processors.query.calcite.rule;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
@@ -36,14 +36,15 @@ public class IgniteFilterRule extends RelOptRule {
     public static final RelOptRule INSTANCE = new IgniteFilterRule();
 
     private IgniteFilterRule() {
-        super(Commons.any(LogicalFilter.class, RelNode.class), 
RelFactories.LOGICAL_BUILDER, "IgniteFilterRule");
+        super(Commons.any(LogicalFilter.class, RelNode.class), 
"IgniteFilterRule");
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
         LogicalFilter filter = call.rel(0);
+        RelOptCluster cluster = filter.getCluster();
         RelNode input = filter.getInput();
 
-        final RelTraitSet traitSet = 
input.getTraitSet().replace(IgniteRel.IGNITE_CONVENTION);
+        final RelTraitSet traitSet = 
cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION);
 
         RelNode converted = convert(input, traitSet);
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
similarity index 79%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
index 5bb537f..9bd8527ee 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteHashJoinRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteJoinRule.java
@@ -20,15 +20,16 @@ package 
org.apache.ignite.internal.processors.query.calcite.rule;
 import com.google.common.collect.ImmutableList;
 import java.util.List;
 import java.util.Objects;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDistribution;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
@@ -42,24 +43,23 @@ import static 
org.apache.ignite.internal.processors.query.calcite.trait.Distribu
 /**
  *
  */
-public class IgniteHashJoinRule extends RelOptRule {
-    public static final RelOptRule INSTANCE = new IgniteHashJoinRule();
+public class IgniteJoinRule extends RelOptRule {
+    public static final RelOptRule INSTANCE = new IgniteJoinRule();
 
-    public IgniteHashJoinRule() {
-        super(Commons.any(LogicalJoin.class, RelNode.class), 
RelFactories.LOGICAL_BUILDER, "IgniteHashJoinRule");
+    public IgniteJoinRule() {
+        super(Commons.any(Join.class, RelNode.class), 
RelFactories.LOGICAL_BUILDER, "IgniteJoinRule");
     }
 
     @Override public void onMatch(RelOptRuleCall call) {
-        LogicalJoin join = call.rel(0);
+        Join join = call.rel(0);
 
-        RelTraitSet leftTraits = join.getLeft().getTraitSet()
-            .replace(IgniteRel.IGNITE_CONVENTION);
+        RelOptCluster cluster = join.getCluster();
 
-        RelTraitSet rightTraits = join.getRight().getTraitSet()
+        RelTraitSet traitSet = cluster.traitSet()
             .replace(IgniteRel.IGNITE_CONVENTION);
 
-        RelNode left = convert(join.getLeft(), leftTraits);
-        RelNode right = convert(join.getRight(), rightTraits);
+        RelNode left = convert(join.getLeft(), traitSet);
+        RelNode right = convert(join.getRight(), traitSet);
 
         RelMetadataQuery mq = call.getMetadataQuery();
 
@@ -87,27 +87,29 @@ public class IgniteHashJoinRule extends RelOptRule {
         }
     }
 
-    private void transform(RelOptRuleCall call, LogicalJoin join, 
RelMetadataQuery mq, DistributionTrait leftDist, DistributionTrait rightDist) {
-        RelTraitSet leftTraits = join.getLeft().getTraitSet()
+    private void transform(RelOptRuleCall call, Join join, RelMetadataQuery 
mq, DistributionTrait leftDist, DistributionTrait rightDist) {
+        RelOptCluster cluster = join.getCluster();
+
+        RelTraitSet leftTraits = cluster.traitSet()
             .replace(IgniteRel.IGNITE_CONVENTION)
             .replace(leftDist);
 
-        RelTraitSet rightTraits = join.getRight().getTraitSet()
+        RelTraitSet rightTraits = cluster.traitSet()
             .replace(IgniteRel.IGNITE_CONVENTION)
             .replace(rightDist);
 
         RelNode left = convert(join.getLeft(), leftTraits);
         RelNode right = convert(join.getRight(), rightTraits);
 
-        RelTraitSet traitSet = join.getTraitSet()
+        RelTraitSet traitSet = cluster.traitSet()
             .replace(IgniteRel.IGNITE_CONVENTION)
-            .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.join(mq, left, right, join.getCondition()));
+            .replaceIf(DistributionTraitDef.INSTANCE, () -> 
IgniteMdDistribution.join(mq, left, right, join.analyzeCondition(), 
join.getJoinType()));
 
-        call.transformTo(new IgniteHashJoin(join.getCluster(), traitSet, left, 
right,
+        call.transformTo(new IgniteJoin(cluster, traitSet, left, right,
             join.getCondition(), join.getVariablesSet(), join.getJoinType(), 
join.isSemiJoinDone()));
     }
 
-    private boolean canTransform(LogicalJoin join, DistributionTrait leftDist, 
DistributionTrait rightDist) {
+    private boolean canTransform(Join join, DistributionTrait leftDist, 
DistributionTrait rightDist) {
         if (leftDist.type() == BROADCAST
             && rightDist.type() == BROADCAST)
             return true;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
index 9f7e3cc..f238d08 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteProjectRule.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rule;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
@@ -41,9 +42,10 @@ public class IgniteProjectRule extends RelOptRule {
 
     @Override public void onMatch(RelOptRuleCall call) {
         LogicalProject project = call.rel(0);
+        RelOptCluster cluster = project.getCluster();
         RelNode input = project.getInput();
 
-        final RelTraitSet traitSet = 
input.getTraitSet().replace(IgniteRel.IGNITE_CONVENTION);
+        final RelTraitSet traitSet = 
cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION);
 
         RelNode converted = convert(input, traitSet);
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
index 82f4960..d82147d7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
@@ -158,7 +158,7 @@ public class IgniteRules {
     public static final List<RelOptRule> IGNITE_RULES = ImmutableList.of(
         IgniteFilterRule.INSTANCE,
         IgniteProjectRule.INSTANCE,
-        IgniteHashJoinRule.INSTANCE);
+        IgniteJoinRule.INSTANCE);
 
     public static List<RelOptRule> logicalRules(Context ctx) {
         return ImmutableList.<RelOptRule>builder()
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
index fb537e1..bad106f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CalciteSchemaHolder.java
@@ -23,6 +23,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
 
 /**
@@ -63,6 +64,6 @@ public class CalciteSchemaHolder implements 
SchemaChangeListener {
     private void rebuild() {
         SchemaPlus schema = Frameworks.createRootSchema(false);
         schemas.forEach(schema::add);
-        schema(schema);
+        schema(schema.getSubSchema(QueryUtils.DFLT_SCHEMA));
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
index 81f7265..18c8e5e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteSchema.java
@@ -55,9 +55,7 @@ public class IgniteSchema extends AbstractSchema {
      * @param cacheInfo Cache info.
      */
     public void onSqlTypeCreate(GridQueryTypeDescriptor typeDesc, 
GridCacheContextInfo cacheInfo) {
-        IgniteTable table = new IgniteTable(typeDesc.tableName(), 
cacheInfo.name(), Commons.rowType(typeDesc));
-
-        addTable(table.tableName(), table);
+        addTable(new IgniteTable(typeDesc.tableName(), cacheInfo.name(), 
Commons.rowType(typeDesc)));
     }
 
     /**
@@ -71,11 +69,10 @@ public class IgniteSchema extends AbstractSchema {
     }
 
     /**
-     * @param name Table name.
      * @param table Table.
      */
-    public void addTable(String name, Table table) {
-        tableMap.put(name, table);
+    public void addTable(IgniteTable table) {
+        tableMap.put(table.tableName(), table);
     }
 
     /**
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
similarity index 84%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
index 7a5a145..172b9f4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
@@ -14,13 +14,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.serialize;
 
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 
 /**
  *
  */
-public interface Implementor<T> {
-    T implement(IgniteRel other);
+public interface Node {
+    IgniteRel toRel(SerializationContext ctx);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToNodeConverter.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToNodeConverter.java
new file mode 100644
index 0000000..a385adf
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToNodeConverter.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+
+/**
+ *
+ */
+public class RelToNodeConverter {
+    static class ImplementorImpl implements Implementor<Node> {
+
+        @Override public Node implement(IgniteExchange rel) {
+            return null;
+        }
+
+        @Override public Node implement(IgniteFilter rel) {
+            return null;
+        }
+
+        @Override public Node implement(IgniteJoin rel) {
+            return null;
+        }
+
+        @Override public Node implement(IgniteProject rel) {
+            return null;
+        }
+
+        @Override public Node implement(IgniteTableScan rel) {
+            return null;
+        }
+
+        @Override public Node implement(Receiver rel) {
+            return null;
+        }
+
+        @Override public Node implement(Sender rel) {
+            return null;
+        }
+
+        @Override public Node implement(IgniteRel other) {
+            return null;
+        }
+    }
+
+    static class ExchangeNode implements Node {
+        @Override public IgniteRel toRel(SerializationContext ctx) {
+            return null;
+        }
+    }
+
+    static class FilterNode implements Node {
+        @Override public IgniteRel toRel(SerializationContext ctx) {
+            return null;
+        }
+    }
+
+    static class HashJoinNode implements Node {
+        @Override public IgniteRel toRel(SerializationContext ctx) {
+            return null;
+        }
+    }
+
+    static class ProjectNode implements Node {
+        @Override public IgniteRel toRel(SerializationContext ctx) {
+            return null;
+        }
+    }
+
+    static class TableScanNode implements Node {
+        @Override public IgniteRel toRel(SerializationContext ctx) {
+            return null;
+        }
+    }
+
+    static class ReceiverNode implements Node {
+        @Override public IgniteRel toRel(SerializationContext ctx) {
+            return null;
+        }
+    }
+
+    static class SenderNode implements Node {
+        @Override public IgniteRel toRel(SerializationContext ctx) {
+            return null;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java
similarity index 63%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java
index 7a5a145..d2fcb32 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/SerializationContext.java
@@ -14,13 +14,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.util;
+package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 
 /**
  *
  */
-public interface Implementor<T> {
-    T implement(IgniteRel other);
+class SerializationContext {
+    RelOptCluster cluster;
+    RelOptSchema schema;
+    IgnitePlanner planner;
+    Context ctx;
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
index a236774..d4b0dc6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
@@ -20,7 +20,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
@@ -54,7 +54,7 @@ public class IgniteRelShuttle extends RelShuttleImpl {
         return rel;
     }
 
-    public RelNode visit(IgniteHashJoin rel) {
+    public RelNode visit(IgniteJoin rel) {
         return visitChildren(rel);
     }
 
@@ -71,8 +71,8 @@ public class IgniteRelShuttle extends RelShuttleImpl {
             return visit((Sender)rel);
         if (rel instanceof IgniteTableScan)
             return visit((IgniteTableScan)rel);
-        if (rel instanceof IgniteHashJoin)
-            return visit((IgniteHashJoin)rel);
+        if (rel instanceof IgniteJoin)
+            return visit((IgniteJoin)rel);
 
         return visitOther(rel);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
index 7a5a145..c45af43 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Implementor.java
@@ -16,11 +16,25 @@
 
 package org.apache.ignite.internal.processors.query.calcite.util;
 
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 
 /**
  *
  */
 public interface Implementor<T> {
+    T implement(IgniteExchange rel);
+    T implement(IgniteFilter rel);
+    T implement(IgniteJoin rel);
+    T implement(IgniteProject rel);
+    T implement(IgniteTableScan rel);
+    T implement(Receiver rel);
+    T implement(Sender rel);
     T implement(IgniteRel other);
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index f947462..df532ad 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -78,7 +78,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
 
-        publicSchema.addTable("Developer", new IgniteTable("Developer", "Developer",
+        publicSchema.addTable(new IgniteTable("Developer", "Developer",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
@@ -86,7 +86,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
                 .field("cityId", Integer.class)
                 .build()));
 
-        publicSchema.addTable("Project", new IgniteTable("Project", "Project",
+        publicSchema.addTable(new IgniteTable("Project", "Project",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
@@ -95,23 +95,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
 
 
-        publicSchema.addTable("Country", new IgniteTable("Country", "Country",
+        publicSchema.addTable(new IgniteTable("Country", "Country",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("countryCode", Integer.class)
                 .build()));
 
-        publicSchema.addTable("City", new IgniteTable("City", "City",
+        publicSchema.addTable(new IgniteTable("City", "City",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("countryId", Integer.class)
                 .build()));
 
-        schema = Frameworks.createRootSchema(false);
-
-        schema.add("PUBLIC", publicSchema);
+        schema = Frameworks
+            .createRootSchema(false)
+            .add("PUBLIC", publicSchema);
 
         nodes = new ArrayList<>(4);
 
@@ -128,6 +128,45 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "FROM PUBLIC.Developer d JOIN (" +
             "SELECT pp.id as id0, pp.ver as ver0 FROM PUBLIC.Project pp" +
             ") p " +
+            "ON d.projectId = p.id0 + 1" +
+            "WHERE (d.projectId + 1) > ?";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(query);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    @Test
+    public void testLogicalPlanDefaultSchema() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
+            "FROM Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.ver as ver0 FROM Project pp" +
+            ") p " +
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
@@ -164,6 +203,41 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     }
 
     @Test
+    public void testCorrelatedQuery() throws Exception {
+        String sql = "SELECT d.id, (SELECT p.name FROM Project p WHERE p.id = d.id) name, d.projectId " +
+            "FROM Developer d";
+
+        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            ConventionTraitDef.INSTANCE
+        };
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = ctx.unwrap(Query.class);
+
+            assertNotNull(query);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+        }
+
+        assertNotNull(relRoot.rel);
+    }
+
+    @Test
     public void testHepPlaner() throws Exception {
         String sql = "SELECT d.id, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
diff --git a/parent/pom.xml b/parent/pom.xml
index 85febbe..5e4a850 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -139,7 +139,7 @@
         <zookeeper.version>3.5.5</zookeeper.version>
         <zstd.version>1.3.7-2</zstd.version>
         <opencensus.version>0.22.0</opencensus.version>
-        <calcite.version>1.20.0</calcite.version>
+        <calcite.version>1.21.0</calcite.version>
 
 
         <!-- Maven plugins versions -->

Reply via email to