Repository: carbondata
Updated Branches:
  refs/heads/master 0c200d834 -> e43be5e74


[CARBONDATA-2073][CARBONDATA-1516][Tests] Add test cases for pre-aggregate 
datamap

This closes #1857


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e43be5e7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e43be5e7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e43be5e7

Branch: refs/heads/master
Commit: e43be5e74f1aec4a258d0bcca9ea87893249ffb9
Parents: 0c200d8
Author: xubo245 <[email protected]>
Authored: Thu Feb 8 17:42:34 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Fri Mar 30 11:11:10 2018 +0800

----------------------------------------------------------------------
 .../preaggregate/TestPreAggCreateCommand.scala  | 123 +++-
 .../TestPreAggregateCompaction.scala            |   3 +
 .../preaggregate/TestPreAggregateDrop.scala     |  50 +-
 .../TestPreAggregateExpressions.scala           |  11 +-
 .../preaggregate/TestPreAggregateLoad.scala     | 585 ++++++++++++++++++-
 .../preaggregate/TestPreAggregateMisc.scala     |   6 +-
 .../TestPreAggregateTableSelection.scala        | 106 ++--
 .../TestPreAggregateWithSubQuery.scala          |  20 +-
 .../InsertIntoCarbonTableTestCase.scala         |   7 +-
 .../carbondata/spark/util/SparkQueryTest.scala  |  50 ++
 .../preaaggregate/PreAggregateUtil.scala        |   2 +-
 .../sql/hive/CarbonPreAggregateRules.scala      |   2 +-
 12 files changed, 869 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 8e499ba..e546fe8 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.util.CarbonProperties
 
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
 
+  val timeSeries = TIMESERIES.toString
+
   override def beforeAll {
     sql("drop database if exists otherDB cascade")
     sql("drop table if exists PreAggMain")
@@ -55,9 +57,12 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test pre agg create table 2") {
+    dropDataMaps("PreAggMain", "preagg2")
     sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as 
select a as a1,sum(b) as udfsum from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, 
"preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, 
"preaggmain_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), false, 
"preaggmain_a1")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), false, 
"preaggmain_udfsum")
     sql("drop datamap preagg2 on table PreAggMain")
   }
 
@@ -69,10 +74,22 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("drop datamap preagg11 on table PreAggMain1")
   }
 
+  test("test pre agg create table 6") {
+    sql("create datamap preagg12 on table PreAggMain1 using 'preaggregate' as 
select a as a1,sum(b) as sum from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, 
"preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, 
"preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), false, 
"preaggmain1_a1")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), false, 
"preaggmain1_sum")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, 
"DICTIONARY")
+    sql("drop datamap preagg12 on table PreAggMain1")
+  }
+
   test("test pre agg create table 8") {
     sql("create datamap preagg14 on table PreAggMain1 using 'preaggregate' as 
select a as a1,sum(b) as sum from PreAggMain1 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, 
"preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, 
"preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), false, 
"preaggmain1_a1")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), false, 
"preaggmain1_sum")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, 
"DICTIONARY")
     sql("drop datamap preagg14 on table PreAggMain1")
   }
@@ -82,6 +99,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, 
"preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, 
"preaggmain_b_sum")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, 
"preaggmain_b_count")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), false, 
"preaggmain2_b_avg")
     sql("drop datamap preagg15 on table PreAggMain")
   }
 
@@ -132,7 +150,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     assert(exception.getMessage.equals("Distinct is not supported On Pre 
Aggregation"))
   }
 
-  test("test pre agg create table 15") {
+  test("test pre agg create table 15: don't support where") {
     intercept[Exception] {
       sql(
         s"""
@@ -149,7 +167,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
   test("test pre agg create table 16") {
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
column4, sum(column4) from maintable group by column4")
     val df = sql("select * from maintable_agg0")
-    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    val carbontable = getCarbonTable(df.queryExecution.analyzed)
     assert(carbontable.getAllMeasures.size()==2)
     assert(carbontable.getAllDimensions.size()==0)
     sql("drop datamap agg0 on table maintable")
@@ -158,7 +176,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
   test("test pre agg create table 17") {
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
column1, sum(column1),column6, sum(column6) from maintable group by 
column6,column1")
     val df = sql("select * from maintable_agg0")
-    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    val carbontable = getCarbonTable(df.queryExecution.analyzed)
     assert(carbontable.getAllMeasures.size()==2)
     assert(carbontable.getAllDimensions.size()==2)
     carbontable.getAllDimensions.asScala.foreach{ f =>
@@ -170,7 +188,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
   test("test pre agg create table 18") {
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
column1, count(column1),column6, count(column6) from maintable group by 
column6,column1")
     val df = sql("select * from maintable_agg0")
-    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    val carbontable = getCarbonTable(df.queryExecution.analyzed)
     assert(carbontable.getAllMeasures.size()==2)
     assert(carbontable.getAllDimensions.size()==2)
     carbontable.getAllDimensions.asScala.foreach{ f =>
@@ -182,7 +200,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
   test("test pre agg create table 19") {
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
column3, sum(column3),column5, sum(column5) from maintable group by 
column3,column5")
     val df = sql("select * from maintable_agg0")
-    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    val carbontable = getCarbonTable(df.queryExecution.analyzed)
     assert(carbontable.getAllMeasures.size()==2)
     assert(carbontable.getAllDimensions.size()==2)
     carbontable.getAllDimensions.asScala.foreach{ f =>
@@ -194,7 +212,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
   test("test pre agg create table 20") {
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
column3, sum(column3),column5, sum(column5) from maintable group by 
column3,column5,column2")
     val df = sql("select * from maintable_agg0")
-    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    val carbontable = getCarbonTable(df.queryExecution.analyzed)
     assert(carbontable.getAllMeasures.size()==2)
     assert(carbontable.getAllDimensions.size()==3)
     carbontable.getAllDimensions.asScala.foreach{ f =>
@@ -203,7 +221,6 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("drop datamap agg0 on table maintable")
   }
 
-  val timeSeries = TIMESERIES.toString
   test("remove agg tables from show table command") {
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false")
     sql("DROP TABLE IF EXISTS tbl_1")
@@ -219,7 +236,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT)
   }
 
-  test("test pre agg  create table 21: create with preaggregate and 
hierarchy") {
+  test("test pre agg create table 22: create with preaggregate and 
granularity") {
     sql("DROP TABLE IF EXISTS maintabletime")
     sql(
       """
@@ -271,19 +288,18 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 
-  test("remove  agg tables from show table command") {
-    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false")
+  test("test pre agg create table 24: remove agg tables from show table 
command") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
 "false")
     sql("DROP TABLE IF EXISTS tbl_1")
     sql("create table if not exists  tbl_1(imei string,age int,mac string 
,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 
'carbondata' ")
     sql("create datamap agg1 on table tbl_1 using 'preaggregate' as select 
mac, sum(age) from tbl_1 group by mac")
     sql("create table if not exists  sparktable(imei string,age int,mac string 
,prodate timestamp,update timestamp,gamepoint double,contrid double) ")
     checkExistence(sql("show tables"), false, "tbl_1_agg1")
-    checkExistence(sql("show tables"), true, "sparktable","tbl_1")
+    checkExistence(sql("show tables"), true, "sparktable", "tbl_1")
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
 CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT)
   }
 
-
-  test("remove TimeSeries agg tables from show table command") {
+  test("test pre agg create table 25: remove TimeSeries agg tables from show 
table command") {
     sql("DROP TABLE IF EXISTS tbl_1")
     sql("create table if not exists  tbl_1(imei string,age int,mac string 
,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 
'carbondata' ")
     sql(
@@ -307,6 +323,25 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 
+  test("test pre agg create table 22: don't support create datamap if 
exists'") {
+    val e: Exception = intercept[AnalysisException] {
+      sql(
+        """
+          | CREATE DATAMAP IF EXISTS agg0 ON TABLE mainTable
+          | USING 'preaggregate'
+          | AS SELECT
+          |   column3,
+          |   sum(column3),
+          |   column5,
+          |   sum(column5)
+          | FROM maintable
+          | GROUP BY column3,column5,column2
+        """.stripMargin)
+      assert(true)
+    }
+    assert(e.getMessage.contains("identifier matching regex"))
+    sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
+  }
 
   test("test show tables filtered with datamaps") {
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false")
@@ -336,7 +371,65 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("use default")
   }
 
-  def getCarbontable(plan: LogicalPlan) : CarbonTable ={
+  // TODO: confirm it is intended that the alias a1 is not kept as a datamap table column
+  test("test pre agg create table 26") {
+    sql("drop datamap if exists preagg2 on table PreAggMain")
+    sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as 
select a as a1,sum(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, 
"preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, 
"preaggmain_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), false, 
"preaggmain_a1")
+    intercept[Exception] {
+      sql("select a1 from PreAggMain_preagg2").show()
+    }
+    sql("drop datamap if exists preagg2 on table PreAggMain")
+  }
+
+  test("test pre agg create table 27: select * and no group by") {
+    intercept[Exception] {
+      sql(
+        """
+          | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
+          | USING 'preaggregate'
+          | AS SELECT *  FROM maintable
+        """.stripMargin)
+    }
+  }
+
+  // TODO: confirm that rejecting SELECT * with GROUP BY is the intended behavior
+  test("test pre agg create table 28: select *") {
+    intercept[Exception] {
+      sql(
+        """
+          | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
+          | USING 'preaggregate'
+          | AS SELECT *  FROM maintable
+          | group by a
+        """.stripMargin)
+    }
+  }
+
+  test("test pre agg create table 29") {
+    intercept[Exception] {
+      sql(
+        s"""
+           | create datamap preagg21 on table PreAggMain2
+           | using 'preaggregate'
+           | as select a as a1,sum(b)
+           | from PreAggMain2
+           | where a>'vishal'
+           | group by a
+         """.stripMargin)
+    }
+  }
+
+  test("test pre agg create table 30: DESCRIBE FORMATTED") {
+    dropDataMaps("PreAggMain", "preagg2")
+    intercept[Exception] {
+      sql("DESCRIBE FORMATTED PreAggMain_preagg2").show()
+    }
+  }
+
+  def getCarbonTable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null
     plan.transform {
       // first check if any preaTable1 scala function is applied it is present 
is in plan
@@ -368,5 +461,7 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists maintabletime")
     sql("drop table if exists showTables")
     sql("drop table if exists Preagg_twodb")
+    sql("DROP TABLE IF EXISTS tbl_1")
+    sql("DROP TABLE IF EXISTS sparktable")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
index 89cf8eb..42209b3 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
@@ -154,6 +154,9 @@ class TestPreAggregateCompaction extends QueryTest with 
BeforeAndAfterEach with
     segmentNamesSum = sql("show segments for table 
maintable_preagg_sum").collect().map(_.get(0).toString)
     segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", 
"5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0"))
     checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from 
maintable_preagg_sum group by maintable_id"), sumResult)
+    val mainTableSegment = sql("SHOW SEGMENTS FOR TABLE maintable")
+    val SegmentSequenceIds = mainTableSegment.collect().map { each => 
(each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
   }
 
   test("test if minor/major compaction is successful for pre-agg table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index a96a19d..8757c76 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
 
@@ -34,7 +35,7 @@ class TestPreAggregateDrop extends QueryTest with 
BeforeAndAfterAll {
       "create datamap preagg1 on table maintable using 'preaggregate' as 
select" +
       " a,sum(b) from maintable group by a")
     sql("drop datamap if exists preagg1 on table maintable")
-    checkExistence(sql("show tables"), false, "maintable_preagg1")
+    checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), false, 
"maintable_preagg1")
   }
 
   test("dropping 1 aggregate table should not drop others") {
@@ -65,13 +66,11 @@ class TestPreAggregateDrop extends QueryTest with 
BeforeAndAfterAll {
     " a,sum(c) from maintable1 group by a")
 
     sql("drop datamap preagg_same on table maintable")
-    var showTables = sql("show tables")
-    val showdatamaps =sql("show datamap on table maintable1")
-    checkExistence(showTables, false, "maintable_preagg_same")
-    checkExistence(showdatamaps, true, "maintable1_preagg_same")
+    val showDataMaps = sql("SHOW DATAMAP ON TABLE maintable1")
+    checkExistence(showDataMaps, false, "maintable_preagg_same")
+    checkExistence(showDataMaps, true, "maintable1_preagg_same")
     sql("drop datamap preagg_same on table maintable1")
-    showTables = sql("show tables")
-    checkExistence(showTables, false, "maintable1_preagg_same")
+    checkExistence(sql("SHOW DATAMAP ON TABLE maintable1"), false, 
"maintable1_preagg_same")
     sql("drop table if exists maintable1")
   }
 
@@ -80,8 +79,7 @@ class TestPreAggregateDrop extends QueryTest with 
BeforeAndAfterAll {
     " a,sum(c) from maintable group by a")
 
     sql("drop datamap preagg_same1 on table maintable")
-    var showTables = sql("show tables")
-    checkExistence(showTables, false, "maintable_preagg_same1")
+    checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), false, 
"maintable_preagg_same1")
     sql("create datamap preagg_same1 on table maintable using 'preaggregate' 
as select" +
         " a,sum(c) from maintable group by a")
     val showDatamaps =sql("show datamap on table maintable")
@@ -97,6 +95,40 @@ class TestPreAggregateDrop extends QueryTest with 
BeforeAndAfterAll {
     checkExistence(sql("show tables"), false, "maintable_preagg1", 
"maintable", "maintable_preagg2")
   }
 
+  test("drop datamap with 'if exists' when datamap not exists") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("CREATE TABLE maintable (a STRING, b STRING, c STRING) STORED BY 
'carbondata'")
+    sql("DROP DATAMAP IF EXISTS not_exists_datamap ON TABLE maintable")
+    checkExistence(sql("DESCRIBE FORMATTED maintable"), false, 
"not_exists_datamap")
+  }
+
+  test("drop datamap without 'if exists' when datamap not exists") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("CREATE TABLE maintable (a STRING, b STRING, c STRING) STORED BY 
'carbondata'")
+    sql("DROP DATAMAP IF EXISTS not_exists_datamap ON TABLE maintable")
+    val e = intercept[NoSuchDataMapException] {
+      sql("DROP DATAMAP not_exists_datamap ON TABLE maintable")
+    }
+    assert(e.getMessage.equals(
+      "Datamap with name not_exists_datamap does not exist under table 
maintable"))
+  }
+
+  test("drop datamap without 'if exists' when main table not exists") {
+    sql("DROP TABLE IF EXISTS maintable")
+    val e = intercept[ProcessMetaDataException] {
+      sql("DROP DATAMAP preagg3 ON TABLE maintable")
+    }
+    assert(e.getMessage.contains("Table or view 'maintable' not found in"))
+  }
+
+  test("drop datamap with 'if exists' when main table not exists") {
+    sql("DROP TABLE IF EXISTS maintable")
+    val e = intercept[ProcessMetaDataException] {
+      sql("DROP DATAMAP IF EXISTS preagg3 ON TABLE maintable")
+    }
+    assert(e.getMessage.contains("Table or view 'maintable' not found in"))
+  }
+
   override def afterAll() {
     sql("drop table if exists maintable")
     sql("drop table if exists maintable1")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
index 0b22c56..b3b71a6 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
@@ -31,6 +31,7 @@ class TestPreAggregateExpressions extends QueryTest with 
BeforeAndAfterAll {
     sql("CREATE TABLE mainTable(id int, name string, city string, age string) 
STORED BY 'org.apache.carbondata.format'")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table mainTable")
   }
+
   test("test pre agg create table with expression 1") {
     sql(
       s"""
@@ -99,12 +100,12 @@ class TestPreAggregateExpressions extends QueryTest with 
BeforeAndAfterAll {
          | """.stripMargin)
     checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, 
"maintable_column_0_count")
   }
+
   test("test pre agg table selection with expression 1") {
     val df = sql("select name as NewName, count(age) as sum from mainTable 
group by name order by name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
   }
 
-
   test("test pre agg table selection with expression 2") {
     val df = sql("select name as NewName, sum(case when age=35 then id else 0 
end) as sum from mainTable group by name order by name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
@@ -112,6 +113,7 @@ class TestPreAggregateExpressions extends QueryTest with 
BeforeAndAfterAll {
 
   test("test pre agg table selection with expression 3") {
     val df = sql("select sum(case when age=35 then id else 0 end) from 
maintable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
     checkAnswer(df, Seq(Row(6.0)))
   }
 
@@ -129,10 +131,9 @@ class TestPreAggregateExpressions extends QueryTest with 
BeforeAndAfterAll {
 
   /**
    * Below method will be used to validate the table name is present in the 
plan or not
-   * @param plan
-   * query plan
-   * @param actualTableName
-   * table name to be validated
+   *
+   * @param plan            query plan
+   * @param actualTableName table name to be validated
    */
   def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit 
={
     var isValidPlan = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 9ab6db8..959da7e 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -18,40 +18,68 @@
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-
-import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.spark.util.SparkQueryTest
 
-class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
+class TestPreAggregateLoad extends SparkQueryTest with BeforeAndAfterAll with 
BeforeAndAfterEach{
 
   val testData = s"$resourcesPath/sample.csv"
+  val p1 = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+      CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
 
   override def beforeAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+
     SparkUtil4Test.createTaskMockUp(sqlContext)
     sql("DROP TABLE IF EXISTS maintable")
   }
 
-  private def createAllAggregateTables(parentTableName: String): Unit = {
+  override protected def afterAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+        CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, 
p1)
+    sql("DROP TABLE IF EXISTS y ")
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("DROP TABLE IF EXISTS maintbl")
+    sql("DROP TABLE IF EXISTS main_table")
+  }
+
+  override protected def beforeEach(): Unit = {
+    sql("DROP TABLE IF EXISTS main_table")
+    sql("DROP TABLE IF EXISTS segmaintable")
+  }
+
+  private def createAllAggregateTables(parentTableName: String, columnName: 
String = "age"): Unit = {
     sql(
-      s"""create datamap preagg_sum on table $parentTableName using 
'preaggregate' as select id,sum(age) from $parentTableName group by id"""
-        .stripMargin)
+      s"""
+         | create datamap preagg_sum
+         | on table $parentTableName
+         | using 'preaggregate'
+         | as select id,sum($columnName)
+         | from $parentTableName
+         | group by id
+       """.stripMargin)
     sql(
-      s"""create datamap preagg_avg on table $parentTableName using 
'preaggregate' as select id,avg(age) from $parentTableName group by id"""
+      s"""create datamap preagg_avg on table $parentTableName using 
'preaggregate' as select id,avg($columnName) from $parentTableName group by 
id"""
         .stripMargin)
     sql(
-      s"""create datamap preagg_count on table $parentTableName using 
'preaggregate' as select id,count(age) from $parentTableName group by id"""
+      s"""create datamap preagg_count on table $parentTableName using 
'preaggregate' as select id,count($columnName) from $parentTableName group by 
id"""
         .stripMargin)
     sql(
-      s"""create datamap preagg_min on table $parentTableName using 
'preaggregate' as select id,min(age) from $parentTableName group by id"""
+      s"""create datamap preagg_min on table $parentTableName using 
'preaggregate' as select id,min($columnName) from $parentTableName group by 
id"""
         .stripMargin)
     sql(
-      s"""create datamap preagg_max on table $parentTableName using 
'preaggregate' as select id,max(age) from $parentTableName group by id"""
+      s"""create datamap preagg_max on table $parentTableName using 
'preaggregate' as select id,max($columnName) from $parentTableName group by 
id"""
         .stripMargin)
   }
 
@@ -264,24 +292,25 @@ class TestPreAggregateLoad extends QueryTest with 
BeforeAndAfterAll {
     sql(
       s"""create datamap preagg_sum on table maintable using 'preaggregate' as 
select id, sum(age) from maintable group by id,name"""
         .stripMargin)
+    sql("reset")
     checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52, "xyz"))
   }
+
 test("check load and select for avg double datatype") {
   sql("drop table if exists maintbl ")
   sql("create table maintbl(year int,month int,name string,salary double) 
stored by 'carbondata' 
tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
   sql("insert into maintbl select 10,11,'babu',12.89")
   sql("insert into maintbl select 10,11,'babu',12.89")
-  sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' as 
select name,avg(salary) from maintbl group by name")
+  sql("create datamap maintbl_double on table maintbl using 'preaggregate' as 
select name,avg(salary) from maintbl group by name")
   checkAnswer(sql("select name,avg(salary) from maintbl group by name"), 
Row("babu", 12.89))
 }
 
-
   test("check load and select for avg int datatype") {
     sql("drop table if exists maintbl ")
     sql("create table maintbl(year int,month int,name string,salary int) 
stored by 'carbondata' 
tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
     sql("insert into maintbl select 10,11,'babu',12")
     sql("insert into maintbl select 10,11,'babu',12")
-    sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
+    sql("create datamap maintbl_double on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
     checkAnswer(sql("select name,avg(salary) from maintbl group by name"), 
Row("babu", 12.0))
   }
 
@@ -290,7 +319,7 @@ test("check load and select for avg double datatype") {
     sql("create table maintbl(year int,month int,name string,salary bigint) 
stored by 'carbondata' 
tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
     sql("insert into maintbl select 10,11,'babu',12")
     sql("insert into maintbl select 10,11,'babu',12")
-    sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
+    sql("create datamap maintbl_double on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
     checkAnswer(sql("select name,avg(salary) from maintbl group by name"), 
Row("babu", 12.0))
   }
 
@@ -299,7 +328,7 @@ test("check load and select for avg double datatype") {
     sql("create table maintbl(year int,month int,name string,salary short) 
stored by 'carbondata' 
tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
     sql("insert into maintbl select 10,11,'babu',12")
     sql("insert into maintbl select 10,11,'babu',12")
-    sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
+    sql("create datamap maintbl_double on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
     checkAnswer(sql("select name,avg(salary) from maintbl group by name"), 
Row("babu", 12.0))
   }
 
@@ -309,7 +338,7 @@ test("check load and select for avg double datatype") {
     sql("insert into maintbl select 10,11,'babu',12")
     sql("insert into maintbl select 10,11,'babu',12")
     val rows = sql("select name,avg(salary) from maintbl group by 
name").collect()
-    sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
+    sql("create datamap maintbl_double on table maintbl using 'preaggregate' 
as select name,avg(salary) from maintbl group by name")
     checkAnswer(sql("select name,avg(salary) from maintbl group by name"), 
rows)
   }
 
@@ -415,8 +444,524 @@ test("check load and select for avg double datatype") {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
     val rows = sql("select age,avg(age) from maintable group by age").collect()
-    sql("create datamap maintbl_douoble on table maintable using 
'preaggregate' as select avg(age) from maintable group by age")
+    sql("create datamap maintbl_double on table maintable using 'preaggregate' 
as select avg(age) from maintable group by age")
     checkAnswer(sql("select age,avg(age) from maintable group by age"), rows)
+    sql("drop table if exists maintable ")
+  }
+
+  test("test load into main table with pre-aggregate table: string") {
+    sql(
+      """
+        | CREATE TABLE main_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age STRING)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    createAllAggregateTables("main_table")
+
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_avg"),
+      Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_count"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_min"),
+      Seq(Row(1, "31"), Row(2, "27"), Row(3, "35"), Row(4, "26")))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_max"),
+      Seq(Row(1, "31"), Row(2, "27"), Row(3, "35"), Row(4, "29")))
+
+    // check whether each select matches the expected pre-aggregate table or not
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_sum")
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_avg", "main_table")
+
+    checkPreAggTable(sql("SELECT id, AVG(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_avg")
+    checkPreAggTable(sql("SELECT id, AVG(age) from main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT id, COUNT(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_count")
+    checkPreAggTable(sql("SELECT id, COUNT(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT id, MIN(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_min")
+    checkPreAggTable(sql("SELECT id, MIN(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT id, MAX(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_max")
+    checkPreAggTable(sql("SELECT id, MAX(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    // aggregate-only selects (without id) should also match the pre-aggregate table
+    checkPreAggTable(sql("SELECT SUM(age) FROM main_table"),
+      true, "main_table_preagg_sum")
+    checkPreAggTable(sql("SELECT SUM(age) FROM main_table"),
+      false, "main_table_preagg_avg", "main_table")
+
+    checkPreAggTable(sql("SELECT AVG(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_avg")
+    checkPreAggTable(sql("SELECT AVG(age) from main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT COUNT(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_count")
+    checkPreAggTable(sql("SELECT COUNT(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT MIN(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_min")
+    checkPreAggTable(sql("SELECT MIN(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT MAX(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_max")
+    checkPreAggTable(sql("SELECT MAX(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+  }
+
+  test("test load into main table with pre-aggregate table: sum string 
column") {
+    sql(
+      """
+        | CREATE TABLE main_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age STRING)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    createAllAggregateTables("main_table", "name")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_sum"),
+      Seq(Row(1, null), Row(2, null), Row(3, null), Row(4, null)))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_avg"),
+      Seq(Row(1, null, 0), Row(2, null, 0), Row(3, null, 0), Row(4, null, 0)))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_count"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_min"),
+      Seq(Row(1, "david"), Row(2, "eason"), Row(3, "jarry"), Row(4, "kunal")))
+    checkAnswer(sql(s"SELECT * FROM main_table_preagg_max"),
+      Seq(Row(1, "david"), Row(2, "eason"), Row(3, "jarry"), Row(4, "vishal")))
+
+    // check whether each select matches the expected pre-aggregate table or not
+    checkPreAggTable(sql("SELECT id, SUM(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_sum")
+    checkPreAggTable(sql("SELECT id, SUM(name) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_avg", "main_table")
+
+    checkPreAggTable(sql("SELECT id, AVG(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_avg")
+    checkPreAggTable(sql("SELECT id, AVG(name) from main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT id, COUNT(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_count")
+    checkPreAggTable(sql("SELECT id, COUNT(name) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT id, MIN(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_min")
+    checkPreAggTable(sql("SELECT id, MIN(name) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT id, MAX(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_max")
+    checkPreAggTable(sql("SELECT id, MAX(name) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    // aggregate-only selects (without id) should also match the pre-aggregate table
+    checkPreAggTable(sql("SELECT SUM(name) FROM main_table"),
+      true, "main_table_preagg_sum")
+    checkPreAggTable(sql("SELECT SUM(name) FROM main_table"),
+      false, "main_table_preagg_avg", "main_table")
+
+    checkPreAggTable(sql("SELECT AVG(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_avg")
+    checkPreAggTable(sql("SELECT AVG(name) from main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT COUNT(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_count")
+    checkPreAggTable(sql("SELECT COUNT(name) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT MIN(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_min")
+    checkPreAggTable(sql("SELECT MIN(name) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+
+    checkPreAggTable(sql("SELECT MAX(name) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_max")
+    checkPreAggTable(sql("SELECT MAX(name) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum", "main_table")
+  }
+
+  test("test whether all segments are loaded into pre-aggregate table if 
segments are set on main table 2") {
+    sql("DROP TABLE IF EXISTS segmaintable")
+    sql(
+      """
+        | CREATE TABLE segmaintable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    sql("set carbon.input.segments.default.segmaintable=0")
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 26)))
+    sql(
+      s"""
+         | CREATE DATAMAP preagg_sum
+         | ON TABLE segmaintable
+         | USING 'preaggregate'
+         | AS SELECT id, SUM(age)
+         | FROM segmaintable
+         | GROUP BY id
+       """.stripMargin)
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      false, "segmaintable_preagg_sum")
+
+    sql("reset")
+    checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 26)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+  }
+
+  test("test whether all segments are loaded into pre-aggregate table if 
segments are set on main table 3") {
+    sql("DROP TABLE IF EXISTS segmaintable")
+    sql(
+      """
+        | CREATE TABLE segmaintable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    sql("set carbon.input.segments.default.segmaintable=0")
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 26)))
+    sql(
+      s"""
+         | CREATE DATAMAP preagg_sum
+         | ON TABLE segmaintable
+         | USING 'preaggregate'
+         | AS SELECT id, SUM(age)
+         | FROM segmaintable
+         | GROUP BY id
+       """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    sql("reset")
+    checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 26), 
Row(1, 26)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+  }
+
+  test("test whether all segments are loaded into pre-aggregate table if 
segments are set on main table 4") {
+    sql("DROP TABLE IF EXISTS segmaintable")
+    sql(
+      """
+        | CREATE TABLE segmaintable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    // check value before setting input segments
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 52)))
+
+    sql("set carbon.input.segments.default.segmaintable=0")
+    // check value after setting input segments
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 26)))
+
+    sql(
+      s"""
+         | CREATE DATAMAP preagg_sum
+         | ON TABLE segmaintable
+         | USING 'preaggregate'
+         | AS SELECT id, SUM(age)
+         | FROM segmaintable
+         | GROUP BY id
+       """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 52), 
Row(1, 26)))
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 26)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      false, "segmaintable_preagg_sum")
+
+    // reset
+    sql("reset")
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 78)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+  }
+
+  test("test whether all segments are loaded into pre-aggregate table: auto 
merge and input segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("reset")
+    sql("DROP TABLE IF EXISTS segmaintable")
+    sql(
+      """
+        | CREATE TABLE segmaintable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql("set carbon.input.segments.default.segmaintable=0")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    //  check value before auto merge
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 26)))
+
+
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    // check value after setting input segments and auto merge
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq.empty)
+
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      false, "segmaintable_preagg_sum")
+
+    sql(
+      s"""
+         | CREATE DATAMAP preagg_sum
+         | ON TABLE segmaintable
+         | USING 'preaggregate'
+         | AS SELECT id, SUM(age)
+         | FROM segmaintable
+         | GROUP BY id
+       """.stripMargin)
+
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    // reset
+    sql("reset")
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 130)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
+  // TODO: test is ignored for now; need to check and fix
+  ignore("test whether all segments are loaded into pre-aggregate table: auto 
merge and no input segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("reset")
+    sql("DROP TABLE IF EXISTS segmaintable")
+    sql(
+      """
+        | CREATE TABLE segmaintable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    sql(
+      s"""
+         | CREATE DATAMAP preagg_sum
+         | ON TABLE segmaintable
+         | USING 'preaggregate'
+         | AS SELECT id, SUM(age)
+         | FROM segmaintable
+         | GROUP BY id
+       """.stripMargin)
+
+    //  check value before auto merge
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 78)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    //  check value after auto merge
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 130)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
+  test("test whether all segments are loaded into pre-aggregate table: create 
after auto merge and no input segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("reset")
+    sql("DROP TABLE IF EXISTS segmaintable")
+    sql(
+      """
+        | CREATE TABLE segmaintable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(
+      s"""
+         | CREATE DATAMAP preagg_sum
+         | ON TABLE segmaintable
+         | USING 'preaggregate'
+         | AS SELECT id, SUM(age)
+         | FROM segmaintable
+         | GROUP BY id
+       """.stripMargin)
+
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 130)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+
+    sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)")
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      Seq(Row(1, 156)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"),
+      true, "segmaintable_preagg_sum")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
+  // TODO: test is ignored for now; need to check and fix
+  ignore("test whether all segments are loaded into pre-aggregate table: 
mixed, load, auto merge and input segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("reset")
+    sql("DROP TABLE IF EXISTS main_table")
+    sql(
+      """
+        | CREATE TABLE main_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+
+    createAllAggregateTables("main_table", "age")
+    sql("set carbon.input.segments.default.main_table=0")
+
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      false, "main_table_preagg_sum")
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      Seq(Row(1, 26)))
+
+    sql("reset")
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_sum")
+
+    sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      Seq(Row(1, 171), Row(2, 81), Row(3, 210), Row(4, 165)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_sum")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
+  // TODO: test is ignored for now; need to check and fix
+  ignore("test whether all segments are loaded into pre-aggregate table: auto 
merge and check pre-aggregate segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("reset")
+    sql("DROP TABLE IF EXISTS main_table")
+    sql(
+      """
+        | CREATE TABLE main_table(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    sql(
+      s"""
+         | CREATE DATAMAP preagg_sum
+         | ON TABLE main_table
+         | USING 'preaggregate'
+         | AS SELECT id, SUM(age)
+         | FROM main_table
+         | GROUP BY id
+       """.stripMargin)
+
+    sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+
+    checkExistence(sql("show segments for table main_table_preagg_sum"), 
false, "Compacted")
+    sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)")
+
+    // check whether the data has been auto merged
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      Seq(Row(1, 109), Row(2, 27), Row(3, 70), Row(4, 55)))
+    checkExistence(sql("show segments for table main_table_preagg_sum"), true, 
"Compacted")
+
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table")
+
+    checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      Seq(Row(1, 171), Row(2, 81), Row(3, 210), Row(4, 165)))
+    checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"),
+      true, "main_table_preagg_sum")
+
+    checkExistence(sql("show segments for table main_table_preagg_sum"), true, 
"Compacted")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
index 02314d7..8241288 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala
@@ -30,6 +30,7 @@ class TestPreAggregateMisc extends QueryTest with 
BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table mainTable")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table mainTable")
   }
+
   test("test PreAggregate With Set Segments property") {
     sql("create datamap agg1 on table mainTable using 'preaggregate' as select 
name,sum(age) from mainTable group by name")
     sql("SET carbon.input.segments.default.mainTable=0")
@@ -40,6 +41,7 @@ class TestPreAggregateMisc extends QueryTest with 
BeforeAndAfterAll {
     sql("drop datamap agg1 on table mainTable")
 
   }
+
   test("check preagg tbl properties sort columns inherit from main tbl") {
     sql("drop table if exists y ")
     sql(
@@ -71,8 +73,8 @@ class TestPreAggregateMisc extends QueryTest with 
BeforeAndAfterAll {
     assert(sortcolummatch && sortscopematch && blocksizematch)
   }
 
-
   override def afterAll: Unit = {
-    sql("drop table if exists mainTable")
+    sql("DROP TABLE IF EXISTS mainTable")
+    sql("DROP TABLE IF EXISTS y")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index f1a6092..8b98f17 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -20,12 +20,14 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
-import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
+import org.apache.carbondata.spark.util.SparkQueryTest
 
-class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
+class TestPreAggregateTableSelection extends SparkQueryTest with 
BeforeAndAfterAll {
+
+  val timeSeries = TIMESERIES.toString
 
   override def beforeAll: Unit = {
     sql("drop table if exists mainTable")
@@ -56,12 +58,6 @@ class TestPreAggregateTableSelection extends QueryTest with 
BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table mainTableavg")
   }
 
-  test("test sum and avg on same column should give proper results") {
-    val df = sql("select name, sum(id), avg(id) from maintable group by name")
-    checkAnswer(df, Seq(Row("david",1,1.0), Row("jarry",6,3.0), 
Row("kunal",4,4.0), Row("eason",2,2.0), Row("vishal",4,4.0)))
-  }
-
-
   test("test PreAggregate table selection 1") {
     val df = sql("select name from mainTable group by name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
@@ -156,7 +152,14 @@ class TestPreAggregateTableSelection extends QueryTest 
with BeforeAndAfterAll {
     val df = sql("select count(id) from mainTable")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
   }
-  
+
+  test("test PreAggregate table selection 19: test sum and avg on same column 
should give proper results") {
+    val df = sql("select name, sum(id), avg(id) from maintable group by name")
+    checkAnswer(df, Seq(Row("david",1,1.0), Row("jarry",6,3.0), 
Row("kunal",4,4.0), Row("eason",2,2.0), Row("vishal",4,4.0)))
+    checkPreAggTable(df, false, "maintable_agg5", "maintable_agg1")
+    checkPreAggTable(df, true, "maintable_agg8")
+  }
+
   test("test PreAggregate table selection 20") {
     val df = sql("select name from mainTable group by name order by name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
@@ -231,34 +234,12 @@ class TestPreAggregateTableSelection extends QueryTest 
with BeforeAndAfterAll {
     }
   }
 
-  test("test if pre-agg table is hit with filter condition") {
-    sql("drop table if exists filtertable")
-    sql("CREATE TABLE filtertable(id int, name string, city string, age 
string) STORED BY" +
-        " 'org.apache.carbondata.format' 
TBLPROPERTIES('dictionary_include'='name,age')")
-    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table filtertable")
-    sql("create datamap agg9 on table filtertable using 'preaggregate' as 
select name, age, sum(age) from filtertable group by name, age")
-    val df = sql("select name, sum(age) from filtertable where age = '29' 
group by name, age")
-    preAggTableValidator(df.queryExecution.analyzed, "filtertable_agg9")
-    checkAnswer(df, Row("vishal", 29))
-  }
 
   test("test PreAggregate table selection 29") {
     val df = sql("select sum(id) from mainTable group by name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2")
   }
 
-  test("test pre-agg table with group by condition") {
-    sql("drop table if exists grouptable")
-    sql("CREATE TABLE grouptable(id int, name string, city string, age string) 
STORED BY" +
-        " 'org.apache.carbondata.format' 
TBLPROPERTIES('dictionary_include'='name,age')")
-    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table grouptable")
-    sql(
-      "create datamap agg9 on table grouptable using 'preaggregate' as select 
sum(id) from grouptable group by city")
-    val df = sql("select sum(id) from grouptable group by city")
-    preAggTableValidator(df.queryExecution.analyzed, "grouptable_agg9")
-    checkAnswer(df, Seq(Row(3), Row(3), Row(4), Row(7)))
-  }
-
   test("test PreAggregate table selection 30") {
     val df = sql("select a.name from mainTable a group by a.name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
@@ -274,14 +255,66 @@ class TestPreAggregateTableSelection extends QueryTest 
with BeforeAndAfterAll {
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
   }
 
- test("Test query with math operation hitting fact table") {
+ test("test PreAggregate table selection 33: Test query with math operation 
hitting fact table") {
     val df =  sql("select sum(id)+count(id) from maintable")
     preAggTableValidator(df.queryExecution.analyzed, "maintable")
   }
 
-  val timeSeries = TIMESERIES.toString
+  test("test PreAggregate table selection 34: test if pre-agg table is hit 
with filter condition") {
+    sql("DROP TABLE IF EXISTS filtertable")
+    sql(
+      """
+        | CREATE TABLE filtertable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age STRING)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('dictionary_include'='name,age')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table filtertable")
+    sql(
+      """
+        | CREATE DATAMAP agg9
+        | ON TABLE filtertable
+        | USING 'preaggregate'
+        | AS SELECT name, age, SUM(age)
+        |     FROM filtertable
+        |     GROUP BY name, age
+      """.stripMargin)
+    val df = sql("SELECT name, SUM(age) FROM filtertable WHERE age = '29' 
GROUP BY name, age")
+    preAggTableValidator(df.queryExecution.analyzed, "filtertable_agg9")
+    checkAnswer(df, Row("vishal", 29))
+  }
 
-test("test PreAggregate table selection with timeseries and normal together") {
+  test("test PreAggregate table selection 35: test pre-agg table with group by 
condition") {
+    sql("DROP TABLE IF EXISTS grouptable")
+    sql(
+      """
+        | CREATE TABLE grouptable(
+        |     id INT,
+        |     name STRING,
+        |     city STRING,
+        |     age STRING)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('dictionary_include'='name,age')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table grouptable")
+    sql(
+      """
+        | CREATE DATAMAP agg9
+        | ON TABLE grouptable
+        | USING 'preaggregate'
+        | AS SELECT SUM(id)
+        |     FROM grouptable
+        |     GROUP BY city
+      """.stripMargin)
+    val df = sql("SELECT SUM(id) FROM grouptable GROUP BY city")
+    preAggTableValidator(df.queryExecution.analyzed, "grouptable_agg9")
+    checkAnswer(df, Seq(Row(3), Row(3), Row(4), Row(7)))
+  }
+
+  test("test PreAggregate table selection 36: test PreAggregate table 
selection with timeseries and normal together") {
     sql("drop table if exists maintabletime")
     sql(
       "create table maintabletime(year int,month int,name string,salary 
int,dob timestamp) stored" +
@@ -323,7 +356,7 @@ test("test PreAggregate table selection with timeseries and 
normal together") {
     sql("select var_samp(name) from maintabletime  where name='Mikka' ")
   }
 
-  test("test PreAggregate table selection For Sum And Avg in aggregate table 
with bigint") {
+  test("test PreAggregate table selection 38: for sum and avg in aggregate 
table with bigint") {
     val df = sql("select avg(age) from mainTableavg")
     preAggTableValidator(df.queryExecution.analyzed, "mainTableavg_agg0")
   }
@@ -341,7 +374,6 @@ test("test PreAggregate table selection with timeseries and 
normal together") {
     checkAnswer(df, Seq(Row(10,10.0)))
   }
 
-
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
     sql("drop table if exists mainTable_avg")
@@ -349,6 +381,8 @@ test("test PreAggregate table selection with timeseries and 
normal together") {
     sql("DROP TABLE IF EXISTS maintabletime")
     sql("DROP TABLE IF EXISTS maintabledict")
     sql("DROP TABLE IF EXISTS mainTableavg")
+    sql("DROP TABLE IF EXISTS filtertable")
+    sql("DROP TABLE IF EXISTS grouptable")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
index 0f87803..af12be9 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
@@ -26,7 +26,8 @@ import org.scalatest.BeforeAndAfterAll
 class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll: Unit = {
-    sql("drop table if exists mainTable")
+    sql("DROP TABLE IF EXISTS mainTable")
+    sql("DROP TABLE IF EXISTS mainTable1")
     sql("CREATE TABLE mainTable(id int, name string, city string, age string) 
STORED BY 'org.apache.carbondata.format'")
     sql("CREATE TABLE mainTable1(id int, name string, city string, age string) 
STORED BY 'org.apache.carbondata.format'")
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
name,sum(age) from mainTable group by name")
@@ -35,7 +36,19 @@ class TestPreAggregateWithSubQuery extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test sub query PreAggregate table selection 1") {
-    val df = sql("select t2.newnewname as newname from mainTable1 t1 join 
(select name as newnewname,sum(age) as sum from mainTable group by name )t2 on 
t1.name=t2.newnewname group by t2.newnewname")
+    val df = sql(
+      """
+        | SELECT t2.newnewname AS newname
+        | FROM mainTable1 t1
+        | JOIN (
+        |     select
+        |       name AS newnewname,
+        |       sum(age) AS sum
+        |     FROM mainTable
+        |     GROUP BY name ) t2
+        | ON t1.name = t2.newnewname
+        | GROUP BY t2.newnewname
+      """.stripMargin)
     matchTable(collectLogicalRelation(df.queryExecution.analyzed), 
"maintable_agg0")
   }
 
@@ -82,7 +95,8 @@ class TestPreAggregateWithSubQuery extends QueryTest with 
BeforeAndAfterAll {
   }
 
   override def afterAll: Unit = {
-    sql("drop table if exists mainTable")
+    sql("DROP TABLE IF EXISTS mainTable")
+    sql("DROP TABLE IF EXISTS mainTable1")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 3c2fd71..4860b32 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -18,18 +18,15 @@ package org.apache.carbondata.spark.testsuite.allqueries
 
 import java.io.File
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.spark.sql.test.util.QueryTest
-
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonProperties
 
 class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   var timeStampPropOrig: String = _

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala
new file mode 100644
index 0000000..bad300b
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala
@@ -0,0 +1,50 @@
+package org.apache.carbondata.spark.util
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+
+class SparkQueryTest extends QueryTest {
+
+  /**
+   * check whether the pre-aggregate tables are in DataFrame
+   *
+   * @param df DataFrame
+   * @param exists whether the preAggTableNames exists
+   * @param preAggTableNames preAggTableNames
+   */
+  def checkPreAggTable(df: DataFrame, exists: Boolean, preAggTableNames: 
String*): Unit = {
+    val plan = df.queryExecution.analyzed
+    for (preAggTableName <- preAggTableNames) {
+      var isValidPlan = false
+      plan.transform {
+        // Inspect every relation node in the analyzed plan: when it is backed by a
+        // CarbonDatasourceHadoopRelation whose table name equals preAggTableName
+        // (ignoring case), mark the plan as valid. Nodes are returned unchanged.
+        case ca: CarbonRelation =>
+          if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+            val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation]
+            if 
(relation.carbonTable.getTableName.equalsIgnoreCase(preAggTableName)) {
+              isValidPlan = true
+            }
+          }
+          ca
+        case logicalRelation: LogicalRelation =>
+          if 
(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+            val relation = 
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+            if 
(relation.carbonTable.getTableName.equalsIgnoreCase(preAggTableName)) {
+              isValidPlan = true
+            }
+          }
+          logicalRelation
+      }
+
+      if (exists != isValidPlan) {
+        assert(false)
+      } else {
+        assert(true)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index b52d0e7..f4490ec 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -70,7 +70,7 @@ object PreAggregateUtil {
   /**
    * Below method will be used to validate the select plan
    * and get the required fields from select plan
-   * Currently only aggregate query is support any other type of query will 
fail
+   * Currently only aggregate query is support, any other type of query will 
fail
    *
    * @param plan
    * @param selectStmt

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index d2ffac7..e8374e3 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -63,7 +63,7 @@ case class AggExpToColumnMappingModel(
  * 1. Check plan is valid plan for updating the parent table plan with child 
table
  * 2. Updated the plan based on child schema
  *
- * Rules for Upadating the plan
+ * Rules for Updating the plan
  * 1. Grouping expression rules
  *    1.1 Change the parent attribute reference for of group expression
  * to child attribute reference

Reply via email to