This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a4a1016  [CARBONDATA-3402] Added Inherit Inverted Index Property from 
Parent Table for Preagg & MV
a4a1016 is described below

commit a4a101619fd1b13fbb50b090f034103e57361e91
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu May 30 11:12:38 2019 +0530

    [CARBONDATA-3402] Added Inherit Inverted Index Property from Parent Table 
for Preagg & MV
    
    1. Inherit the Inverted Index property from the parent table for Preaggregate & MV datamaps.
    2. When both Preaggregate and MV datamaps are present, skip applying the MV plan
    while loading data into the preaggregate table.
    
    This closes #3248
---
 .../carbondata/mv/datamap/MVAnalyzerRule.scala     | 10 +++++-
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 36 ++++++++++++++++++++++
 .../scala/org/apache/spark/util/DataMapUtil.scala  | 20 ++++++++++++
 3 files changed, 65 insertions(+), 1 deletion(-)

diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index 315c66b..558a5bb 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, 
UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, 
DeserializeToObject, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Command, 
DeserializeToObject, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
@@ -67,6 +67,14 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
         needAnalysis = false
         attr
     }
+    plan.transform {
+      case aggregate@Aggregate(grp, aExp, child) =>
+        // if the plan is a data load for a preaggregate table, then skip applying mv
+        if (aExp.exists(p => p.name.equals("preAggLoad") || 
p.name.equals("preAgg"))) {
+          needAnalysis = false
+        }
+        Aggregate(grp, aExp, child)
+    }
     val catalog = 
DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
       DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
     if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index fee3d24..e88a565 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -424,5 +424,41 @@ class TestAllOperationsOnMV extends QueryTest with 
BeforeAndAfterEach {
     sql("drop table IF EXISTS maintable")
   }
 
+  test("test preagg and mv") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm_mv ")
+    sql("create datamap dm_mv using 'mv' as select name, sum(price) from 
maintable group by name")
+    sql("drop datamap if exists dm_pre ")
+    sql("create datamap dm_pre on table maintable using 'preaggregate' as 
select name, sum(price) from maintable group by name")
+    sql("insert into table maintable select 'abcd',21,20002")
+    checkAnswer(sql("select count(*) from dm_mv_table"), Seq(Row(2)))
+    checkAnswer(sql("select count(*) from maintable_dm_pre"), Seq(Row(2)))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test inverted index  & no-inverted index inherited from parent table") 
{
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata' tblproperties('sort_columns'='name', 
'inverted_index'='name','sort_scope'='local_sort')")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm using 'mv' as select name, sum(price) from 
maintable group by name")
+    checkExistence(sql("describe formatted dm_table"), true, "Inverted Index 
Columns maintable_name")
+    checkAnswer(sql("select name, sum(price) from maintable group by name"), 
Seq(Row("abc", 2000)))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test inverted index & no-inverted index inherited from parent table - 
Preaggregate") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata' tblproperties('sort_columns'='name', 
'inverted_index'='name','sort_scope'='local_sort')")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm on table maintable using 'preaggregate' as select 
name, sum(price) from maintable group by name")
+    checkExistence(sql("describe formatted maintable_dm"), true, "Inverted 
Index Columns maintable_name")
+    checkAnswer(sql("select name, sum(price) from maintable group by name"), 
Seq(Row("abc", 2000)))
+    sql("drop table IF EXISTS maintable")
+  }
+
 }
 
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
index 2b5a041..faee180 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
@@ -146,6 +146,26 @@ object DataMapUtil {
       tableProperties
         .put(CarbonCommonConstants.DICTIONARY_EXCLUDE, 
newGlobalDictExclude.mkString(","))
     }
+
+    val parentInvertedIndex = 
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.INVERTED_INDEX, "").split(",")
+
+    val newInvertedIndex = getDataMapColumns(parentInvertedIndex, fields, 
fieldRelationMap)
+
+    val parentNoInvertedIndex = 
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.NO_INVERTED_INDEX, "").split(",")
+
+    val newNoInvertedIndex = getDataMapColumns(parentNoInvertedIndex, fields, 
fieldRelationMap)
+
+    if (newInvertedIndex.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.INVERTED_INDEX, 
newInvertedIndex.mkString(","))
+    }
+    if (newNoInvertedIndex.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.NO_INVERTED_INDEX, 
newNoInvertedIndex.mkString(","))
+    }
+
   }
 
   private def getDataMapColumns(parentColumns: Array[String], fields: 
Seq[Field],

Reply via email to