[HOTFIX][PR 2575] Create the modular plan only when valid datamaps are available
The update query fails on a Spark 2.2 cluster when the MV jars are available: if datamaps have been created for other tables, the catalogs are not empty, so isValidPlan() in MVAnalyzerRule returns true even though no datamap applies to the queried table. This closes #2579 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b65bf9bc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b65bf9bc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b65bf9bc Branch: refs/heads/external-format Commit: b65bf9bc7104cbcfad1277c99090853d9e7b0386 Parents: f52c133 Author: ravipesala <ravi.pes...@gmail.com> Authored: Mon Jul 30 15:00:00 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Thu Aug 2 16:52:21 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/DataMapCatalog.java | 4 +- .../carbondata/mv/datamap/MVAnalyzerRule.scala | 57 ++++++++++++++++---- .../mv/rewrite/SummaryDatasetCatalog.scala | 9 +++- .../mv/rewrite/MVCreateTestCase.scala | 4 ++ 4 files changed, 60 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java index 89f2838..5dd4871 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java @@ -38,10 +38,10 @@ public interface DataMapCatalog<T> { void unregisterSchema(String dataMapName); /** - * List all registered schema catalogs + * List all registered valid schema catalogs * @return */ - T[] listAllSchema(); + T[] 
listAllValidSchema(); /** * It reloads/removes all registered schema catalogs http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala index 483780f..9e0f8e5 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala @@ -16,8 +16,11 @@ */ package org.apache.carbondata.mv.datamap +import scala.collection.JavaConverters._ + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF} import org.apache.spark.sql.catalyst.plans.logical.{Command, DeserializeToObject, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule @@ -79,27 +82,59 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { } } + /** + * Whether the plan is valid for doing modular plan matching and datamap replacing. + */ def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = { - !plan.isInstanceOf[Command] && !isDataMapExists(plan, catalog.listAllSchema()) && - !plan.isInstanceOf[DeserializeToObject] + if (!plan.isInstanceOf[Command] && !plan.isInstanceOf[DeserializeToObject]) { + val catalogs = extractCatalogs(plan) + !isDataMapReplaced(catalog.listAllValidSchema(), catalogs) && + isDataMapExists(catalog.listAllValidSchema(), catalogs) + } else { + false + } + } /** * Check whether datamap table already updated in the query. 
* - * @param plan - * @param mvs - * @return + * @param mvdataSetArray Array of available mvdataset which include modular plans + * @return Boolean whether already datamap replaced in the plan or not */ - def isDataMapExists(plan: LogicalPlan, mvs: Array[SummaryDataset]): Boolean = { - val catalogs = plan collect { - case l: LogicalRelation => l.catalogTable - } - catalogs.isEmpty || catalogs.exists { c => - mvs.exists { mv => + def isDataMapReplaced( + mvdataSetArray: Array[SummaryDataset], + catalogs: Seq[Option[CatalogTable]]): Boolean = { + catalogs.exists { c => + mvdataSetArray.exists { mv => val identifier = mv.dataMapSchema.getRelationIdentifier identifier.getTableName.equals(c.get.identifier.table) && identifier.getDatabaseName.equals(c.get.database) } } } + + /** + * Check whether any suitable datamaps(like datamap which parent tables are present in the plan) + * exists for this plan. + * + * @param mvs + * @return + */ + def isDataMapExists(mvs: Array[SummaryDataset], catalogs: Seq[Option[CatalogTable]]): Boolean = { + catalogs.exists { c => + mvs.exists { mv => + mv.dataMapSchema.getParentTables.asScala.exists { identifier => + identifier.getTableName.equals(c.get.identifier.table) && + identifier.getDatabaseName.equals(c.get.database) + } + } + } + } + + private def extractCatalogs(plan: LogicalPlan): Seq[Option[CatalogTable]] = { + val catalogs = plan collect { + case l: LogicalRelation => l.catalogTable + } + catalogs + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala index 210ff65..026d6b7 100644 --- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala @@ -152,7 +152,14 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) } - override def listAllSchema(): Array[SummaryDataset] = summaryDatasets.toArray + override def listAllValidSchema(): Array[SummaryDataset] = { + val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails + // Only select the enabled datamaps for the query. + val enabledDataSets = summaryDatasets.filter { p => + statusDetails.exists(_.getDataMapName.equalsIgnoreCase(p.dataMapSchema.getDataMapName)) + } + enabledDataSets.toArray + } /** * API for test only http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala index 0b96202..9f834a9 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -889,7 +889,10 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("basic scenario") { sql("drop table if exists mvtable1") + sql("drop table if exists mvtable2") sql("create table mvtable1(name string,age int,salary int) stored by 'carbondata'") + sql("create table mvtable2(name string,age int,salary int) stored by 'carbondata'") + sql("create datamap MV11 using 'mv' as select name from mvtable2") sql(" insert into mvtable1 select 'n1',12,12") sql(" insert into mvtable1 select 'n1',12,12") sql(" insert into mvtable1 select 'n3',12,12") @@ -897,6 +900,7 @@ class MVCreateTestCase 
extends QueryTest with BeforeAndAfterAll { sql("update mvtable1 set(name) = ('updatedName')").show() checkAnswer(sql("select count(*) from mvtable1 where name = 'updatedName'"),Seq(Row(4))) sql("drop table if exists mvtable1") + sql("drop table if exists mvtable2") } def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {