This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 18d1d41  IGNITE-13579 Wrong distribution assertion in case of nested 
request execution. This closes #8364
18d1d41 is described below

commit 18d1d415be60b6e027383f45b82b44f41ef878b4
Author: zstan <[email protected]>
AuthorDate: Fri Oct 16 16:01:28 2020 +0300

    IGNITE-13579 Wrong distribution assertion in case of nested request 
execution. This closes #8364
---
 .../query/calcite/exec/exp/agg/Accumulators.java   | 44 +++++++++++++++++++++-
 .../query/calcite/prepare/PlannerPhase.java        |  7 ++++
 .../calcite/rel/AbstractIgniteNestedLoopJoin.java  |  9 +++--
 .../query/calcite/rel/IgniteProject.java           |  4 +-
 .../processors/query/calcite/trait/TraitUtils.java |  2 +-
 .../query/calcite/trait/TraitsAwareIgniteRel.java  | 37 ++++++++++--------
 .../calcite/trait/TraitsPropagationContext.java    |  2 -
 .../processors/query/calcite/PlannerTest.java      |  5 ++-
 .../processors/query/calcite/QueryChecker.java     | 18 ++++++---
 .../calcite/rules/ProjectScanMergeRuleTest.java    | 38 +++++++++++++++++++
 10 files changed, 133 insertions(+), 33 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
index b3bfc3c..bce0dde 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
@@ -21,12 +21,12 @@ import java.math.BigDecimal;
 import java.math.MathContext;
 import java.util.List;
 import java.util.function.Supplier;
-
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.util.typedef.F;
 
+import static org.apache.calcite.sql.type.SqlTypeName.ANY;
 import static org.apache.calcite.sql.type.SqlTypeName.BIGINT;
 import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
 import static org.apache.calcite.sql.type.SqlTypeName.DOUBLE;
@@ -49,8 +49,10 @@ public class Accumulators {
                 return minFactory(call);
             case "MAX":
                 return maxFactory(call);
+            case "SINGLE_VALUE":
+                return SingleVal.FACTORY;
             default:
-                throw new AssertionError();
+                throw new AssertionError(call.getAggregation().getName());
         }
     }
 
@@ -121,6 +123,44 @@ public class Accumulators {
     }
 
     /** */
+    private static class SingleVal implements Accumulator {
+        /** */
+        private Object holder;
+
+        /** */
+        public static final Supplier<Accumulator> FACTORY = SingleVal::new;
+
+        /** */
+        @Override public void add(Object... args) {
+            assert args.length == 1 : args.length;
+
+            holder = args[0];
+        }
+
+        /** */
+        @Override public void apply(Accumulator other) {
+            assert holder == null : "sudden apply for: " + other + " on 
SingleVal";
+
+            holder = ((SingleVal)other).holder;
+        }
+
+        /** */
+        @Override public Object end() {
+            return holder;
+        }
+
+        /** */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return 
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(ANY), 
true));
+        }
+
+        /** */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return typeFactory.createSqlType(ANY);
+        }
+    }
+
+    /** */
     public static class DecimalAvg implements Accumulator {
         /** */
         public static final Supplier<Accumulator> FACTORY = DecimalAvg::new;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index e06d2e4..0547393 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.query.calcite.prepare;
 
+import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
+import org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
 import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.rules.SubQueryRemoveRule;
@@ -97,6 +100,10 @@ public enum PlannerPhase {
                 UnionMergeRule.INSTANCE,
                 UnionConverterRule.INSTANCE,
                 SortConverterRule.INSTANCE,
+                JoinCommuteRule.INSTANCE,
+                JoinPushThroughJoinRule.LEFT,
+                JoinPushThroughJoinRule.RIGHT,
+                AggregateReduceFunctionsRule.INSTANCE,
                 SortRemoveRule.INSTANCE);
         }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
index e382599..65d1092 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
@@ -226,11 +225,15 @@ public abstract class AbstractIgniteNestedLoopJoin 
extends Join implements Trait
         // over a left edge. The code below checks whether a desired collation 
is possible and requires
         // appropriate collation from the left edge.
 
+        RelCollation collation = TraitUtils.collation(nodeTraits);
+
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
-        RelTraitSet outTraits, leftTraits, rightTraits;
+        if (collation.equals(RelCollations.EMPTY))
+            return ImmutableList.of(Pair.of(nodeTraits,
+                ImmutableList.of(left.replace(RelCollations.EMPTY), 
right.replace(RelCollations.EMPTY))));
 
-        RelCollation collation = TraitUtils.collation(nodeTraits);
+        RelTraitSet outTraits, leftTraits, rightTraits;
 
         if (!projectsLeft(collation))
             collation = RelCollations.EMPTY;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 1fd1e56..45d897a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -69,6 +69,7 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
         super(cluster, traits, input, projects, rowType);
     }
 
+    /** */
     public IgniteProject(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
     }
@@ -123,9 +124,8 @@ public class IgniteProject extends Project implements 
TraitsAwareIgniteRel {
             srcKeys.add(src);
         }
 
-        if (srcKeys.size() == keys.size()) {
+        if (srcKeys.size() == keys.size())
             return ImmutableList.of(Pair.of(nodeTraits, 
ImmutableList.of(in.replace(hash(srcKeys, distribution.function())))));
-        }
 
         return ImmutableList.of(Pair.of(nodeTraits.replace(single()), 
ImmutableList.of(in.replace(single()))));
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 4c7afbc..7c1e394 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -149,7 +149,7 @@ public class TraitUtils {
         return rel;
     }
 
-    /** */
+    /** Change distribution and Convention. */
     public static RelTraitSet fixTraits(RelTraitSet traits) {
         if (distribution(traits) == any())
             traits = traits.replace(single());
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
index 2e2096d..0392a36 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
@@ -18,47 +18,52 @@
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.util.List;
-
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.DeriveMode;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.fixTraits;
 
 /** */
 public interface TraitsAwareIgniteRel extends IgniteRel {
     /** {@inheritDoc} */
     @Override public default RelNode passThrough(RelTraitSet required) {
+        required = fixTraits(required);
+
         List<RelNode> nodes = TraitsPropagationContext.forPassingThrough(this, 
required)
             .propagate(this::passThroughCollation)
             .propagate(this::passThroughDistribution)
             .propagate(this::passThroughRewindability)
             .nodes(this::createNode);
 
-        if (U.assertionsEnabled()) {
-            RelNode first = F.first(nodes);
+        RelOptPlanner planner = getCluster().getPlanner();
+
+        assert planner instanceof VolcanoPlanner;
 
-            if (first != null) {
-                RelTraitSet traits = first.getTraitSet();
+        for (RelNode node : nodes) {
+            RelTraitSet traits = node.getTraitSet();
 
-                for (int i = 1; i < nodes.size(); i++) {
-                    if (!traits.equals(nodes.get(i).getTraitSet()))
-                        throw new AssertionError("All produced nodes must have 
equal traits. [nodes=" + nodes + "]");
-                }
+            // try to fix traits somehow.
+            if (!traits.satisfies(required))
+                node = TraitUtils.enforce(node, required);
+
+            if (node != null) {
+                boolean satisfies = node.getTraitSet().satisfies(required);
+
+                assert satisfies : "current rel=" + getRelTypeName() + ", 
traits=" + traits + ", required=" + required;
+
+                planner.register(node, this);
             }
         }
 
-        RelOptPlanner planner = getCluster().getPlanner();
-        for (int i = 1; i < nodes.size(); i++)
-            planner.register(nodes.get(i), this);
-
-        return F.first(nodes);
+        return RelOptRule.convert(this, required);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
index df5c579..4c936d4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.util.List;
 import java.util.Set;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.plan.RelTraitSet;
@@ -48,7 +47,6 @@ public class TraitsPropagationContext {
                 Commons.transform(node.getInputs(), i -> 
fixTraits(i.getCluster().traitSet()))));
 
         return new TraitsPropagationContext(variants);
-
     }
 
     /**
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index 2f9f7cd..f05a2e6 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -2631,9 +2631,10 @@ public class PlannerTest extends GridCommonAbstractTest {
             RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION, 
desired, rel);
 
             assertNotNull(phys);
-            
assertEquals("IgniteCorrelatedNestedLoopJoin(condition=[=(CAST(+($0, 
$1)):INTEGER, 2)], joinType=[inner])\n" +
+            assertEquals("" +
+                    "IgniteCorrelatedNestedLoopJoin(condition=[=(CAST(+($0, 
$1)):INTEGER, 2)], joinType=[inner])\n" +
                     "  IgniteTableScan(table=[[PUBLIC, DEPT]], 
requiredColunms=[{0}])\n" +
-                    "  IgniteTableScan(table=[[PUBLIC, EMP]], 
filters=[=(CAST(+($cor1.DEPTNO, $t0)):INTEGER, 2)], requiredColunms=[{2}])\n",
+                    "  IgniteTableScan(table=[[PUBLIC, EMP]], 
filters=[=(CAST(+($cor2.DEPTNO, $t0)):INTEGER, 2)], requiredColunms=[{2}])\n",
                 RelOptUtil.toString(phys));
         }
     }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index 9ec4209..1c53e56 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -100,6 +100,13 @@ public abstract class QueryChecker {
     }
 
     /**
+     * {@link #containsProject(java.lang.String, java.lang.String, int...)} 
reverter.
+     */
+    public static Matcher<String> notContainsProject(String schema, String 
tblName, int... requiredColunms) {
+        return CoreMatchers.not(containsProject(schema, tblName, 
requiredColunms));
+    }
+
+    /**
      * Ignite table|index scan with projects matcher.
      *
      * @param schema  Schema name.
@@ -108,11 +115,12 @@ public abstract class QueryChecker {
      * @return Matcher.
      */
     public static Matcher<String> containsProject(String schema, String 
tblName, int... requiredColunms) {
-        return matches(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema + 
", " +
-            tblName + "\\]\\], " + "requiredColunms=\\[\\{" +
+        Matcher<String> res = 
matches(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema + ", " +
+            tblName + "\\]\\], " + ".*requiredColunms=\\[\\{" +
             Arrays.toString(requiredColunms)
                 .replaceAll("\\[", "")
-                .replaceAll("]", "") + "\\}\\]\\).*");
+                .replaceAll("]", "") + "\\}\\].*");
+        return res;
     }
 
     /**
@@ -125,10 +133,10 @@ public abstract class QueryChecker {
      */
     public static Matcher<String> containsOneProject(String schema, String 
tblName, int... requiredColunms) {
         return matchesOnce(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema 
+ ", " +
-            tblName + "\\]\\], " + "requiredColunms=\\[\\{" +
+            tblName + "\\]\\], " + ".*requiredColunms=\\[\\{" +
             Arrays.toString(requiredColunms)
                 .replaceAll("\\[", "")
-                .replaceAll("]", "") + "\\}\\]\\).*");
+                .replaceAll("]", "") + "\\}\\].*");
     }
 
     /**
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
index abcc20a..983d82e 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
@@ -34,6 +34,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static 
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsIndexScan;
 import static 
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsOneProject;
+import static 
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsProject;
 import static 
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsTableScan;
 import static 
org.apache.ignite.internal.processors.query.calcite.QueryChecker.notContainsProject;
 import static 
org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest.Product;
@@ -82,6 +83,7 @@ public class ProjectScanMergeRuleTest extends 
GridCommonAbstractTest {
         devCache.put(1, new Product(1, "prod1", 1, "cat1", 11, "noname1"));
         devCache.put(2, new Product(2, "prod2", 2, "cat1", 11, "noname2"));
         devCache.put(3, new Product(3, "prod3", 3, "cat1", 12, "noname3"));
+        devCache.put(4, new Product(4, "prod4", 4, "cat1", 13, "noname4"));
 
         awaitPartitionMapExchange();
     }
@@ -106,6 +108,7 @@ public class ProjectScanMergeRuleTest extends 
GridCommonAbstractTest {
             .returns("noname1")
             .returns("noname2")
             .returns("noname3")
+            .returns("noname4")
             .check();
 
         checkQuery("SELECT SUBCAT_ID, NAME FROM products d;")
@@ -114,13 +117,48 @@ public class ProjectScanMergeRuleTest extends 
GridCommonAbstractTest {
             .returns(11, "noname1")
             .returns(11, "noname2")
             .returns(12, "noname3")
+            .returns(13, "noname4")
             .check();
 
         checkQuery("SELECT NAME FROM products d WHERE CAT_ID > 1;")
             .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+            .matches(containsProject("PUBLIC", "PRODUCTS", 4, 7))
+            .returns("noname2")
+            .returns("noname3")
+            .returns("noname4")
+            .check();
+    }
+
+    /**
+     * Tests projects with nested requests.
+     */
+    @Test
+    public void testNestedProjects() {
+        checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT CAT_ID 
FROM products WHERE CAT_ID > 1) and ID > 2;")
+            .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+            .matches(notContainsProject("PUBLIC", "PRODUCTS"))
+            .returns("noname3")
+            .returns("noname4")
+            .check();
+
+        checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT DISTINCT 
CAT_ID FROM products WHERE CAT_ID > 1)")
+            .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
             .matches(notContainsProject("PUBLIC", "PRODUCTS"))
             .returns("noname2")
             .returns("noname3")
+            .returns("noname4")
+            .check();
+
+        checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT DISTINCT 
CAT_ID FROM products WHERE SUBCAT_ID > 11)")
+            .matches(containsTableScan("PUBLIC", "PRODUCTS"))
+            .returns("noname3")
+            .returns("noname4")
+            .check();
+
+        checkQuery("SELECT NAME FROM products WHERE CAT_ID = (SELECT CAT_ID 
FROM products WHERE SUBCAT_ID = 13)")
+            .matches(containsTableScan("PUBLIC", "PRODUCTS"))
+            .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+            .returns("noname4")
             .check();
     }
 }

Reply via email to