This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c474730  [CARBONDATA-3541]Select queries with Aggregation Functions 
such as variance, stddev,etc fails with MV datamap
c474730 is described below

commit c47473002c92562e12bf39ac94b62c38e06c2fca
Author: Indhumathi27 <[email protected]>
AuthorDate: Fri Oct 4 16:27:53 2019 +0530

    [CARBONDATA-3541]Select queries with Aggregation Functions such as 
variance, stddev,etc fails with MV datamap
    
    Problem:
    Select queries with aggregation functions such as variance, stddev, etc. with 
group by fail with an MV datamap after it is created. This is because, while querying, 
aggregation is not added to those columns.
    Solution:
    Add an aggregate function to those columns (variance, stddev, etc.) while 
rewriting the plan for the MV.
    
    This closes #3405
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 27 +++++++++++++++++
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  | 35 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 8072d96..5a88c43 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -626,6 +626,33 @@ object MVHelper {
             case Alias(agg@AggregateExpression(fun@Count(Seq(child)), _, _, 
_), name) =>
               val uFun = Sum(right)
               Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@Corr(l, r), _, _, _), name) 
=>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@VariancePop(child), _, _, 
_), name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@VarianceSamp(child), _, _, 
_), name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@StddevSamp(child), _, _, 
_), name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@StddevPop(child), _, _, _), 
name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@CovPopulation(l, r), _, _, 
_), name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@CovSample(l, r), _, _, _), 
name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@Skewness(child), _, _, _), 
name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
+            case Alias(agg@AggregateExpression(fun@Kurtosis(child), _, _, _), 
name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = 
left.exprId)
             case _ =>
               if (left.name != right.name) Alias(right, left.name)(exprId = 
left.exprId) else right
           }
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index b47dc47..32b0567 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -596,6 +596,41 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     assert(TestUtil.verifyMVDataMap(analyzed, "datamap_com"))
   }
 
+  test("test all aggregate functions") {
+    // test_table and test_table1 go through the same create/load helpers; only test_table gets
+    // the MV in this test, so test_table1 serves as the expected-result baseline.
+    createTableFactTable("test_table")
+    createTableFactTable("test_table1")
+    loadDataToFactTable("test_table")
+    loadDataToFactTable("test_table1")
+    // Drop the datamap this test creates; the statement used to name "datamap1" instead.
+    sql("drop datamap if exists datamap_agg")
+    // MV over test_table holding every aggregate exercised below, grouped by projectjoindate.
+    sql(
+      "create datamap datamap_agg using 'mv' as select variance(workgroupcategory),var_samp" +
+      "(projectcode), var_pop(projectcode), stddev(projectcode),stddev_samp(workgroupcategory)," +
+      "corr(projectcode,workgroupcategory),skewness(workgroupcategory)," +
+      "kurtosis(workgroupcategory),covar_pop(projectcode,workgroupcategory),covar_samp" +
+      "(projectcode,workgroupcategory),projectjoindate from test_table group by projectjoindate")
+    // The same aggregate query is issued against both tables, so build it from the table name.
+    def aggQuery(table: String): String = {
+      "select variance(workgroupcategory),var_samp(projectcode), var_pop(projectcode), stddev" +
+      "(projectcode),stddev_samp(workgroupcategory),corr(projectcode,workgroupcategory)," +
+      "skewness(workgroupcategory),kurtosis(workgroupcategory),covar_pop(projectcode," +
+      "workgroupcategory),covar_samp(projectcode,workgroupcategory),projectjoindate from " +
+      table + " group by projectjoindate"
+    }
+    val df = sql(aggQuery("test_table"))
+    val analyzed = df.queryExecution.analyzed
+    // The analyzed plan of the test_table query is checked against datamap_agg.
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_agg"))
+    // Results on test_table must equal the same query run on test_table1.
+    checkAnswer(
+      sql(aggQuery("test_table")),
+      sql(aggQuery("test_table1")))
+  }
+
+
   override def afterAll(): Unit = {
     sql("drop table if exists products")
     sql("drop table if exists sales")

Reply via email to