[CARBONDATA-2540][CARBONDATA-2560][CARBONDATA-2568][MV] Add validations for unsupported MV queries
Problem: Unsupported MV queries are not validated when an MV datamap is created. Solution: Added validation that rejects unsupported MV queries when the datamap is created. This closes #2478 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/985115f5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/985115f5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/985115f5 Branch: refs/heads/branch-1.4 Commit: 985115f56723a09cf2162ed99ac44690a63bd21e Parents: 1c3b8b8 Author: ravipesala <[email protected]> Authored: Sun Jul 15 17:40:07 2018 +0530 Committer: ravipesala <[email protected]> Committed: Tue Jul 31 00:11:26 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/mv/datamap/MVHelper.scala | 45 ++++++++++++++++++- .../mv/rewrite/SummaryDatasetCatalog.scala | 7 +++ .../mv/rewrite/MVCreateTestCase.scala | 47 ++++++++++++++++++++ 3 files changed, 97 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/985115f5/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala index 389f5be..a52e4c9 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala @@ -34,9 +34,11 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import 
org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier} +import org.apache.carbondata.datamap.DataMapManager import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select} -import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite} +import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog} import org.apache.carbondata.spark.util.CommonUtil /** @@ -50,7 +52,9 @@ object MVHelper { ifNotExistsSet: Boolean = false): Unit = { val dmProperties = dataMapSchema.getProperties.asScala val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) - val logicalPlan = sparkSession.sql(updatedQuery).drop("preAgg").queryExecution.analyzed + val query = sparkSession.sql(updatedQuery) + val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed) + validateMVQuery(sparkSession, logicalPlan) val fullRebuild = isFullReload(logicalPlan) val fields = logicalPlan.output.map { attr => val name = updateColumnName(attr) @@ -118,6 +122,43 @@ object MVHelper { DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema) } + private def validateMVQuery(sparkSession: SparkSession, + logicalPlan: LogicalPlan): Unit = { + val dataMapProvider = DataMapManager.get().getDataMapProvider(null, + new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession) + var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider, + DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog] + if (catalog == null) { + catalog = new SummaryDatasetCatalog(sparkSession) + } + val modularPlan = + catalog.mvSession.sessionState.modularizer.modularize( + catalog.mvSession.sessionState.optimizer.execute(logicalPlan)).next().semiHarmonized + + // Only queries which can be select , predicate , join, group by and having queries. 
+ if (!modularPlan.isSPJGH) { + throw new UnsupportedOperationException("MV is not supported for this query") + } + val isValid = modularPlan match { + case g: GroupBy => + // Make sure all predicates are present in projections. + g.predicateList.forall{p => + g.outputList.exists{ + case a: Alias => + a.semanticEquals(p) || a.child.semanticEquals(p) + case other => other.semanticEquals(p) + } + } + case _ => true + } + if (!isValid) { + throw new UnsupportedOperationException("Group by columns must be present in project columns") + } + if (catalog.isMVWithSameQueryPresent(logicalPlan)) { + throw new UnsupportedOperationException("MV with same query present") + } + } + def updateColumnName(attr: Attribute): String = { val name = attr.name.replace("(", "_") http://git-wip-us.apache.org/repos/asf/carbondata/blob/985115f5/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala index 3f64c40..210ff65 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala @@ -190,6 +190,13 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) } /** + * Check already with same query present in mv + */ + private[mv] def isMVWithSameQueryPresent(query: LogicalPlan) : Boolean = { + lookupSummaryDataset(query).nonEmpty + } + + /** * API for test only * Tries to remove the data set for the given [[DataFrame]] from the catalog if it's registered */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/985115f5/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala 
---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala index 222c9f0..d69df17 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -814,6 +814,53 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { sql("drop datamap if exists datamap_comp_maxsumminavg") } + test("jira carbondata-2540") { + + sql("drop datamap if exists mv_unional") + intercept[UnsupportedOperationException] { + sql( + "create datamap mv_unional using 'mv' as Select Z.deptname From (Select deptname,empname From fact_table1 Union All Select deptname,empname from fact_table2) Z") + sql("rebuild datamap mv_unional") + } + sql("drop datamap if exists mv_unional") + } + + test("jira carbondata-2533") { + + sql("drop datamap if exists MV_exp") + intercept[UnsupportedOperationException] { + sql( + "create datamap MV_exp using 'mv' as select sum(case when deptno=11 and (utilization=92) then salary else 0 end) as t from fact_table1 group by empname") + + sql("rebuild datamap MV_exp") + val frame = sql( + "select sum(case when deptno=11 and (utilization=92) then salary else 0 end) as t from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "MV_exp")) + } + sql("drop datamap if exists MV_exp") + } + + test("jira carbondata-2560") { + + sql("drop datamap if exists MV_exp1") + sql("drop datamap if exists MV_exp2") + sql("create datamap MV_exp1 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname") + intercept[UnsupportedOperationException] { + sql( + "create datamap MV_exp2 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname") + + } + 
sql("show datamap").show() + sql("rebuild datamap MV_exp1") + val frame = sql( + "select empname, sum(utilization) from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "MV_exp1")) + sql("drop datamap if exists MV_exp1") + sql("drop datamap if exists MV_exp2") + } + def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { val tables = logicalPlan collect { case l: LogicalRelation => l.catalogTable.get
