This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new a4a1016 [CARBONDATA-3402] Added Inherit Inverted Index Property from
Parent Table for Preagg & MV
a4a1016 is described below
commit a4a101619fd1b13fbb50b090f034103e57361e91
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu May 30 11:12:38 2019 +0530
[CARBONDATA-3402] Added Inherit Inverted Index Property from Parent Table
for Preagg & MV
1. Inherit the inverted index property from the parent table for preaggregate & MV datamaps
2. When both preaggregate and MV datamaps are present, skip applying the MV plan
while loading data into the preaggregate table
This closes #3248
---
.../carbondata/mv/datamap/MVAnalyzerRule.scala | 10 +++++-
.../mv/rewrite/TestAllOperationsOnMV.scala | 36 ++++++++++++++++++++++
.../scala/org/apache/spark/util/DataMapUtil.scala | 20 ++++++++++++
3 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index 315c66b..558a5bb 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias,
UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
-import org.apache.spark.sql.catalyst.plans.logical.{Command,
DeserializeToObject, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Command,
DeserializeToObject, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -67,6 +67,14 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
needAnalysis = false
attr
}
+ plan.transform {
+ case aggregate@Aggregate(grp, aExp, child) =>
+ // check for if plan is for dataload for preaggregate table, then skip
applying mv
+ if (aExp.exists(p => p.name.equals("preAggLoad") ||
p.name.equals("preAgg"))) {
+ needAnalysis = false
+ }
+ Aggregate(grp, aExp, child)
+ }
val catalog =
DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index fee3d24..e88a565 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -424,5 +424,41 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
sql("drop table IF EXISTS maintable")
}
+ test("test preagg and mv") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata'")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop datamap if exists dm_mv ")
+ sql("create datamap dm_mv using 'mv' as select name, sum(price) from
maintable group by name")
+ sql("drop datamap if exists dm_pre ")
+ sql("create datamap dm_pre on table maintable using 'preaggregate' as
select name, sum(price) from maintable group by name")
+ sql("insert into table maintable select 'abcd',21,20002")
+ checkAnswer(sql("select count(*) from dm_mv_table"), Seq(Row(2)))
+ checkAnswer(sql("select count(*) from maintable_dm_pre"), Seq(Row(2)))
+ sql("drop table IF EXISTS maintable")
+ }
+
+ test("test inverted index & no-inverted index inherited from parent table")
{
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata' tblproperties('sort_columns'='name',
'inverted_index'='name','sort_scope'='local_sort')")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop datamap if exists dm ")
+ sql("create datamap dm using 'mv' as select name, sum(price) from
maintable group by name")
+ checkExistence(sql("describe formatted dm_table"), true, "Inverted Index
Columns maintable_name")
+ checkAnswer(sql("select name, sum(price) from maintable group by name"),
Seq(Row("abc", 2000)))
+ sql("drop table IF EXISTS maintable")
+ }
+
+ test("test inverted index & no-inverted index inherited from parent table -
Preaggregate") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable(name string, c_code int, price int) stored by
'carbondata' tblproperties('sort_columns'='name',
'inverted_index'='name','sort_scope'='local_sort')")
+ sql("insert into table maintable select 'abc',21,2000")
+ sql("drop datamap if exists dm ")
+ sql("create datamap dm on table maintable using 'preaggregate' as select
name, sum(price) from maintable group by name")
+ checkExistence(sql("describe formatted maintable_dm"), true, "Inverted
Index Columns maintable_name")
+ checkAnswer(sql("select name, sum(price) from maintable group by name"),
Seq(Row("abc", 2000)))
+ sql("drop table IF EXISTS maintable")
+ }
+
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
index 2b5a041..faee180 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
@@ -146,6 +146,26 @@ object DataMapUtil {
tableProperties
.put(CarbonCommonConstants.DICTIONARY_EXCLUDE,
newGlobalDictExclude.mkString(","))
}
+
+ val parentInvertedIndex =
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+ .getOrElse(CarbonCommonConstants.INVERTED_INDEX, "").split(",")
+
+ val newInvertedIndex = getDataMapColumns(parentInvertedIndex, fields,
fieldRelationMap)
+
+ val parentNoInvertedIndex =
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+ .getOrElse(CarbonCommonConstants.NO_INVERTED_INDEX, "").split(",")
+
+ val newNoInvertedIndex = getDataMapColumns(parentNoInvertedIndex, fields,
fieldRelationMap)
+
+ if (newInvertedIndex.nonEmpty) {
+ tableProperties
+ .put(CarbonCommonConstants.INVERTED_INDEX,
newInvertedIndex.mkString(","))
+ }
+ if (newNoInvertedIndex.nonEmpty) {
+ tableProperties
+ .put(CarbonCommonConstants.NO_INVERTED_INDEX,
newNoInvertedIndex.mkString(","))
+ }
+
}
private def getDataMapColumns(parentColumns: Array[String], fields:
Seq[Field],