This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 32e85c3 IGNITE-14588 Fix pass-through-distribution traits for single
and reduce aggregates - Fixes #9039.
32e85c3 is described below
commit 32e85c3644fb7b9889cb1e226dbf789e7178fa82
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue May 4 09:35:31 2021 +0300
IGNITE-14588 Fix pass-through-distribution traits for single and reduce
aggregates - Fixes #9039.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../calcite/rel/agg/IgniteReduceAggregateBase.java | 11 +++++-
.../calcite/rel/agg/IgniteSingleAggregateBase.java | 11 +++++-
.../query/calcite/CalciteQueryProcessorTest.java | 36 +++++++++++++++++++
.../calcite/planner/AggregatePlannerTest.java | 41 +++++++++++++++++++++-
4 files changed, 96 insertions(+), 3 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
index 0c3f171..33dbca6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteReduceAggregateBase.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
@@ -39,6 +38,7 @@ import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
import
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
@@ -123,6 +123,15 @@ public abstract class IgniteReduceAggregateBase extends
SingleRel implements Tra
}
/** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>>
passThroughDistribution(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ if (TraitUtils.distribution(nodeTraits) ==
IgniteDistributions.single())
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t ->
t.replace(IgniteDistributions.single())));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public List<Pair<RelTraitSet, List<RelTraitSet>>>
deriveRewindability(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
index 348f4b6..aa2cb95 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/agg/IgniteSingleAggregateBase.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.rel.agg;
import java.util.List;
-
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
@@ -32,6 +31,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import
org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
/**
*
@@ -55,6 +55,15 @@ public abstract class IgniteSingleAggregateBase extends
IgniteAggregate implemen
}
/** {@inheritDoc} */
+ @Override public Pair<RelTraitSet, List<RelTraitSet>>
passThroughDistribution(RelTraitSet nodeTraits,
+ List<RelTraitSet> inTraits) {
+ if (TraitUtils.distribution(nodeTraits) ==
IgniteDistributions.single())
+ return Pair.of(nodeTraits, Commons.transform(inTraits, t ->
t.replace(IgniteDistributions.single())));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public List<Pair<RelTraitSet, List<RelTraitSet>>>
deriveRewindability(
RelTraitSet nodeTraits,
List<RelTraitSet> inputTraits
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index cd14468..9b0195b 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -896,6 +896,42 @@ public class CalciteQueryProcessorTest extends
GridCommonAbstractTest {
/** */
@Test
+ public void aggregateNested() throws Exception {
+ String cacheName = "employer";
+
+ IgniteCache<Integer, Employer> employer = client.getOrCreateCache(new
CacheConfiguration<Integer, Employer>()
+ .setName(cacheName)
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(Integer.class, Employer.class)
+ .setBackups(2)
+ );
+
+ awaitPartitionMapExchange(true, true, null);
+
+ List<Integer> keysNode0 = primaryKeys(grid(0).cache(cacheName), 2);
+ List<Integer> keysNode1 = primaryKeys(grid(1).cache(cacheName), 1);
+
+ employer.putAll(ImmutableMap.of(
+ keysNode0.get(0), new Employer("Igor", 10d),
+ keysNode0.get(1), new Employer("Roman", 20d) ,
+ keysNode1.get(0), new Employer("Nikolay", 30d)
+ ));
+
+ QueryEngine engine = Commons.lookupComponent(grid(1).context(),
QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> qry = engine.query(null, "PUBLIC",
+ "SELECT avg(salary) FROM " +
+ "(SELECT avg(salary) as salary FROM employer UNION ALL SELECT
salary FROM employer)");
+
+ assertEquals(1, qry.size());
+
+ List<List<?>> rows = qry.get(0).getAll();
+ assertEquals(1, rows.size());
+ assertEquals(20d, F.first(F.first(rows)));
+ }
+
+ /** */
+ @Test
public void query() throws Exception {
IgniteCache<Integer, Developer> developer = grid(1).createCache(new
CacheConfiguration<Integer, Developer>()
.setName("developer")
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
index 1d36838..d1682e0 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AggregatePlannerTest.java
@@ -20,7 +20,6 @@ package
org.apache.ignite.internal.processors.query.calcite.planner;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.sql.SqlExplainLevel;
@@ -39,6 +38,8 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleA
import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
@@ -170,6 +171,44 @@ public class AggregatePlannerTest extends
AbstractAggregatePlannerTest {
assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
}
+ /**
+ * Test that aggregate has single distribution output even if parent node
accepts random distribution inputs.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void distribution() throws Exception {
+ TestTable tbl =
createAffinityTable().addIndex(RelCollations.of(ImmutableIntList.of(3)),
"grp0");
+
+ IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+ publicSchema.addTable("TEST", tbl);
+
+ String sql = "SELECT AVG(val0), grp0 FROM TEST GROUP BY grp0 UNION ALL
SELECT val0, grp0 FROM test";
+
+ IgniteRel phys = physicalPlan(
+ sql,
+ publicSchema,
+ F.concat(algo.rulesToDisable,
"SortMapReduceAggregateConverterRule",
+ "HashMapReduceAggregateConverterRule")
+ );
+
+ IgniteSingleAggregateBase singleAgg = findFirstNode(phys,
byClass(algo.single));
+
+ assertEquals(IgniteDistributions.single(),
TraitUtils.distribution(singleAgg));
+
+ phys = physicalPlan(
+ sql,
+ publicSchema,
+ F.concat(algo.rulesToDisable, "SortSingleAggregateConverterRule",
+ "HashSingleAggregateConverterRule")
+ );
+
+ IgniteReduceAggregateBase rdcAgg = findFirstNode(phys,
byClass(algo.reduce));
+
+ assertEquals(IgniteDistributions.single(),
TraitUtils.distribution(rdcAgg));
+ }
+
/** */
enum AggregateAlgorithm {
/** */