[CARBONDATA-2540][CARBONDATA-2560][CARBONDATA-2568][MV] Add validations for 
unsupported MV queries

Problem: Validations are missing for unsupported MV queries when creating an 
MV datamap.
Solution: Added validations for the unsupported MV queries when creating the 
datamap.

This closes #2478


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/985115f5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/985115f5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/985115f5

Branch: refs/heads/branch-1.4
Commit: 985115f56723a09cf2162ed99ac44690a63bd21e
Parents: 1c3b8b8
Author: ravipesala <[email protected]>
Authored: Sun Jul 15 17:40:07 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/mv/datamap/MVHelper.scala | 45 ++++++++++++++++++-
 .../mv/rewrite/SummaryDatasetCatalog.scala      |  7 +++
 .../mv/rewrite/MVCreateTestCase.scala           | 47 ++++++++++++++++++++
 3 files changed, 97 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/985115f5/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 389f5be..a52e4c9 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -34,9 +34,11 @@ import 
org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, 
RelationIdentifier}
+import org.apache.carbondata.datamap.DataMapManager
 import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, 
ModularPlan, Select}
-import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite}
+import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, 
SummaryDatasetCatalog}
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -50,7 +52,9 @@ object MVHelper {
       ifNotExistsSet: Boolean = false): Unit = {
     val dmProperties = dataMapSchema.getProperties.asScala
     val updatedQuery = new 
CarbonSpark2SqlParser().addPreAggFunction(queryString)
-    val logicalPlan = 
sparkSession.sql(updatedQuery).drop("preAgg").queryExecution.analyzed
+    val query = sparkSession.sql(updatedQuery)
+    val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
+    validateMVQuery(sparkSession, logicalPlan)
     val fullRebuild = isFullReload(logicalPlan)
     val fields = logicalPlan.output.map { attr =>
       val name = updateColumnName(attr)
@@ -118,6 +122,43 @@ object MVHelper {
     DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
   }
 
+  private def validateMVQuery(sparkSession: SparkSession,
+      logicalPlan: LogicalPlan): Unit = {
+    val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
+      new DataMapSchema("", DataMapClassProvider.MV.getShortName), 
sparkSession)
+    var catalog = 
DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
+      DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+    if (catalog == null) {
+      catalog = new SummaryDatasetCatalog(sparkSession)
+    }
+    val modularPlan =
+      catalog.mvSession.sessionState.modularizer.modularize(
+        
catalog.mvSession.sessionState.optimizer.execute(logicalPlan)).next().semiHarmonized
+
+    // Only select, predicate, join, group-by and having queries are 
supported.
+    if (!modularPlan.isSPJGH)  {
+      throw new UnsupportedOperationException("MV is not supported for this 
query")
+    }
+    val isValid = modularPlan match {
+      case g: GroupBy =>
+        // Make sure all predicates are present in projections.
+        g.predicateList.forall{p =>
+          g.outputList.exists{
+            case a: Alias =>
+              a.semanticEquals(p) || a.child.semanticEquals(p)
+            case other => other.semanticEquals(p)
+          }
+        }
+      case _ => true
+    }
+    if (!isValid) {
+      throw new UnsupportedOperationException("Group by columns must be 
present in project columns")
+    }
+    if (catalog.isMVWithSameQueryPresent(logicalPlan)) {
+      throw new UnsupportedOperationException("MV with same query present")
+    }
+  }
+
   def updateColumnName(attr: Attribute): String = {
     val name =
       attr.name.replace("(", "_")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/985115f5/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 3f64c40..210ff65 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -190,6 +190,13 @@ private[mv] class SummaryDatasetCatalog(sparkSession: 
SparkSession)
   }
 
   /**
+   * Check whether an MV with the same query is already present
+   */
+  private[mv] def isMVWithSameQueryPresent(query: LogicalPlan) : Boolean = {
+    lookupSummaryDataset(query).nonEmpty
+  }
+
+  /**
    * API for test only
    * Tries to remove the data set for the given [[DataFrame]] from the catalog 
if it's registered
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/985115f5/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 222c9f0..d69df17 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -814,6 +814,53 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("drop datamap if exists datamap_comp_maxsumminavg")
   }
 
+  test("jira carbondata-2540") {
+
+    sql("drop datamap if exists mv_unional")
+    intercept[UnsupportedOperationException] {
+      sql(
+        "create datamap mv_unional using 'mv' as Select Z.deptname From 
(Select deptname,empname From fact_table1 Union All Select deptname,empname 
from fact_table2) Z")
+      sql("rebuild datamap mv_unional")
+    }
+    sql("drop datamap if exists mv_unional")
+  }
+
+  test("jira carbondata-2533") {
+
+    sql("drop datamap if exists MV_exp")
+    intercept[UnsupportedOperationException] {
+      sql(
+        "create datamap MV_exp using 'mv' as select sum(case when deptno=11 
and (utilization=92) then salary else 0 end) as t from fact_table1 group by 
empname")
+
+      sql("rebuild datamap MV_exp")
+      val frame = sql(
+        "select sum(case when deptno=11 and (utilization=92) then salary else 
0 end) as t from fact_table1 group by empname")
+      val analyzed = frame.queryExecution.analyzed
+      assert(verifyMVDataMap(analyzed, "MV_exp"))
+    }
+    sql("drop datamap if exists MV_exp")
+  }
+
+  test("jira carbondata-2560") {
+
+    sql("drop datamap if exists MV_exp1")
+    sql("drop datamap if exists MV_exp2")
+    sql("create datamap MV_exp1 using 'mv' as select empname, sum(utilization) 
from fact_table1 group by empname")
+    intercept[UnsupportedOperationException] {
+      sql(
+        "create datamap MV_exp2 using 'mv' as select empname, sum(utilization) 
from fact_table1 group by empname")
+
+    }
+    sql("show datamap").show()
+    sql("rebuild datamap MV_exp1")
+    val frame = sql(
+      "select empname, sum(utilization) from fact_table1 group by empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "MV_exp1"))
+    sql("drop datamap if exists MV_exp1")
+    sql("drop datamap if exists MV_exp2")
+  }
+
   def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
     val tables = logicalPlan collect {
       case l: LogicalRelation => l.catalogTable.get

Reply via email to