This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c474730 [CARBONDATA-3541]Select queries with Aggregation Functions
such as variance, stddev, etc. fail with MV datamap
c474730 is described below
commit c47473002c92562e12bf39ac94b62c38e06c2fca
Author: Indhumathi27 <[email protected]>
AuthorDate: Fri Oct 4 16:27:53 2019 +0530
[CARBONDATA-3541]Select queries with Aggregation Functions such as
variance, stddev, etc. fail with MV datamap
Problem:
Select queries with Aggregation Functions such as variance, stddev, etc. with
GROUP BY fail with an MV datamap after it is created. This is because, while
querying, no aggregation is applied to those columns.
Solution:
Wrap those columns (variance, stddev, etc.) in an aggregate function while
rewriting the query plan for the MV.
This closes #3405
---
.../apache/carbondata/mv/datamap/MVHelper.scala | 27 +++++++++++++++++
.../mv/rewrite/MVIncrementalLoadingTestcase.scala | 35 ++++++++++++++++++++++
2 files changed, 62 insertions(+)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 8072d96..5a88c43 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -626,6 +626,33 @@ object MVHelper {
case Alias(agg@AggregateExpression(fun@Count(Seq(child)), _, _,
_), name) =>
val uFun = Sum(right)
Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@Corr(l, r), _, _, _), name)
=>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@VariancePop(child), _, _,
_), name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@VarianceSamp(child), _, _,
_), name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@StddevSamp(child), _, _,
_), name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@StddevPop(child), _, _, _),
name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@CovPopulation(l, r), _, _,
_), name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@CovSample(l, r), _, _, _),
name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@Skewness(child), _, _, _),
name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
+ case Alias(agg@AggregateExpression(fun@Kurtosis(child), _, _, _),
name) =>
+ val uFun = Sum(right)
+ Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId =
left.exprId)
case _ =>
if (left.name != right.name) Alias(right, left.name)(exprId =
left.exprId) else right
}
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index b47dc47..32b0567 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -596,6 +596,41 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
assert(TestUtil.verifyMVDataMap(analyzed, "datamap_com"))
}
+  test("test all aggregate functions") {
+    createTableFactTable("test_table")
+    createTableFactTable("test_table1")
+    loadDataToFactTable("test_table")
+    loadDataToFactTable("test_table1")
+    sql("drop datamap if exists datamap_agg") // must match the datamap created below
+    sql(
+      "create datamap datamap_agg using 'mv' as select variance(workgroupcategory),var_samp" +
+      "(projectcode), var_pop(projectcode), stddev(projectcode),stddev_samp(workgroupcategory)," +
+      "corr(projectcode,workgroupcategory),skewness(workgroupcategory)," +
+      "kurtosis(workgroupcategory),covar_pop(projectcode,workgroupcategory),covar_samp" +
+      "(projectcode,workgroupcategory),projectjoindate from test_table group by projectjoindate")
+    val df = sql(
+      "select variance(workgroupcategory),var_samp(projectcode), var_pop(projectcode), stddev" +
+      "(projectcode),stddev_samp(workgroupcategory),corr(projectcode,workgroupcategory)," +
+      "skewness(workgroupcategory),kurtosis(workgroupcategory),covar_pop(projectcode," +
+      "workgroupcategory),covar_samp(projectcode,workgroupcategory),projectjoindate from " +
+      "test_table group by projectjoindate")
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_agg"))
+    checkAnswer(sql(
+      "select variance(workgroupcategory),var_samp(projectcode), var_pop(projectcode), stddev" +
+      "(projectcode),stddev_samp(workgroupcategory),corr(projectcode,workgroupcategory)," +
+      "skewness(workgroupcategory),kurtosis(workgroupcategory),covar_pop(projectcode," +
+      "workgroupcategory),covar_samp(projectcode,workgroupcategory),projectjoindate from " +
+      "test_table group by projectjoindate"),
+      sql( // same query on test_table1, which datamap_agg (defined on test_table) does not cover
+        "select variance(workgroupcategory),var_samp(projectcode), var_pop(projectcode), stddev" +
+        "(projectcode),stddev_samp(workgroupcategory),corr(projectcode,workgroupcategory)," +
+        "skewness(workgroupcategory),kurtosis(workgroupcategory),covar_pop(projectcode," +
+        "workgroupcategory),covar_samp(projectcode,workgroupcategory),projectjoindate from " +
+        "test_table1 group by projectjoindate"))
+  }
+
+
override def afterAll(): Unit = {
sql("drop table if exists products")
sql("drop table if exists sales")