This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1023ba9  [CARBONDATA-3402] Fix block complex data type and validate 
dmproperties for MV
1023ba9 is described below

commit 1023ba951cc623b8f312e66fa288744705a928de
Author: Indhumathi27 <[email protected]>
AuthorDate: Mon May 27 18:44:33 2019 +0530

    [CARBONDATA-3402] Fix block complex data type and validate dmproperties for 
MV
    
    This PR includes:
    
    - Block complex data types (array, struct, map) in MV datamaps
    - Fix the to_date function when creating an MV datamap
    - Inherit the global dictionary from the parent table in the child table
      for preaggregate and MV datamaps
    - Validate DMPROPERTIES for MV datamaps
    
    This closes #3241
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  25 +++-
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  |  34 +++--
 .../carbondata/mv/rewrite/MVCoalesceTestCase.scala |  16 +--
 .../mv/rewrite/MVCountAndCaseTestCase.scala        |   9 +-
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 137 ++++++++++-----------
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  |  37 +++---
 .../mv/rewrite/MVMultiJoinTestCase.scala           |  11 +-
 .../carbondata/mv/rewrite/MVRewriteTestCase.scala  |   9 +-
 .../carbondata/mv/rewrite/MVSampleTestCase.scala   |  25 ++--
 .../carbondata/mv/rewrite/MVTPCDSTestCase.scala    |  28 ++---
 .../carbondata/mv/rewrite/MVTpchTestCase.scala     |  35 +++---
 .../mv/rewrite/TestAllOperationsOnMV.scala         |  61 +++++++++
 .../mv/rewrite/TestPartitionWithMV.scala           |  11 +-
 .../preaggregate/TestPreAggCreateCommand.scala     |   8 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   3 +-
 .../command/management/CarbonLoadDataCommand.scala |  12 +-
 .../scala/org/apache/spark/util/DataMapUtil.scala  |  58 ++++++---
 17 files changed, 282 insertions(+), 237 deletions(-)

diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 8d60a06..57082d7 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.command.{Field, 
PartitionerField, TableMod
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, 
CarbonDropTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
 import org.apache.spark.util.{DataMapUtil, PartitionUtils}
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -60,6 +61,7 @@ object MVHelper {
         s"MV datamap does not support streaming"
       )
     }
+    MVUtil.validateDMProperty(dmProperties)
     val updatedQuery = new 
CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val query = sparkSession.sql(updatedQuery)
     val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
@@ -71,6 +73,11 @@ object MVHelper {
     val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
     val fullRebuild = isFullReload(logicalPlan)
     val fields = logicalPlan.output.map { attr =>
+      if (attr.dataType.isInstanceOf[ArrayType] || 
attr.dataType.isInstanceOf[StructType] ||
+          attr.dataType.isInstanceOf[MapType]) {
+        throw new UnsupportedOperationException(
+          s"MV datamap is unsupported for ComplexData type column: " + 
attr.name)
+      }
       val name = updateColumnName(attr)
       val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
       if (attr.dataType.typeName.startsWith("decimal")) {
@@ -312,13 +319,19 @@ object MVHelper {
     modularPlan.asCompactSQL
   }
 
+  def getUpdatedName(name: String): String = {
+    val updatedName = name.replace("(", "_")
+      .replace(")", "")
+      .replace(" ", "_")
+      .replace("=", "")
+      .replace(",", "")
+      .replace(".", "_")
+      .replace("`", "")
+    updatedName
+  }
+
   def updateColumnName(attr: Attribute): String = {
-    val name =
-      attr.name.replace("(", "_")
-        .replace(")", "")
-        .replace(" ", "_")
-        .replace("=", "")
-        .replace(",", "")
+    val name = getUpdatedName(attr.name)
     attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
   }
 
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 3d42c88..b267203 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -28,6 +28,7 @@ import 
org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.DataType
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -108,6 +109,8 @@ object MVUtil {
           fieldToDataMapFieldMap +=
           getFieldToDataMapFields(name, attr.dataType, None, "", arrayBuffer, 
"")
         }
+      case a@Alias(_, name) =>
+        checkIfComplexDataTypeExists(a)
     }
     fieldToDataMapFieldMap
   }
@@ -138,7 +141,8 @@ object MVUtil {
           } else {
             aggregateType = attr.aggregateFunction.nodeName
           }
-        case Alias(_, name) =>
+        case a@Alias(_, name) =>
+          checkIfComplexDataTypeExists(a)
           // In case of arithmetic expressions like sum(a)+sum(b)
           aggregateType = "arithmetic"
       }
@@ -251,12 +255,7 @@ object MVUtil {
       aggregateType: String,
       columnTableRelationList: ArrayBuffer[ColumnTableRelation],
       parenTableName: String) = {
-    var actualColumnName =
-      name.replace("(", "_")
-        .replace(")", "")
-        .replace(" ", "_")
-        .replace("=", "")
-        .replace(",", "")
+    var actualColumnName = MVHelper.getUpdatedName(name)
     if (qualifier.isDefined) {
       actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
         .getOrElse(actualColumnName)
@@ -285,4 +284,25 @@ object MVUtil {
         rawSchema = rawSchema), dataMapField)
     }
   }
+
+  private def checkIfComplexDataTypeExists(a: Alias): Unit = {
+    if (a.child.isInstanceOf[GetMapValue] || 
a.child.isInstanceOf[GetStructField] ||
+        a.child.isInstanceOf[GetArrayItem]) {
+      throw new UnsupportedOperationException(
+        s"MV datamap is unsupported for ComplexData type child column: " + 
a.child.simpleString)
+    }
+  }
+
+  def validateDMProperty(tableProperty: mutable.Map[String, String]): Unit = {
+    val tableProperties = Array("dictionary_include", "dictionary_exclude", 
"sort_columns",
+      "local_dictionary_include", "local_dictionary_exclude", 
"long_string_columns",
+      "no_inverted_index", "inverted_index", "column_meta_cache", 
"range_column")
+    val unsupportedProps = tableProperty
+      .filter(f => tableProperties.exists(prop => prop.equalsIgnoreCase(f._1)))
+    if (unsupportedProps.nonEmpty) {
+      throw new MalformedDataMapCommandException(
+        "DMProperties " + unsupportedProps.keySet.mkString(",") +
+        " are not allowed for this datamap")
+    }
+  }
 }
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCoalesceTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCoalesceTestCase.scala
index f2a27c7..255887d 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCoalesceTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCoalesceTestCase.scala
@@ -43,7 +43,7 @@ class MVCoalesceTestCase  extends QueryTest with 
BeforeAndAfterAll  {
     sql("rebuild datamap coalesce_test_main_mv")
 
     val frame = sql("select coalesce(sum(id),0) as sumid,name from 
coalesce_test_main group by name")
-    assert(verifyMVDataMap(frame.queryExecution.analyzed, 
"coalesce_test_main_mv"))
+    assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, 
"coalesce_test_main_mv"))
     checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))
 
     sql("drop datamap if exists coalesce_test_main_mv")
@@ -59,7 +59,7 @@ class MVCoalesceTestCase  extends QueryTest with 
BeforeAndAfterAll  {
     assert("MV doesn't support Coalesce".equals(exception.getMessage))
 
     val frame = sql("select coalesce(sum(id),0) as sumid,name from 
coalesce_test_main group by name")
-    assert(!verifyMVDataMap(frame.queryExecution.analyzed, 
"coalesce_test_main_mv"))
+    assert(!TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, 
"coalesce_test_main_mv"))
     checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))
 
     sql("drop datamap if exists coalesce_test_main_mv")
@@ -72,20 +72,22 @@ class MVCoalesceTestCase  extends QueryTest with 
BeforeAndAfterAll  {
     sql("rebuild datamap coalesce_test_main_mv")
 
     val frame = sql("select sum(coalesce(id,0)) as sumid,name from 
coalesce_test_main group by name")
-    assert(verifyMVDataMap(frame.queryExecution.analyzed, 
"coalesce_test_main_mv"))
+    assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, 
"coalesce_test_main_mv"))
     checkAnswer(frame, Seq(Row(3, "tom"), Row(3, "lily")))
 
     sql("drop datamap if exists coalesce_test_main_mv")
   }
 
+  override def afterAll(): Unit ={
+    drop
+  }
+}
+
+object TestUtil {
   def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
     val tables = logicalPlan collect {
       case l: LogicalRelation => l.catalogTable.get
     }
     tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
   }
-
-  override def afterAll(): Unit ={
-    drop
-  }
 }
\ No newline at end of file
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
index af4afb6..f4b7679 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
@@ -79,14 +79,7 @@ class MVCountAndCaseTestCase  extends QueryTest with 
BeforeAndAfterAll{
                        | GROUP BY MT.`3600`, MT.`2250410101`
                        | ORDER BY `3600` ASC LIMIT 5000""".stripMargin)
 
-    assert(verifyMVDataMap(frame.queryExecution.analyzed, "data_table_mv"))
-  }
-
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+    assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, 
"data_table_mv"))
   }
 
   override def afterAll(): Unit = {
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 48f967f..62e320e 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -107,7 +107,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap1 using 'mv' as select empname, designation 
from fact_table1")
     val df = sql("select empname,designation from fact_table1")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     checkAnswer(df, sql("select empname,designation from fact_table2"))
     sql(s"drop datamap datamap1")
   }
@@ -117,7 +117,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap2 using 'mv' as select empname, designation 
from fact_table1")
     val df = sql("select empname from fact_table1")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap2"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap2"))
     checkAnswer(df, sql("select empname from fact_table2"))
     sql(s"drop datamap datamap2")
   }
@@ -127,7 +127,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap3 using 'mv' as select empname, designation 
from fact_table1")
     val frame = sql("select empname, designation from fact_table1 where 
empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap3"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap3"))
 
     checkAnswer(frame, sql("select empname, designation from fact_table2 where 
empname='shivani'"))
     sql(s"drop datamap datamap3")
@@ -137,7 +137,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap4 using 'mv' as select empname, designation 
from fact_table1")
     val frame = sql("select designation from fact_table1 where 
empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap4"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap4"))
     checkAnswer(frame, sql("select designation from fact_table2 where 
empname='shivani'"))
     sql(s"drop datamap datamap4")
   }
@@ -146,7 +146,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap5 using 'mv' as select empname, designation 
from fact_table1 where empname='shivani'")
     val frame = sql("select designation from fact_table1 where 
empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap5"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap5"))
     checkAnswer(frame, sql("select designation from fact_table2 where 
empname='shivani'"))
     sql(s"drop datamap datamap5")
   }
@@ -155,7 +155,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap6 using 'mv' as select empname, designation 
from fact_table1 where empname='shivani'")
     val frame = sql("select empname,designation from fact_table1 where 
empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap6"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap6"))
     checkAnswer(frame, sql("select empname,designation from fact_table2 where 
empname='shivani'"))
     sql(s"drop datamap datamap6")
   }
@@ -165,7 +165,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname,designation from fact_table1 where empname='shivani' and 
designation='SA'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap7"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap7"))
     checkAnswer(frame, sql("select empname,designation from fact_table2 where 
empname='shivani' and designation='SA'"))
     sql(s"drop datamap datamap7")
   }
@@ -174,7 +174,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap8 using 'mv' as select empname, designation 
from fact_table1 where empname='shivani'")
     val frame = sql("select empname,designation from fact_table1 where 
designation='SA'")
     val analyzed = frame.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap8"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap8"))
     checkAnswer(frame, sql("select empname,designation from fact_table2 where 
designation='SA'"))
     sql(s"drop datamap datamap8")
   }
@@ -183,7 +183,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap9 using 'mv' as select empname, 
designation,deptname  from fact_table1 where deptname='cloud'")
     val frame = sql("select empname,designation from fact_table1 where 
deptname='cloud'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap9"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap9"))
     checkAnswer(frame, sql("select empname,designation from fact_table2 where 
deptname='cloud'"))
     sql(s"drop datamap datamap9")
   }
@@ -192,7 +192,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap10 using 'mv' as select empname, 
designation,deptname from fact_table1 where deptname='cloud'")
     val frame = sql("select empname,designation from fact_table1")
     val analyzed = frame.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap10"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap10"))
     checkAnswer(frame, sql("select empname,designation from fact_table2"))
     sql(s"drop datamap datamap10")
   }
@@ -201,7 +201,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap11 using 'mv' as select empname, 
designation,deptname from fact_table1 where deptname='cloud'")
     val frame = sql("select empname,designation from fact_table1 where 
designation='SA'")
     val analyzed = frame.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap11"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap11"))
     checkAnswer(frame, sql("select empname,designation from fact_table2 where 
designation='SA'"))
     sql(s"drop datamap datamap11")
   }
@@ -211,7 +211,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap12 using 'mv' as select empname, 
sum(utilization) from fact_table1 group by empname")
     val frame = sql("select empname, sum(utilization) from fact_table1 group 
by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap12"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap12"))
     checkAnswer(frame, sql("select empname, sum(utilization) from fact_table2 
group by empname"))
     sql(s"drop datamap datamap12")
   }
@@ -221,7 +221,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap13 using 'mv' as select empname, 
sum(utilization) from fact_table1 group by empname")
     val frame = sql("select sum(utilization) from fact_table1 group by 
empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap13"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap13"))
     checkAnswer(frame, sql("select sum(utilization) from fact_table2 group by 
empname"))
     sql(s"drop datamap datamap13")
   }
@@ -232,7 +232,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname,sum(utilization) from fact_table1 group by empname 
having empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap14"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap14"))
     checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 
where empname='shivani' group by empname"))
     sql(s"drop datamap datamap14")
   }
@@ -243,7 +243,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname, sum(utilization) from fact_table1 group by empname 
having empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap32"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap32"))
     checkAnswer(frame, sql( "select empname, sum(utilization) from fact_table2 
group by empname having empname='shivani'"))
     sql(s"drop datamap datamap32")
   }
@@ -253,7 +253,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname,sum(utilization) from fact_table1 where 
empname='shivani' group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap15"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap15"))
     checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 
where empname='shivani' group by empname"))
     sql(s"drop datamap datamap15")
   }
@@ -262,7 +262,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap16 using 'mv' as select empname, 
sum(utilization) from fact_table1 where empname='shivani' group by empname")
     val frame = sql("select empname,sum(utilization) from fact_table1 group by 
empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap16"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap16"))
     checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 
group by empname"))
     sql(s"drop datamap datamap16")
   }
@@ -273,7 +273,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) 
from fact_table1 group" +
       " by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap17"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap17"))
     checkAnswer(frame, sql("select empname, sum(CASE WHEN utilization=27 THEN 
deptno ELSE 0 END) from fact_table2 group" +
                            " by empname"))
     sql(s"drop datamap datamap17")
@@ -285,7 +285,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from 
fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap18"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap18"))
     checkAnswer(frame, sql("select sum(CASE WHEN utilization=27 THEN deptno 
ELSE 0 END) from fact_table2 group by empname"))
     sql(s"drop datamap datamap18")
   }
@@ -296,7 +296,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from 
fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap19"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap19"))
     checkAnswer(frame, sql("select count(CASE WHEN utilization=27 THEN deptno 
ELSE 0 END) from fact_table2 group by empname"))
     sql(s"drop datamap datamap19")
   }
@@ -308,7 +308,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from 
fact_table1 where " +
       "empname='shivani' group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap20"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap20"))
     checkAnswer(frame, sql("select sum(CASE WHEN utilization=27 THEN deptno 
ELSE 0 END) from fact_table2 where " +
                            "empname='shivani' group by empname"))
     sql(s"drop datamap datamap20")
@@ -320,7 +320,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 
t2 where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap21"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap21"))
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = t2.empname"))
     sql(s"drop datamap datamap21")
   }
@@ -332,7 +332,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 
where t1.empname = " +
       "t2.empname and t1.empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap22"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap22"))
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = " +
                            "t2.empname and t1.empname='shivani'"))
     sql(s"drop datamap datamap22")
@@ -346,7 +346,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 
where t1.empname = " +
       "t2.empname and t1.empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap23"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap23"))
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = " +
                            "t2.empname and t1.empname='shivani'"))
     sql(s"drop datamap datamap23")
@@ -358,7 +358,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 
where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap24"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap24"))
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = t2.empname"))
     sql(s"drop datamap datamap24")
   }
@@ -369,11 +369,11 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 
t2 where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap25"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap25"))
     val frame1 = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1 inner join 
fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3  on 
(t1.empname=t3.empname)")
     val analyzed1 = frame1.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed1, "datamap25"))
+    assert(TestUtil.verifyMVDataMap(analyzed1, "datamap25"))
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = t2.empname"))
     sql(s"drop datamap datamap25")
   }
@@ -384,7 +384,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 
t2,fact_table3 " +
       "t3  where t1.empname = t2.empname and t1.empname=t3.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap26"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap26"))
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2,fact_table6 " +
                            "t3  where t1.empname = t2.empname and 
t1.empname=t3.empname"))
     sql(s"drop datamap datamap26")
@@ -396,7 +396,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
       "where t1.empname = t2.empname group by t1.empname, t2.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap27"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap27"))
     checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1,fact_table5 t2  " +
                            "where t1.empname = t2.empname group by t1.empname, 
t2.designation"))
     sql(s"drop datamap datamap27")
@@ -409,7 +409,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  where " +
       "t1.empname = t2.empname group by t2.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap28"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap28"))
     checkAnswer(frame, sql("select t2.designation, sum(t1.utilization) from 
fact_table4 t1,fact_table5 t2  where " +
                            "t1.empname = t2.empname group by t2.designation"))
     sql(s"drop datamap datamap28")
@@ -422,7 +422,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  where " +
       "t1.empname = t2.empname and t1.empname='shivani' group by 
t2.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap29"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap29"))
     checkAnswer(frame, sql("select t2.designation, sum(t1.utilization) from 
fact_table4 t1,fact_table5 t2  where " +
                            "t1.empname = t2.empname and t1.empname='shivani' 
group by t2.designation"))
     sql(s"drop datamap datamap29")
@@ -435,7 +435,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
       "where t1.empname = t2.empname and t2.designation='SA' group by 
t1.empname, t2.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap30"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap30"))
     checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1,fact_table5 t2  " +
                            "where t1.empname = t2.empname and 
t2.designation='SA' group by t1.empname, t2.designation"))
     sql(s"drop datamap datamap30")
@@ -447,7 +447,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname, designation, utilization+projectcode from fact_table1")
     val analyzed = frame.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap31"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap31"))
     checkAnswer(frame, sql("select empname, designation, 
utilization+projectcode from fact_table2"))
     sql(s"drop datamap datamap31")
   }
@@ -457,7 +457,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap32 using 'mv' as select empname, 
count(utilization) from fact_table1 group by empname")
     val frame = sql("select empname,count(utilization) from fact_table1 where 
empname='shivani' group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap32"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap32"))
     checkAnswer(frame, sql("select empname,count(utilization) from fact_table2 
where empname='shivani' group by empname"))
     sql(s"drop datamap datamap32")
   }
@@ -467,7 +467,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap33 using 'mv' as select empname, 
avg(utilization) from fact_table1 group by empname")
     val frame = sql("select empname,avg(utilization) from fact_table1 where 
empname='shivani' group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap33"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap33"))
     checkAnswer(frame, sql("select empname,avg(utilization) from fact_table2 
where empname='shivani' group by empname"))
     sql(s"drop datamap datamap33")
   }
@@ -479,7 +479,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname group by t1.empname, t2.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap34"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap34"))
     checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2  " +
                            "on t1.empname = t2.empname group by t1.empname, 
t2.designation"))
     sql(s"drop datamap datamap34")
@@ -490,7 +490,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select designation, sum(utilization) from fact_table1 where 
empname='shivani' group by designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap35"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap35"))
     checkAnswer(frame, sql("select designation, sum(utilization) from 
fact_table2 where empname='shivani' group by designation"))
     sql(s"drop datamap datamap35")
   }
@@ -500,7 +500,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select sum(utilization) from fact_table1 where empname='shivani' group 
by designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap36"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap36"))
     checkAnswer(frame, sql("select sum(utilization) from fact_table2 where 
empname='shivani' group by designation"))
     sql(s"drop datamap datamap36")
   }
@@ -512,7 +512,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, sum(t1.utilization) from fact_table1 t1,fact_table2 
t2  " +
       "where t1.empname = t2.empname group by t1.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap37"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap37"))
     checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table3 t1,fact_table4 t2  " +
                            "where t1.empname = t2.empname group by t1.empname, 
t1.designation"))
     sql(s"drop datamap datamap37")
@@ -525,7 +525,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
       "where t1.empname = t2.empname group by t1.empname,t1.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap38"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap38"))
     checkAnswer(frame, sql("select t1.empname,t1.designation, 
sum(t1.utilization) from fact_table3 t1,fact_table4 t2  " +
                            "where t1.empname = t2.empname group by t1.empname, 
t1.designation"))
     sql(s"drop datamap datamap38")
@@ -538,7 +538,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
       "where t1.empname = t2.empname and t1.empname='shivani' group by 
t1.empname,t1.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap39"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap39"))
     checkAnswer(frame, sql("select t1.empname,t1.designation, 
sum(t1.utilization) from fact_table3 t1,fact_table4 t2  " +
                            "where t1.empname = t2.empname and 
t1.empname='shivani' group by t1.empname, t1.designation"))
     sql(s"drop datamap datamap39")
@@ -551,7 +551,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t1.designation, 
sum(t1.utilization),count(t1.utilization) from fact_table1 t1,fact_table2 t2  " 
+
       "where t1.empname = t2.empname and t1.empname='shivani' group by 
t1.empname,t1.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap40"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap40"))
     checkAnswer(frame, sql("select t1.empname, t1.designation, 
sum(t1.utilization),count(t1.utilization) from fact_table3 t1,fact_table4 t2  " 
+
                            "where t1.empname = t2.empname and 
t1.empname='shivani' group by t1.empname,t1.designation"))
     sql(s"drop datamap datamap40")
@@ -564,7 +564,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname where t1.empname='shivani' group by 
t1.empname, t2.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap41"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap41"))
     checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2  " +
                            "on t1.empname = t2.empname where 
t1.empname='shivani' group by t1.empname, t2.designation"))
     sql(s"drop datamap datamap41")
@@ -577,7 +577,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join 
fact_table2 t2  " +
       "on t1.empname = t2.empname group by t1.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap42"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap42"))
     checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table4 t1 left join fact_table5 t2  " +
                            "on t1.empname = t2.empname group by t1.empname"))
     sql(s"drop datamap datamap42")
@@ -590,7 +590,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join 
fact_table2 t2  " +
       "on t1.empname = t2.empname where t1.empname='shivani' group by 
t1.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap43"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap43"))
     checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table4 t1 left join fact_table5 t2  " +
                            "on t1.empname = t2.empname where 
t1.empname='shivani' group by t1.empname"))
     sql(s"drop datamap datamap43")
@@ -603,7 +603,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join 
fact_table2 t2  " +
       "on t1.empname = t2.empname where t1.empname='shivani' group by 
t1.empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap44"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap44"))
     checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table4 t1 left join fact_table5 t2  " +
                            "on t1.empname = t2.empname where 
t1.empname='shivani' group by t1.empname"))
     sql(s"drop datamap datamap44")
@@ -618,7 +618,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname where t2.designation='SA' group by 
t1.empname, t2.designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap45"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap45"))
     checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2  " +
                            "on t1.empname = t2.empname where 
t2.designation='SA' group by t1.empname, t2.designation"))
     sql(s"drop datamap datamap45")
@@ -635,7 +635,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select name,sum(salary) from test4 group by name")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "mv13"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "mv13"))
   }
 
   test("jira carbondata-2528-1") {
@@ -645,7 +645,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname,sum(salary) as total from fact_table1 group by empname 
order by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "MV_order"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "MV_order"))
   }
 
   test("jira carbondata-2528-2") {
@@ -655,7 +655,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname,sum(salary)+sum(utilization) as total from fact_table1 
group by empname order by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "MV_order"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "MV_order"))
   }
 
   test("jira carbondata-2528-3") {
@@ -665,7 +665,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname,sum(salary)+sum(utilization) as total from fact_table1 
group by empname order by empname DESC")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "MV_order"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "MV_order"))
     sql("drop datamap if exists MV_order")
   }
 
@@ -676,7 +676,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname,sum(salary)+sum(utilization) as total from fact_table1 
where empname = 'ravi' group by empname order by empname DESC")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "MV_order"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "MV_order"))
     sql("drop datamap if exists MV_order")
   }
 
@@ -689,11 +689,11 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamv2 using 'mv' as select country,sum(salary) from 
test1 group by country")
     val frame = sql("select country,sum(salary) from test1 where country='USA' 
group by country")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamv2"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamv2"))
     sql("insert into test1 select 'name1','USA',12,23")
     val frame1 = sql("select country,sum(salary) from test1 where 
country='USA' group by country")
     val analyzed1 = frame1.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed1, "datamv2"))
+    assert(TestUtil.verifyMVDataMap(analyzed1, "datamv2"))
     sql("drop datamap if exists datamv2")
     sql("drop table if exists test1")
   }
@@ -705,7 +705,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select sum(salary),substring(empname,2,5),designation from fact_table1 
group by substring(empname,2,5),designation")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "MV_exp"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "MV_exp"))
     sql("drop datamap if exists MV_exp")
   }
 
@@ -725,7 +725,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select doj,sum(salary) from xy.fact_tablexy group by doj")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "MV_exp"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "MV_exp"))
     sql("drop datamap if exists MV_exp")
     sql("""drop database if exists xy cascade""")
   }
@@ -742,7 +742,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap map1 using 'mv' as select name,sum(salary) from 
mvtable1 group by name")
     val frame = sql("select name,sum(salary) from mvtable1 group by name limit 
1")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "map1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "map1"))
     sql("drop datamap if exists map1")
     sql("drop table if exists mvtable1")
   }
@@ -754,7 +754,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select 
empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) 
from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_comp_maxsumminavg"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_comp_maxsumminavg"))
     sql("drop datamap if exists datamap_comp_maxsumminavg")
   }
 
@@ -778,7 +778,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       val frame = sql(
         "select sum(case when deptno=11 and (utilization=92) then salary else 
0 end) as t from fact_table1 group by empname")
       val analyzed = frame.queryExecution.analyzed
-      assert(verifyMVDataMap(analyzed, "MV_exp"))
+      assert(TestUtil.verifyMVDataMap(analyzed, "MV_exp"))
     }
     sql("drop datamap if exists MV_exp")
   }
@@ -797,7 +797,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select empname, sum(utilization) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "MV_exp1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "MV_exp1"))
     sql("drop datamap if exists MV_exp1")
     sql("drop datamap if exists MV_exp2")
   }
@@ -809,7 +809,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "select deptname as babu, sum(salary) from fact_table1 as tt group by 
deptname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap46"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap46"))
     sql("drop datamap if exists datamap46")
   }
 
@@ -820,7 +820,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "SELECT max(utilization) FROM fact_table1 WHERE salary IN (select 
min(salary) from fact_table1 group by empname ) group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_subqry"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_subqry"))
     sql("drop datamap if exists datamap_subqry")
   }
 
@@ -832,7 +832,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     val frame = sql(
       "SELECT max(utilization) FROM fact_table1 WHERE salary IN (select 
min(salary) from fact_table1) group by empname")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_subqry"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_subqry"))
     sql("drop datamap if exists datamap_subqry")
   }
 
@@ -947,7 +947,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
 
     val frame = sql(querySQL)
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
     assert(1 == frame.collect().size)
 
     sql("drop table if exists all_table")
@@ -963,20 +963,13 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
       val df = sql("select distinct(empname) from limit_fail limit 10")
       sql("select * from limit_fail limit 10").show()
       val analyzed = df.queryExecution.analyzed
-      assert(verifyMVDataMap(analyzed, "limit_fail_dm1"))
+      assert(TestUtil.verifyMVDataMap(analyzed, "limit_fail_dm1"))
     } catch {
       case ex: Exception =>
         assert(false)
     }
   }
 
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
-  }
-
   def drop(): Unit = {
     sql("drop table IF EXISTS fact_table1")
     sql("drop table IF EXISTS fact_table2")
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index b663ecf..31b41f1 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -60,7 +60,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     val query: String = "select empname from test_table"
     val df1 = sql(s"$query")
     val analyzed1 = df1.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed1, "datamap1"))
+    assert(!TestUtil.verifyMVDataMap(analyzed1, "datamap1"))
     sql(s"rebuild datamap datamap1")
     val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
@@ -73,7 +73,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     assert(segmentList.containsAll( segmentMap.get("default.test_table")))
     val df2 = sql(s"$query")
     val analyzed2 = df2.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed2, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed2, "datamap1"))
     loadDataToFactTable("test_table")
     loadDataToFactTable("test_table1")
     sql(s"rebuild datamap datamap1")
@@ -86,12 +86,12 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
       sql("select empname, designation from test_table1"))
     val df3 = sql(s"$query")
     val analyzed3 = df3.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed3, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed3, "datamap1"))
     loadDataToFactTable("test_table")
     loadDataToFactTable("test_table1")
     val df4 = sql(s"$query")
     val analyzed4 = df4.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed4, "datamap1"))
+    assert(!TestUtil.verifyMVDataMap(analyzed4, "datamap1"))
     checkAnswer(sql("select empname, designation from test_table"),
       sql("select empname, designation from test_table1"))
   }
@@ -141,7 +141,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     var df = sql(
       s"""select a, sum(b) from main_table group by a""".stripMargin)
     var analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     checkAnswer(sql(" select a, sum(b) from testtable group by a"),
       sql(" select a, sum(b) from main_table group by a"))
     sql("update main_table set(a) = ('aaa') where b = 'abc'").show(false)
@@ -162,7 +162,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     assert(segmentList.containsAll( segmentMap.get("default.main_table")))
     df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
     analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     checkAnswer(sql(" select a, sum(b) from testtable group by a"),
       sql(" select a, sum(b) from main_table group by a"))
     sql("drop table IF EXISTS main_table")
@@ -327,12 +327,12 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"rebuild datamap datamap1")
     val df = sql("select a, sum(c) from main_table  group by a")
     val analyzed = df.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap1"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     sql("reset")
     checkAnswer(sql("select a, sum(c) from main_table  group by a"), 
Seq(Row("a", 1), Row("b", 2)))
     val df1= sql("select a, sum(c) from main_table  group by a")
     val analyzed1 = df1.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed1, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed1, "datamap1"))
     sql("drop table IF EXISTS main_table")
   }
 
@@ -406,7 +406,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     assert(segmentList.containsAll( segmentMap.get("default.test_table")))
     val df2 = sql(s"$query")
     val analyzed2 = df2.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed2, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed2, "datamap1"))
     loadDataToFactTable("test_table")
     loadDataToFactTable("test_table1")
     loadMetadataDetails = 
SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
@@ -418,12 +418,12 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
       sql("select empname, designation from test_table1"))
     val df3 = sql(s"$query")
     val analyzed3 = df3.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed3, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed3, "datamap1"))
     loadDataToFactTable("test_table")
     loadDataToFactTable("test_table1")
     val df4 = sql(s"$query")
     val analyzed4 = df4.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed4, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed4, "datamap1"))
     checkAnswer(sql("select empname, designation from test_table"),
       sql("select empname, designation from test_table1"))
   }
@@ -441,7 +441,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap1 using 'mv' as select a, sum(b) from 
main_table group by a")
     var df = sql(s"""select a, sum(b) from main_table group by 
a""".stripMargin)
     var analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     checkAnswer(sql(" select a, sum(b) from testtable group by a"),
       sql(" select a, sum(b) from main_table group by a"))
     sql("update main_table set(a) = ('aaa') where b = 'abc'").show(false)
@@ -458,7 +458,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     assert(segmentList.containsAll(segmentMap.get("default.main_table")))
     df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
     analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     checkAnswer(sql(" select a, sum(b) from testtable group by a"),
       sql(" select a, sum(b) from main_table group by a"))
     sql("drop table IF EXISTS main_table")
@@ -478,7 +478,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap1 using 'mv' as select a, sum(b) from 
main_table group by a")
     var df = sql(s"""select a, sum(b) from main_table group by 
a""".stripMargin)
     var analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     checkAnswer(sql(" select a, sum(b) from testtable group by a"),
       sql(" select a, sum(b) from main_table group by a"))
     sql("delete from  main_table  where b = 'abc'").show(false)
@@ -495,7 +495,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     assert(segmentList.containsAll(segmentMap.get("default.main_table")))
     df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
     analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
     checkAnswer(sql(" select a, sum(b) from testtable group by a"),
       sql(" select a, sum(b) from main_table group by a"))
     sql("drop table IF EXISTS main_table")
@@ -567,13 +567,6 @@ class MVIncrementalLoadingTestcase extends QueryTest with 
BeforeAndAfterAll {
     assert(segmentList.containsAll(segmentMap.get("default.test_table")))
   }
 
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
-  }
-
   override def afterAll(): Unit = {
     sql("drop table if exists products")
     sql("drop table if exists sales")
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
index 4e3eb10..19bc343 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
@@ -52,7 +52,7 @@ class MVMultiJoinTestCase extends QueryTest with 
BeforeAndAfterAll {
         "select p.title,c.title,c.pid,p.aid from areas as p inner join areas 
as c on " +
         "c.pid=p.aid where p.title = 'hebei'")
     val frame = sql(mvSQL)
-    assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
+    assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
     checkAnswer(frame, Seq(Row("hebei","shijiazhuang"), Row("hebei","handan")))
   }
 
@@ -73,17 +73,10 @@ class MVMultiJoinTestCase extends QueryTest with 
BeforeAndAfterAll {
        """.stripMargin
     sql("create datamap table_mv using 'mv' as " + "select 
sdr.name,sum(sdr.score),dim.age,dim_other.height,count(dim.name) as c1, 
count(dim_other.name) as c2 from sdr_table sdr left join dim_table dim on 
sdr.name = dim.name left join dim_table dim_other on sdr.name = dim_other.name 
group by sdr.name,dim.age,dim_other.height")
     val frame = sql(mvSQL)
-    assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
+    assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
     checkAnswer(frame, Seq(Row("lily",80,30,160),Row("tom",120,20,170)))
   }
 
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
-  }
-
   def drop: Unit ={
     sql("drop table if exists areas")
     sql("drop table if exists dim_table")
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
index 3af1413..9b7727b 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
@@ -80,14 +80,7 @@ class MVRewriteTestCase extends QueryTest with 
BeforeAndAfterAll {
                                                       | GROUP BY MT.`3600`, 
MT.`2250410101`
                                                       | ORDER BY `3600` ASC 
LIMIT 5000""".stripMargin)
 
-    assert(verifyMVDataMap(frame.queryExecution.analyzed, "data_table_mv"))
-  }
-
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+    assert(TestUtil.verifyMVDataMap(frame.queryExecution.analyzed, 
"data_table_mv"))
   }
 
   override def afterAll(): Unit = {
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
index 1747e51..892e23f 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
@@ -85,7 +85,7 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm1 using 'mv' as ${sampleTestCases(0)._2}")
     val df = sql(sampleTestCases(0)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap_sm1"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap_sm1"))
     sql(s"drop datamap datamap_sm1")
   }
 
@@ -94,7 +94,7 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm2 using 'mv' as ${sampleTestCases(2)._2}")
     val df = sql(sampleTestCases(2)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_sm2"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_sm2"))
     sql(s"drop datamap datamap_sm2")
   }
 
@@ -103,7 +103,7 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm3 using 'mv' as ${sampleTestCases(3)._2}")
     val df = sql(sampleTestCases(3)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_sm3"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_sm3"))
     sql(s"drop datamap datamap_sm3")
   }
 
@@ -112,7 +112,7 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm4 using 'mv' as ${sampleTestCases(4)._2}")
     val df = sql(sampleTestCases(4)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_sm4"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_sm4"))
     sql(s"drop datamap datamap_sm4")
   }
 
@@ -121,7 +121,7 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm5 using 'mv' as ${sampleTestCases(5)._2}")
     val df = sql(sampleTestCases(5)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed, "datamap_sm5"))
+    assert(!TestUtil.verifyMVDataMap(analyzed, "datamap_sm5"))
     sql(s"drop datamap datamap_sm5")
   }
 
@@ -130,7 +130,7 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm6 using 'mv' as ${sampleTestCases(6)._2}")
     val df = sql(sampleTestCases(6)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_sm6"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_sm6"))
     sql(s"drop datamap datamap_sm6")
   }
 
@@ -139,7 +139,7 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm7 using 'mv' as ${sampleTestCases(7)._2}")
     val df = sql(sampleTestCases(7)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_sm7"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_sm7"))
     sql(s"drop datamap datamap_sm7")
   }
 
@@ -148,19 +148,10 @@ class MVSampleTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_sm8 using 'mv' as ${sampleTestCases(8)._2}")
     val df = sql(sampleTestCases(8)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_sm8"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_sm8"))
     sql(s"drop datamap datamap_sm8")
   }
 
-
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
-  }
-
-
   def drop(): Unit = {
     sql("use default")
     sql("drop database if exists sample cascade")
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
index 7304527..3a6b17e 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
@@ -53,7 +53,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds1 using 'mv' as 
${tpcds_1_4_testCases(0)._2}")
     val df = sql(tpcds_1_4_testCases(0)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds1"))
     sql(s"drop datamap datamap_tpcds1")
   }
 
@@ -62,7 +62,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds3 using 'mv' as 
${tpcds_1_4_testCases(2)._2}")
     val df = sql(tpcds_1_4_testCases(2)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds3"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds3"))
     sql(s"drop datamap datamap_tpcds3")
   }
 
@@ -71,7 +71,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds4 using 'mv' as 
${tpcds_1_4_testCases(3)._2}")
     val df = sql(tpcds_1_4_testCases(3)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds4"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds4"))
     sql(s"drop datamap datamap_tpcds4")
   }
 
@@ -80,7 +80,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds5 using 'mv' as 
${tpcds_1_4_testCases(4)._2}")
     val df = sql(tpcds_1_4_testCases(4)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds5"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds5"))
     sql(s"drop datamap datamap_tpcds5")
   }
 
@@ -89,7 +89,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds6 using 'mv' as 
${tpcds_1_4_testCases(5)._2}")
     val df = sql(tpcds_1_4_testCases(5)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds6"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds6"))
     sql(s"drop datamap datamap_tpcds6")
   }
 
@@ -98,7 +98,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds8 using 'mv' as 
${tpcds_1_4_testCases(7)._2}")
     val df = sql(tpcds_1_4_testCases(7)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds8"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds8"))
     sql(s"drop datamap datamap_tpcds8")
   }
 
@@ -107,7 +107,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds11 using 'mv' as 
${tpcds_1_4_testCases(10)._2}")
     val df = sql(tpcds_1_4_testCases(10)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds11"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds11"))
     sql(s"drop datamap datamap_tpcds11")
   }
 
@@ -116,7 +116,7 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds15 using 'mv' as 
${tpcds_1_4_testCases(14)._2}")
     val df = sql(tpcds_1_4_testCases(14)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds15"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds15"))
     sql(s"drop datamap datamap_tpcds15")
   }
 
@@ -125,20 +125,10 @@ class MVTPCDSTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"create datamap datamap_tpcds16 using 'mv' as 
${tpcds_1_4_testCases(15)._2}")
     val df = sql(tpcds_1_4_testCases(15)._3)
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap_tpcds16"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_tpcds16"))
     sql(s"drop datamap datamap_tpcds16")
   }
 
-
-
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
-  }
-
-
   def drop(): Unit = {
     sql("use default")
     sql("drop database if exists tpcds cascade")
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
index b5d874a..d17881a 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
@@ -73,7 +73,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll 
{
     sql("create datamap datamap1 using 'mv' as select l_returnflag, 
l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as 
sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as 
count_order from lineitem group by l_returnflag, l_linestatus,l_shipdate")
     val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as 
sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as 
count_order from lineitem where l_shipdate <= date('1998-09-02') group by 
l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap1"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap1"))
 //    checkAnswer(df, sql("select l_returnflag, l_linestatus, sum(l_quantity) 
as sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as 
count_order from lineitem1 where l_shipdate <= date('1998-09-02') group by 
l_returnflag, l_linestatus order by l_returnflag, l_linestatus"))
     sql(s"drop datamap datamap1")
   }
@@ -83,7 +83,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll 
{
     sql("create datamap datamap2 using 'mv' as select l_returnflag, 
l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as 
sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group 
by l_returnflag, l_linestatus,l_shipdate order by l_returnflag, l_linestatus")
     val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as 
sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem where 
l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by 
l_returnflag, l_linestatus")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap2"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap2"))
 //    checkAnswer(df, sql("select l_returnflag, l_linestatus, sum(l_quantity) 
as sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem1 
where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus 
order by l_returnflag, l_linestatus"))
     sql(s"drop datamap datamap2")
   }
@@ -93,7 +93,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll 
{
     sql("create datamap datamap3 using 'mv' as select l_returnflag, 
l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as 
sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group 
by l_returnflag, l_linestatus,l_shipdate")
     val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as 
sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price from lineitem where 
l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by 
l_returnflag, l_linestatus")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap3"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap3"))
 //    checkAnswer(df, sql("select l_returnflag, l_linestatus, sum(l_quantity) 
as sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price from lineitem1 where 
l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by 
l_returnflag, l_linestatus"))
     sql(s"drop datamap datamap3")
   }
@@ -103,7 +103,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap4 using 'mv' as select l_orderkey, 
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority 
from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = 
o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and 
l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, 
o_shippriority")
     val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) 
as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate 
limit 10")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap4"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap4"))
 //    checkAnswer(df, sql("select l_orderkey, sum(l_extendedprice * (1 - 
l_discount)) as revenue, o_orderdate, o_shippriority from customer1, orders1, 
lineitem1 where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and 
l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > 
date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by 
revenue desc, o_orderdate limit 10"))
     sql(s"drop datamap datamap4")
   }
@@ -113,7 +113,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap5 using 'mv' as select l_orderkey, 
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, 
o_shippriority,c_mktsegment,l_shipdate, c_custkey as c1, o_custkey as 
c2,o_orderkey as o1  from customer, orders, lineitem where c_custkey = 
o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, 
o_shippriority,c_mktsegment,l_shipdate,c_custkey,o_custkey, o_orderkey ")
     val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) 
as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate 
limit 10")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap5"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap5"))
 //    checkAnswer(df, sql("select l_orderkey, sum(l_extendedprice * (1 - 
l_discount)) as revenue, o_orderdate, o_shippriority from customer1, orders1, 
lineitem1 where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and 
l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > 
date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by 
revenue desc, o_orderdate limit 10"))
     sql(s"drop datamap datamap5")
   }
@@ -123,7 +123,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap5 using 'mv' as select l_orderkey, 
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, 
o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
     val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) 
as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate 
limit 10")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap5"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap5"))
 //    checkAnswer(df, sql("select l_orderkey, sum(l_extendedprice * (1 - 
l_discount)) as revenue, o_orderdate, o_shippriority from customer1, orders1, 
lineitem1 where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and 
l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > 
date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by 
revenue desc, o_orderdate limit 10"))
     sql(s"drop datamap datamap5")
   }
@@ -133,7 +133,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap6 using 'mv' as select o_orderpriority, 
count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and 
o_orderdate < date('1993-10-01') and exists ( select * from lineitem where 
l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by 
o_orderpriority order by o_orderpriority")
     val df = sql("select o_orderpriority, count(*) as order_count from orders 
where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') 
and exists ( select * from lineitem where l_orderkey = o_orderkey and 
l_commitdate < l_receiptdate ) group by o_orderpriority order by 
o_orderpriority")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap6"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap6"))
 //    checkAnswer(df, sql("select o_orderpriority, count(*) as order_count 
from orders1 where o_orderdate >= date('1993-07-01') and o_orderdate < 
date('1993-10-01') and exists ( select * from lineitem1 where l_orderkey = 
o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by 
o_orderpriority"))
     sql(s"drop datamap datamap6")
   }
@@ -143,7 +143,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap7 using 'mv' as select n_name, 
sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, 
lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = 
o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and 
s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and 
o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by 
n_name")
     val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as 
revenue from customer, orders, lineitem, supplier, nation, region where 
c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and 
c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = 
r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and 
o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap7"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap7"))
 //    checkAnswer(df, sql("select n_name, sum(l_extendedprice * (1 - 
l_discount)) as revenue from customer1, orders1, lineitem1, supplier1, nation1, 
region1 where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = 
s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and 
n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= 
date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order 
by revenue desc"))
     sql(s"drop datamap datamap7")
   }
@@ -153,7 +153,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap8 using 'mv' as select 
n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue, 
sum(c_custkey), sum(o_custkey), sum(l_orderkey),sum(o_orderkey), 
sum(l_suppkey), sum(s_suppkey), sum(c_nationkey), sum(s_nationkey), 
sum(n_nationkey), sum(n_regionkey), sum(r_regionkey)  from customer, orders, 
lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = 
o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey an [...]
     val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as 
revenue from customer, orders, lineitem, supplier, nation, region where 
c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and 
c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = 
r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and 
o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap8"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap8"))
 //    checkAnswer(df, sql("select n_name, sum(l_extendedprice * (1 - 
l_discount)) as revenue from customer1, orders1, lineitem1, supplier1, nation1, 
region1 where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = 
s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and 
n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= 
date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order 
by revenue desc"))
     sql(s"drop datamap datamap8")
   }
@@ -163,7 +163,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * 
l_discount) as revenue, count(l_shipdate), sum(l_discount),sum(l_quantity)  
from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
     val df = sql("select sum(l_extendedprice * l_discount) as revenue from 
lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap9"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap9"))
     assert(verifyAgg(analyzed))
 //    checkAnswer(df, sql("select sum(l_extendedprice * l_discount) as revenue 
from lineitem1 where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24"))
     sql(s"drop datamap datamap9")
@@ -174,7 +174,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap10 using 'mv' as select sum(l_extendedprice * 
l_discount) as revenue,l_shipdate,l_discount,l_quantity from lineitem group by 
l_shipdate,l_discount,l_quantity")
     val df = sql("select sum(l_extendedprice * l_discount) as revenue from 
lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap10"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap10"))
     assert(verifyAgg(analyzed))
 //    checkAnswer(df, sql("select sum(l_extendedprice * l_discount) as revenue 
from lineitem1 where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24"))
     sql(s"drop datamap datamap10")
@@ -185,7 +185,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , 
l_extendedprice , l_discount, s_suppkey,l_suppkey, o_orderkey,l_orderkey, 
c_custkey, o_custkey, s_nationkey,  n1.n_nationkey, c_nationkey from 
supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and 
o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = 
n1.n_nationkey and c_nationkey = n1.n_nationkey")
     val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 - 
l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where 
s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and 
s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name 
= 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between 
date('1995-01-01') and date('1996-12-31')")
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap11"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap11"))
 //    checkAnswer(df, sql("select year(l_shipdate) as l_year, l_extendedprice 
* (1 - l_discount) as volume from supplier1,lineitem1,orders1,customer1,nation1 
n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and 
( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between 
date('1995-01-01') and date('1996-12-31')"))
     sql(s"drop datamap datamap11")
   }
@@ -195,7 +195,7 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, 
l_extendedprice ,l_discount,s_suppkey, l_suppkey,o_orderkey,l_orderkey, 
c_custkey,o_custkey,s_nationkey,  n1.n_nationkey,c_nationkey from 
supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and 
o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = 
n1.n_nationkey and c_nationkey = n1.n_nationkey")
     val df = sql("select supp_nation, l_year, sum(volume) as revenue from ( 
select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * 
(1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 
where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and 
( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between 
date('1995-01-01') and date('19 [...]
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap12"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap12"))
 //    checkAnswer(df, sql("select supp_nation, l_year, sum(volume) as revenue 
from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, 
l_extendedprice * (1 - l_discount) as volume from 
supplier1,lineitem1,orders1,customer1,nation1 n1 where s_suppkey = l_suppkey 
and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = 
n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) 
or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01' [...]
     sql(s"drop datamap datamap12")
   }
@@ -205,18 +205,11 @@ class MVTpchTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap datamap13 using 'mv' as select n1.n_name as 
supp_nation, n2.n_name as cust_nation, l_shipdate, l_extendedprice * (1 - 
l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation 
n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey")
     val df = sql("select supp_nation, cust_nation, l_year, sum(volume) as 
revenue from ( select n1.n_name as supp_nation, n2.n_name as cust_nation, 
year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from 
supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = 
l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey 
= n1.n_nationkey and c_nationkey = n2.n_nationkey and ( (n1.n_name = 'FRANCE' 
and n2.n_name = 'GERMANY') or (n1.n_ [...]
     val analyzed = df.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "datamap13"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap13"))
 //    checkAnswer(df, sql("select supp_nation, cust_nation, l_year, 
sum(volume) as revenue from ( select n1.n_name as supp_nation, n2.n_name as 
cust_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as 
volume from supplier,lineitem1,orders1,customer1,nation1 n1,nation1 n2 where 
s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and 
s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey and ( (n1.n_name 
= 'FRANCE' and n2.n_name = 'GERMA [...]
     sql(s"drop datamap datamap13")
   }
 
-
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
-  }
   def verifyAgg(logicalPlan: LogicalPlan): Boolean = {
     var aggExpExists = false
     logicalPlan transformExpressions {
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 159cdbc..fee3d24 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -363,5 +363,66 @@ class TestAllOperationsOnMV extends QueryTest with 
BeforeAndAfterEach {
     sql("drop table IF EXISTS maintable")
   }
 
+  test("test block complex data types") { // creating an MV over array/struct/map columns must be rejected
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code array<int>, price struct<b:int>,type map<string, string>) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000, 'ab\002type1'")
+    sql("drop datamap if exists dm ")
+    assert(intercept[UnsupportedOperationException] { // array column; assert so the message check is not silently discarded
+      sql("create datamap dm using 'mv' as select c_code from maintable")
+    }.getMessage.contains("MV datamap is unsupported for ComplexData type column: c_code"))
+    assert(intercept[UnsupportedOperationException] { // struct column
+      sql("create datamap dm using 'mv' as select price from maintable")
+    }.getMessage.contains("MV datamap is unsupported for ComplexData type column: price"))
+    assert(intercept[UnsupportedOperationException] { // map column
+      sql("create datamap dm using 'mv' as select type from maintable")
+    }.getMessage.contains("MV datamap is unsupported for ComplexData type column: type"))
+    assert(intercept[UnsupportedOperationException] { // child field of a struct column
+      sql("create datamap dm using 'mv' as select price.b from maintable")
+    }.getMessage.contains("MV datamap is unsupported for ComplexData type child column: price"))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("validate dmproperties") { // dictionary_include / sort_columns passed as DMPROPERTIES must be rejected for an MV
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    assert(intercept[MalformedCarbonCommandException] { // assert so the message check is actually enforced
+      sql("create datamap dm using 'mv' dmproperties('dictionary_include'='name', 'sort_columns'='name') as select name from maintable")
+    }.getMessage.contains("DMProperties dictionary_include,sort_columns are not allowed for this datamap")) // NOTE(review): assumes the properties are reported in this order - confirm
+  }
+
+  test("test todate UDF function with mv") { // an MV whose query uses to_date() must be creatable and the query must still return the loaded date
+    sql("drop table IF EXISTS maintable")
+    sql("CREATE TABLE maintable (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
+    sql("insert into maintable values(1, 'abc', 'abc001', '1975-06-11 01:00:03.0','1975-06-11 02:00:03.0', 120, 1234,4.34,24.56,12345, 2464, 45)")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm using 'mv' as select max(to_date(dob)) , min(to_date(dob)) from maintable where to_date(dob)='1975-06-11' or to_date(dob)='1975-06-23'")
+    checkExistence(sql("select max(to_date(dob)) , min(to_date(dob)) from maintable where to_date(dob)='1975-06-11' or to_date(dob)='1975-06-23'"), true, "1975-06-11 1975-06-11")
+  }
+
+  test("test global dictionary inherited from parent table") { // expects `describe formatted dm_table` (the MV's table) to list a global dictionary entry for the parent's `name` column
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata' tblproperties('dictionary_include'='name')")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm using 'mv' as select name, sum(price) from 
maintable group by name")
+    checkExistence(sql("describe formatted dm_table"), true, "Global 
Dictionary maintable_name")
+    checkAnswer(sql("select name, sum(price) from maintable group by name"), 
Seq(Row("abc", 2000)))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test global dictionary inherited from parent table - Preaggregate") { // same check for a preaggregate datamap: `describe formatted maintable_dm` must list the parent's global dictionary entry
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata' tblproperties('dictionary_include'='name')")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm on table maintable using 'preaggregate' as select 
name, sum(price) from maintable group by name")
+    checkExistence(sql("describe formatted maintable_dm"), true, "Global 
Dictionary maintable_name")
+    checkAnswer(sql("select name, sum(price) from maintable group by name"), 
Seq(Row("abc", 2000)))
+    sql("drop table IF EXISTS maintable")
+  }
+
 }
 
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
index 56b1f9f..6245dae 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
@@ -98,7 +98,7 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select empname, sum(year) from partitionone group by 
empname, year, month,day"), Seq(Row("v", 2014)))
     val df1 = sql(s"select empname, sum(year) from partitionone group by 
empname, year, month,day")
     val analyzed1 = df1.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed1, "p1"))
+    assert(TestUtil.verifyMVDataMap(analyzed1, "p1"))
     assert(CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p1_table")(sqlContext.sparkSession).isHivePartitionTable)
   }
 
@@ -117,7 +117,7 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,1), 
Row("v",2,2015,1,1)))
     val df1 = sql(s"select empname, sum(year) from partitionone group by 
empname, year, month,day")
     val analyzed1 = df1.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed1, "p1"))
+    assert(TestUtil.verifyMVDataMap(analyzed1, "p1"))
     checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,1), 
Row("v",2015,2015,1,1)))
   }
 
@@ -678,11 +678,4 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists partitionone")
   }
 
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
-  }
-
 }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 4149f6e..5aecb08 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -206,8 +206,8 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
column3, sum(column3),column5, sum(column5) from maintable group by 
column3,column5")
     val df = sql("select * from maintable_agg0")
     val carbontable = getCarbonTable(df.queryExecution.analyzed)
-    assert(carbontable.getAllMeasures.size()==3)
-    assert(carbontable.getAllDimensions.size()==1)
+    assert(carbontable.getAllMeasures.size()==2)
+    assert(carbontable.getAllDimensions.size()==2)
     carbontable.getAllDimensions.asScala.foreach{ f =>
       assert(!f.getEncoder.contains(Encoding.DICTIONARY))
     }
@@ -218,8 +218,8 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
column3, sum(column3),column5, sum(column5) from maintable group by 
column3,column5,column2")
     val df = sql("select * from maintable_agg0")
     val carbontable = getCarbonTable(df.queryExecution.analyzed)
-    assert(carbontable.getAllMeasures.size()==3)
-    assert(carbontable.getAllDimensions.size()==2)
+    assert(carbontable.getAllMeasures.size()==2)
+    assert(carbontable.getAllDimensions.size()==3)
     carbontable.getAllDimensions.asScala.foreach{ f =>
       assert(!f.getEncoder.contains(Encoding.DICTIONARY))
     }
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 703ae37..2b0b2cb 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -971,7 +971,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    */
   def isDataTypeSupportedForDictionary_Exclude(columnDataType: String): 
Boolean = {
     val dataTypes =
-      Array("string", "timestamp", "int", "long", "bigint", "struct", "array", 
"map", "binary")
+      Array("string", "timestamp", "int", "integer", "long", "bigint", 
"struct", "array",
+        "map", "binary")
     dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index c5e5282..24a4aac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -612,13 +612,11 @@ case class CarbonLoadDataCommand(
       (dataFrame, dataFrame)
     }
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    if (!table.isChildDataMap) {
-      GlobalDictionaryUtil.generateGlobalDictionary(
-        sparkSession.sqlContext,
-        carbonLoadModel,
-        hadoopConf,
-        dictionaryDataFrame)
-    }
+    GlobalDictionaryUtil.generateGlobalDictionary(
+      sparkSession.sqlContext,
+      carbonLoadModel,
+      hadoopConf,
+      dictionaryDataFrame)
     if (table.isHivePartitionTable) {
       rows = loadDataWithPartition(
         sparkSession,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
index 24e28ad..2b5a041 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
@@ -115,32 +115,50 @@ object DataMapUtil {
     val parentDictExclude = 
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
       .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",")
 
-    val newDictInclude =
-      parentDictInclude.flatMap(parentcol =>
-        fields.collect {
-          case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
-                      fieldRelationMap(col).columnTableRelationList.size == 1 
&&
-                      parentcol.equals(fieldRelationMap(col).
-                        columnTableRelationList.get.head.parentColumnName) =>
-            col.column
-        })
-
-    val newDictExclude = parentDictExclude.flatMap(parentcol =>
+    val parentGlobalDictInclude = 
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "").split(",")
+
+    val parentGlobalDictExclude = 
parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.DICTIONARY_EXCLUDE, "").split(",")
+
+    val newLocalDictInclude = getDataMapColumns(parentDictInclude, fields, 
fieldRelationMap)
+
+    val newLocalDictExclude = getDataMapColumns(parentDictExclude, fields, 
fieldRelationMap)
+
+    val newGlobalDictInclude = getDataMapColumns(parentGlobalDictInclude, 
fields, fieldRelationMap)
+
+    val newGlobalDictExclude = getDataMapColumns(parentGlobalDictExclude, 
fields, fieldRelationMap)
+
+    if (newLocalDictInclude.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, 
newLocalDictInclude.mkString(","))
+    }
+    if (newLocalDictExclude.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, 
newLocalDictExclude.mkString(","))
+    }
+
+    if (newGlobalDictInclude.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.DICTIONARY_INCLUDE, 
newGlobalDictInclude.mkString(","))
+    }
+    if (newGlobalDictExclude.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.DICTIONARY_EXCLUDE, 
newGlobalDictExclude.mkString(","))
+    }
+  }
+
+  private def getDataMapColumns(parentColumns: Array[String], fields: 
Seq[Field],
+      fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, 
DataMapField]) = {
+    val dataMapColumns = parentColumns.flatMap(parentcol =>
       fields.collect {
         case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
-                    fieldRelationMap(col).columnTableRelationList.size == 2 &&
+                    fieldRelationMap(col).columnTableRelationList.size == 1 &&
                     parentcol.equals(fieldRelationMap(col).
                       columnTableRelationList.get.head.parentColumnName) =>
           col.column
       })
-    if (newDictInclude.nonEmpty) {
-      tableProperties
-        .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, 
newDictInclude.mkString(","))
-    }
-    if (newDictExclude.nonEmpty) {
-      tableProperties
-        .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, 
newDictExclude.mkString(","))
-    }
+    dataMapColumns
   }
 
   /**

Reply via email to