This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push:
new 18d1d41 IGNITE-13579 Wrong distribution assertion in case of nested
request execution. This closes #8364
18d1d41 is described below
commit 18d1d415be60b6e027383f45b82b44f41ef878b4
Author: zstan <[email protected]>
AuthorDate: Fri Oct 16 16:01:28 2020 +0300
IGNITE-13579 Wrong distribution assertion in case of nested request
execution. This closes #8364
---
.../query/calcite/exec/exp/agg/Accumulators.java | 44 +++++++++++++++++++++-
.../query/calcite/prepare/PlannerPhase.java | 7 ++++
.../calcite/rel/AbstractIgniteNestedLoopJoin.java | 9 +++--
.../query/calcite/rel/IgniteProject.java | 4 +-
.../processors/query/calcite/trait/TraitUtils.java | 2 +-
.../query/calcite/trait/TraitsAwareIgniteRel.java | 37 ++++++++++--------
.../calcite/trait/TraitsPropagationContext.java | 2 -
.../processors/query/calcite/PlannerTest.java | 5 ++-
.../processors/query/calcite/QueryChecker.java | 18 ++++++---
.../calcite/rules/ProjectScanMergeRuleTest.java | 38 +++++++++++++++++++
10 files changed, 133 insertions(+), 33 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
index b3bfc3c..bce0dde 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
@@ -21,12 +21,12 @@ import java.math.BigDecimal;
import java.math.MathContext;
import java.util.List;
import java.util.function.Supplier;
-
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.util.typedef.F;
+import static org.apache.calcite.sql.type.SqlTypeName.ANY;
import static org.apache.calcite.sql.type.SqlTypeName.BIGINT;
import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
import static org.apache.calcite.sql.type.SqlTypeName.DOUBLE;
@@ -49,8 +49,10 @@ public class Accumulators {
return minFactory(call);
case "MAX":
return maxFactory(call);
+ case "SINGLE_VALUE":
+ return SingleVal.FACTORY;
default:
- throw new AssertionError();
+ throw new AssertionError(call.getAggregation().getName());
}
}
@@ -121,6 +123,44 @@ public class Accumulators {
}
/** */
+ private static class SingleVal implements Accumulator {
+ /** */
+ private Object holder;
+
+ /** */
+ public static final Supplier<Accumulator> FACTORY = SingleVal::new;
+
+ /** */
+ @Override public void add(Object... args) {
+ assert args.length == 1 : args.length;
+
+ holder = args[0];
+ }
+
+ /** */
+ @Override public void apply(Accumulator other) {
+ assert holder == null : "sudden apply for: " + other + " on
SingleVal";
+
+ holder = ((SingleVal)other).holder;
+ }
+
+ /** */
+ @Override public Object end() {
+ return holder;
+ }
+
+ /** */
+ @Override public List<RelDataType> argumentTypes(IgniteTypeFactory
typeFactory) {
+ return
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(ANY),
true));
+ }
+
+ /** */
+ @Override public RelDataType returnType(IgniteTypeFactory typeFactory)
{
+ return typeFactory.createSqlType(ANY);
+ }
+ }
+
+ /** */
public static class DecimalAvg implements Accumulator {
/** */
public static final Supplier<Accumulator> FACTORY = DecimalAvg::new;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index e06d2e4..0547393 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
+import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
+import org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.rel.rules.SubQueryRemoveRule;
@@ -97,6 +100,10 @@ public enum PlannerPhase {
UnionMergeRule.INSTANCE,
UnionConverterRule.INSTANCE,
SortConverterRule.INSTANCE,
+ JoinCommuteRule.INSTANCE,
+ JoinPushThroughJoinRule.LEFT,
+ JoinPushThroughJoinRule.RIGHT,
+ AggregateReduceFunctionsRule.INSTANCE,
SortRemoveRule.INSTANCE);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
index e382599..65d1092 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIgniteNestedLoopJoin.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
@@ -226,11 +225,15 @@ public abstract class AbstractIgniteNestedLoopJoin
extends Join implements Trait
// over a left edge. The code below checks whether a desired collation
is possible and requires
// appropriate collation from the left edge.
+ RelCollation collation = TraitUtils.collation(nodeTraits);
+
RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
- RelTraitSet outTraits, leftTraits, rightTraits;
+ if (collation.equals(RelCollations.EMPTY))
+ return ImmutableList.of(Pair.of(nodeTraits,
+ ImmutableList.of(left.replace(RelCollations.EMPTY),
right.replace(RelCollations.EMPTY))));
- RelCollation collation = TraitUtils.collation(nodeTraits);
+ RelTraitSet outTraits, leftTraits, rightTraits;
if (!projectsLeft(collation))
collation = RelCollations.EMPTY;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 1fd1e56..45d897a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -69,6 +69,7 @@ public class IgniteProject extends Project implements
TraitsAwareIgniteRel {
super(cluster, traits, input, projects, rowType);
}
+ /** */
public IgniteProject(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));
}
@@ -123,9 +124,8 @@ public class IgniteProject extends Project implements
TraitsAwareIgniteRel {
srcKeys.add(src);
}
- if (srcKeys.size() == keys.size()) {
+ if (srcKeys.size() == keys.size())
return ImmutableList.of(Pair.of(nodeTraits,
ImmutableList.of(in.replace(hash(srcKeys, distribution.function())))));
- }
return ImmutableList.of(Pair.of(nodeTraits.replace(single()),
ImmutableList.of(in.replace(single()))));
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 4c7afbc..7c1e394 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -149,7 +149,7 @@ public class TraitUtils {
return rel;
}
- /** */
+ /** Change distribution and Convention. */
public static RelTraitSet fixTraits(RelTraitSet traits) {
if (distribution(traits) == any())
traits = traits.replace(single());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
index 2e2096d..0392a36 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsAwareIgniteRel.java
@@ -18,47 +18,52 @@
package org.apache.ignite.internal.processors.query.calcite.trait;
import java.util.List;
-
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.DeriveMode;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.fixTraits;
/** */
public interface TraitsAwareIgniteRel extends IgniteRel {
/** {@inheritDoc} */
@Override public default RelNode passThrough(RelTraitSet required) {
+ required = fixTraits(required);
+
List<RelNode> nodes = TraitsPropagationContext.forPassingThrough(this,
required)
.propagate(this::passThroughCollation)
.propagate(this::passThroughDistribution)
.propagate(this::passThroughRewindability)
.nodes(this::createNode);
- if (U.assertionsEnabled()) {
- RelNode first = F.first(nodes);
+ RelOptPlanner planner = getCluster().getPlanner();
+
+ assert planner instanceof VolcanoPlanner;
- if (first != null) {
- RelTraitSet traits = first.getTraitSet();
+ for (RelNode node : nodes) {
+ RelTraitSet traits = node.getTraitSet();
- for (int i = 1; i < nodes.size(); i++) {
- if (!traits.equals(nodes.get(i).getTraitSet()))
- throw new AssertionError("All produced nodes must have
equal traits. [nodes=" + nodes + "]");
- }
+ // Enforce the required traits when the produced node does not satisfy them.
+ if (!traits.satisfies(required))
+ node = TraitUtils.enforce(node, required);
+
+ if (node != null) {
+ boolean satisfies = node.getTraitSet().satisfies(required);
+
+ assert satisfies : "current rel=" + getRelTypeName() + ",
traits=" + traits + ", required=" + required;
+
+ planner.register(node, this);
}
}
- RelOptPlanner planner = getCluster().getPlanner();
- for (int i = 1; i < nodes.size(); i++)
- planner.register(nodes.get(i), this);
-
- return F.first(nodes);
+ return RelOptRule.convert(this, required);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
index df5c579..4c936d4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitsPropagationContext.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.processors.query.calcite.trait;
import java.util.List;
import java.util.Set;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.plan.RelTraitSet;
@@ -48,7 +47,6 @@ public class TraitsPropagationContext {
Commons.transform(node.getInputs(), i ->
fixTraits(i.getCluster().traitSet()))));
return new TraitsPropagationContext(variants);
-
}
/**
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
index 2f9f7cd..f05a2e6 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
@@ -2631,9 +2631,10 @@ public class PlannerTest extends GridCommonAbstractTest {
RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION,
desired, rel);
assertNotNull(phys);
-
assertEquals("IgniteCorrelatedNestedLoopJoin(condition=[=(CAST(+($0,
$1)):INTEGER, 2)], joinType=[inner])\n" +
+ assertEquals("" +
+ "IgniteCorrelatedNestedLoopJoin(condition=[=(CAST(+($0,
$1)):INTEGER, 2)], joinType=[inner])\n" +
" IgniteTableScan(table=[[PUBLIC, DEPT]],
requiredColunms=[{0}])\n" +
- " IgniteTableScan(table=[[PUBLIC, EMP]],
filters=[=(CAST(+($cor1.DEPTNO, $t0)):INTEGER, 2)], requiredColunms=[{2}])\n",
+ " IgniteTableScan(table=[[PUBLIC, EMP]],
filters=[=(CAST(+($cor2.DEPTNO, $t0)):INTEGER, 2)], requiredColunms=[{2}])\n",
RelOptUtil.toString(phys));
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
index 9ec4209..1c53e56 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/QueryChecker.java
@@ -100,6 +100,13 @@ public abstract class QueryChecker {
}
/**
+ * Negation of {@link #containsProject(java.lang.String, java.lang.String, int...)}.
+ */
+ public static Matcher<String> notContainsProject(String schema, String
tblName, int... requiredColunms) {
+ return CoreMatchers.not(containsProject(schema, tblName,
requiredColunms));
+ }
+
+ /**
* Ignite table|index scan with projects matcher.
*
* @param schema Schema name.
@@ -108,11 +115,12 @@ public abstract class QueryChecker {
* @return Matcher.
*/
public static Matcher<String> containsProject(String schema, String
tblName, int... requiredColunms) {
- return matches(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema +
", " +
- tblName + "\\]\\], " + "requiredColunms=\\[\\{" +
+ Matcher<String> res =
matches(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema + ", " +
+ tblName + "\\]\\], " + ".*requiredColunms=\\[\\{" +
Arrays.toString(requiredColunms)
.replaceAll("\\[", "")
- .replaceAll("]", "") + "\\}\\]\\).*");
+ .replaceAll("]", "") + "\\}\\].*");
+ return res;
}
/**
@@ -125,10 +133,10 @@ public abstract class QueryChecker {
*/
public static Matcher<String> containsOneProject(String schema, String
tblName, int... requiredColunms) {
return matchesOnce(".*Ignite(Table|Index)Scan\\(table=\\[\\[" + schema
+ ", " +
- tblName + "\\]\\], " + "requiredColunms=\\[\\{" +
+ tblName + "\\]\\], " + ".*requiredColunms=\\[\\{" +
Arrays.toString(requiredColunms)
.replaceAll("\\[", "")
- .replaceAll("]", "") + "\\}\\]\\).*");
+ .replaceAll("]", "") + "\\}\\].*");
}
/**
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
index abcc20a..983d82e 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/rules/ProjectScanMergeRuleTest.java
@@ -34,6 +34,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsIndexScan;
import static
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsOneProject;
+import static
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsProject;
import static
org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsTableScan;
import static
org.apache.ignite.internal.processors.query.calcite.QueryChecker.notContainsProject;
import static
org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest.Product;
@@ -82,6 +83,7 @@ public class ProjectScanMergeRuleTest extends
GridCommonAbstractTest {
devCache.put(1, new Product(1, "prod1", 1, "cat1", 11, "noname1"));
devCache.put(2, new Product(2, "prod2", 2, "cat1", 11, "noname2"));
devCache.put(3, new Product(3, "prod3", 3, "cat1", 12, "noname3"));
+ devCache.put(4, new Product(4, "prod4", 4, "cat1", 13, "noname4"));
awaitPartitionMapExchange();
}
@@ -106,6 +108,7 @@ public class ProjectScanMergeRuleTest extends
GridCommonAbstractTest {
.returns("noname1")
.returns("noname2")
.returns("noname3")
+ .returns("noname4")
.check();
checkQuery("SELECT SUBCAT_ID, NAME FROM products d;")
@@ -114,13 +117,48 @@ public class ProjectScanMergeRuleTest extends
GridCommonAbstractTest {
.returns(11, "noname1")
.returns(11, "noname2")
.returns(12, "noname3")
+ .returns(13, "noname4")
.check();
checkQuery("SELECT NAME FROM products d WHERE CAT_ID > 1;")
.matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+ .matches(containsProject("PUBLIC", "PRODUCTS", 4, 7))
+ .returns("noname2")
+ .returns("noname3")
+ .returns("noname4")
+ .check();
+ }
+
+ /**
+ * Tests projects with nested requests.
+ */
+ @Test
+ public void testNestedProjects() {
+ checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT CAT_ID
FROM products WHERE CAT_ID > 1) and ID > 2;")
+ .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+ .matches(notContainsProject("PUBLIC", "PRODUCTS"))
+ .returns("noname3")
+ .returns("noname4")
+ .check();
+
+ checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT DISTINCT
CAT_ID FROM products WHERE CAT_ID > 1)")
+ .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
.matches(notContainsProject("PUBLIC", "PRODUCTS"))
.returns("noname2")
.returns("noname3")
+ .returns("noname4")
+ .check();
+
+ checkQuery("SELECT NAME FROM products WHERE CAT_ID IN (SELECT DISTINCT
CAT_ID FROM products WHERE SUBCAT_ID > 11)")
+ .matches(containsTableScan("PUBLIC", "PRODUCTS"))
+ .returns("noname3")
+ .returns("noname4")
+ .check();
+
+ checkQuery("SELECT NAME FROM products WHERE CAT_ID = (SELECT CAT_ID
FROM products WHERE SUBCAT_ID = 13)")
+ .matches(containsTableScan("PUBLIC", "PRODUCTS"))
+ .matches(containsIndexScan("PUBLIC", "PRODUCTS"))
+ .returns("noname4")
.check();
}
}