Repository: carbondata Updated Branches: refs/heads/master 0c200d834 -> e43be5e74
[CARBONDATA-2073][CARBONDATA-1516][Tests] Add test cases for pre-aggregate datamap This closes #1857 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e43be5e7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e43be5e7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e43be5e7 Branch: refs/heads/master Commit: e43be5e74f1aec4a258d0bcca9ea87893249ffb9 Parents: 0c200d8 Author: xubo245 <[email protected]> Authored: Thu Feb 8 17:42:34 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Fri Mar 30 11:11:10 2018 +0800 ---------------------------------------------------------------------- .../preaggregate/TestPreAggCreateCommand.scala | 123 +++- .../TestPreAggregateCompaction.scala | 3 + .../preaggregate/TestPreAggregateDrop.scala | 50 +- .../TestPreAggregateExpressions.scala | 11 +- .../preaggregate/TestPreAggregateLoad.scala | 585 ++++++++++++++++++- .../preaggregate/TestPreAggregateMisc.scala | 6 +- .../TestPreAggregateTableSelection.scala | 106 ++-- .../TestPreAggregateWithSubQuery.scala | 20 +- .../InsertIntoCarbonTableTestCase.scala | 7 +- .../carbondata/spark/util/SparkQueryTest.scala | 50 ++ .../preaaggregate/PreAggregateUtil.scala | 2 +- .../sql/hive/CarbonPreAggregateRules.scala | 2 +- 12 files changed, 869 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index 8e499ba..e546fe8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -35,6 +35,8 @@ import org.apache.carbondata.core.util.CarbonProperties class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { + val timeSeries = TIMESERIES.toString + override def beforeAll { sql("drop database if exists otherDB cascade") sql("drop table if exists PreAggMain") @@ -55,9 +57,12 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { } test("test pre agg create table 2") { + dropDataMaps("PreAggMain", "preagg2") sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) as udfsum from PreAggMain group by a") checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_a") checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_b_sum") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), false, "preaggmain_a1") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), false, "preaggmain_udfsum") sql("drop datamap preagg2 on table PreAggMain") } @@ -69,10 +74,22 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("drop datamap preagg11 on table PreAggMain1") } + test("test pre agg create table 6") { + sql("create datamap preagg12 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain1 group by a") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_a") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_b_sum") + checkExistence(sql("DESCRIBE 
FORMATTED PreAggMain1_preagg12"), false, "preaggmain1_a1") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), false, "preaggmain1_sum") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "DICTIONARY") + sql("drop datamap preagg12 on table PreAggMain1") + } + test("test pre agg create table 8") { sql("create datamap preagg14 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain1 group by a") checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "preaggmain1_a") checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "preaggmain1_b_sum") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), false, "preaggmain1_a1") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), false, "preaggmain1_sum") checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "DICTIONARY") sql("drop datamap preagg14 on table PreAggMain1") } @@ -82,6 +99,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, "preaggmain_a") checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, "preaggmain_b_sum") checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), true, "preaggmain_b_count") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg15"), false, "preaggmain2_b_avg") sql("drop datamap preagg15 on table PreAggMain") } @@ -132,7 +150,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { assert(exception.getMessage.equals("Distinct is not supported On Pre Aggregation")) } - test("test pre agg create table 15") { + test("test pre agg create table 15: don't support where") { intercept[Exception] { sql( s""" @@ -149,7 +167,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { test("test pre agg create table 16") { sql("create datamap agg0 on table mainTable using 'preaggregate' as select column4, sum(column4) 
from maintable group by column4") val df = sql("select * from maintable_agg0") - val carbontable = getCarbontable(df.queryExecution.analyzed) + val carbontable = getCarbonTable(df.queryExecution.analyzed) assert(carbontable.getAllMeasures.size()==2) assert(carbontable.getAllDimensions.size()==0) sql("drop datamap agg0 on table maintable") @@ -158,7 +176,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { test("test pre agg create table 17") { sql("create datamap agg0 on table mainTable using 'preaggregate' as select column1, sum(column1),column6, sum(column6) from maintable group by column6,column1") val df = sql("select * from maintable_agg0") - val carbontable = getCarbontable(df.queryExecution.analyzed) + val carbontable = getCarbonTable(df.queryExecution.analyzed) assert(carbontable.getAllMeasures.size()==2) assert(carbontable.getAllDimensions.size()==2) carbontable.getAllDimensions.asScala.foreach{ f => @@ -170,7 +188,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { test("test pre agg create table 18") { sql("create datamap agg0 on table mainTable using 'preaggregate' as select column1, count(column1),column6, count(column6) from maintable group by column6,column1") val df = sql("select * from maintable_agg0") - val carbontable = getCarbontable(df.queryExecution.analyzed) + val carbontable = getCarbonTable(df.queryExecution.analyzed) assert(carbontable.getAllMeasures.size()==2) assert(carbontable.getAllDimensions.size()==2) carbontable.getAllDimensions.asScala.foreach{ f => @@ -182,7 +200,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { test("test pre agg create table 19") { sql("create datamap agg0 on table mainTable using 'preaggregate' as select column3, sum(column3),column5, sum(column5) from maintable group by column3,column5") val df = sql("select * from maintable_agg0") - val carbontable = getCarbontable(df.queryExecution.analyzed) + val carbontable = 
getCarbonTable(df.queryExecution.analyzed) assert(carbontable.getAllMeasures.size()==2) assert(carbontable.getAllDimensions.size()==2) carbontable.getAllDimensions.asScala.foreach{ f => @@ -194,7 +212,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { test("test pre agg create table 20") { sql("create datamap agg0 on table mainTable using 'preaggregate' as select column3, sum(column3),column5, sum(column5) from maintable group by column3,column5,column2") val df = sql("select * from maintable_agg0") - val carbontable = getCarbontable(df.queryExecution.analyzed) + val carbontable = getCarbonTable(df.queryExecution.analyzed) assert(carbontable.getAllMeasures.size()==2) assert(carbontable.getAllDimensions.size()==3) carbontable.getAllDimensions.asScala.foreach{ f => @@ -203,7 +221,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("drop datamap agg0 on table maintable") } - val timeSeries = TIMESERIES.toString test("remove agg tables from show table command") { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false") sql("DROP TABLE IF EXISTS tbl_1") @@ -219,7 +236,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT) } - test("test pre agg create table 21: create with preaggregate and hierarchy") { + test("test pre agg create table 22: create with preaggregate and granularity") { sql("DROP TABLE IF EXISTS maintabletime") sql( """ @@ -271,19 +288,18 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable") } - test("remove agg tables from show table command") { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false") + test("test pre agg create table 24: remove agg tables from show table command") { + 
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS, "false") sql("DROP TABLE IF EXISTS tbl_1") sql("create table if not exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ") sql("create datamap agg1 on table tbl_1 using 'preaggregate' as select mac, sum(age) from tbl_1 group by mac") sql("create table if not exists sparktable(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) ") checkExistence(sql("show tables"), false, "tbl_1_agg1") - checkExistence(sql("show tables"), true, "sparktable","tbl_1") + checkExistence(sql("show tables"), true, "sparktable", "tbl_1") CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS, CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT) } - - test("remove TimeSeries agg tables from show table command") { + test("test pre agg create table 25: remove TimeSeries agg tables from show table command") { sql("DROP TABLE IF EXISTS tbl_1") sql("create table if not exists tbl_1(imei string,age int,mac string ,prodate timestamp,update timestamp,gamepoint double,contrid double) stored by 'carbondata' ") sql( @@ -307,6 +323,25 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable") } + test("test pre agg create table 22: don't support create datamap if exists'") { + val e: Exception = intercept[AnalysisException] { + sql( + """ + | CREATE DATAMAP IF EXISTS agg0 ON TABLE mainTable + | USING 'preaggregate' + | AS SELECT + | column3, + | sum(column3), + | column5, + | sum(column5) + | FROM maintable + | GROUP BY column3,column5,column2 + """.stripMargin) + assert(true) + } + assert(e.getMessage.contains("identifier matching regex")) + sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable") + } test("test show tables filtered with datamaps") { 
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,"false") @@ -336,7 +371,65 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("use default") } - def getCarbontable(plan: LogicalPlan) : CarbonTable ={ + // TODO: to be confirmed + test("test pre agg create table 26") { + sql("drop datamap if exists preagg2 on table PreAggMain") + sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) from PreAggMain group by a") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_a") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_b_sum") + checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), false, "preaggmain_a1") + intercept[Exception] { + sql("select a1 from PreAggMain_preagg2").show() + } + sql("drop datamap if exists preagg2 on table PreAggMain") + } + + test("test pre agg create table 27: select * and no group by") { + intercept[Exception] { + sql( + """ + | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable + | USING 'preaggregate' + | AS SELECT * FROM maintable + """.stripMargin) + } + } + + // TODO : to be confirmed + test("test pre agg create table 28: select *") { + intercept[Exception] { + sql( + """ + | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable + | USING 'preaggregate' + | AS SELECT * FROM maintable + | group by a + """.stripMargin) + } + } + + test("test pre agg create table 29") { + intercept[Exception] { + sql( + s""" + | create datamap preagg21 on table PreAggMain2 + | using 'preaggregate' + | as select a as a1,sum(b) + | from PreAggMain2 + | where a>'vishal' + | group by a + """.stripMargin) + } + } + + test("test pre agg create table 30: DESCRIBE FORMATTED") { + dropDataMaps("PreAggMain", "preagg2") + intercept[Exception] { + sql("DESCRIBE FORMATTED PreAggMain_preagg2").show() + } + } + + def getCarbonTable(plan: LogicalPlan) : CarbonTable ={ var carbonTable : CarbonTable = null 
plan.transform { // first check if any preaTable1 scala function is applied it is present is in plan @@ -368,5 +461,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("drop table if exists maintabletime") sql("drop table if exists showTables") sql("drop table if exists Preagg_twodb") + sql("DROP TABLE IF EXISTS tbl_1") + sql("DROP TABLE IF EXISTS sparktable") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala index 89cf8eb..42209b3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala @@ -154,6 +154,9 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString) segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0")) checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult) + val mainTableSegment = sql("SHOW SEGMENTS FOR TABLE maintable") + val SegmentSequenceIds = mainTableSegment.collect().map { each => (each.toSeq) (0) } + assert(!SegmentSequenceIds.contains("0.1")) } test("test if minor/major compaction is successful 
for pre-agg table") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala index a96a19d..8757c76 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException +import org.apache.carbondata.spark.exception.ProcessMetaDataException class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { @@ -34,7 +35,7 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { "create datamap preagg1 on table maintable using 'preaggregate' as select" + " a,sum(b) from maintable group by a") sql("drop datamap if exists preagg1 on table maintable") - checkExistence(sql("show tables"), false, "maintable_preagg1") + checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), false, "maintable_preagg1") } test("dropping 1 aggregate table should not drop others") { @@ -65,13 +66,11 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { " a,sum(c) from maintable1 group by a") sql("drop datamap preagg_same on table maintable") - var showTables = sql("show tables") - val showdatamaps =sql("show datamap on table maintable1") - checkExistence(showTables, false, 
"maintable_preagg_same") - checkExistence(showdatamaps, true, "maintable1_preagg_same") + val showDataMaps = sql("SHOW DATAMAP ON TABLE maintable1") + checkExistence(showDataMaps, false, "maintable_preagg_same") + checkExistence(showDataMaps, true, "maintable1_preagg_same") sql("drop datamap preagg_same on table maintable1") - showTables = sql("show tables") - checkExistence(showTables, false, "maintable1_preagg_same") + checkExistence(sql("SHOW DATAMAP ON TABLE maintable1"), false, "maintable1_preagg_same") sql("drop table if exists maintable1") } @@ -80,8 +79,7 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { " a,sum(c) from maintable group by a") sql("drop datamap preagg_same1 on table maintable") - var showTables = sql("show tables") - checkExistence(showTables, false, "maintable_preagg_same1") + checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), false, "maintable_preagg_same1") sql("create datamap preagg_same1 on table maintable using 'preaggregate' as select" + " a,sum(c) from maintable group by a") val showDatamaps =sql("show datamap on table maintable") @@ -97,6 +95,40 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { checkExistence(sql("show tables"), false, "maintable_preagg1", "maintable", "maintable_preagg2") } + test("drop datamap with 'if exists' when datamap not exists") { + sql("DROP TABLE IF EXISTS maintable") + sql("CREATE TABLE maintable (a STRING, b STRING, c STRING) STORED BY 'carbondata'") + sql("DROP DATAMAP IF EXISTS not_exists_datamap ON TABLE maintable") + checkExistence(sql("DESCRIBE FORMATTED maintable"), false, "not_exists_datamap") + } + + test("drop datamap without 'if exists' when datamap not exists") { + sql("DROP TABLE IF EXISTS maintable") + sql("CREATE TABLE maintable (a STRING, b STRING, c STRING) STORED BY 'carbondata'") + sql("DROP DATAMAP IF EXISTS not_exists_datamap ON TABLE maintable") + val e = intercept[NoSuchDataMapException] { + sql("DROP DATAMAP not_exists_datamap ON 
TABLE maintable") + } + assert(e.getMessage.equals( + "Datamap with name not_exists_datamap does not exist under table maintable")) + } + + test("drop datamap without 'if exists' when main table not exists") { + sql("DROP TABLE IF EXISTS maintable") + val e = intercept[ProcessMetaDataException] { + sql("DROP DATAMAP preagg3 ON TABLE maintable") + } + assert(e.getMessage.contains("Table or view 'maintable' not found in")) + } + + test("drop datamap with 'if exists' when main table not exists") { + sql("DROP TABLE IF EXISTS maintable") + val e = intercept[ProcessMetaDataException] { + sql("DROP DATAMAP IF EXISTS preagg3 ON TABLE maintable") + } + assert(e.getMessage.contains("Table or view 'maintable' not found in")) + } + override def afterAll() { sql("drop table if exists maintable") sql("drop table if exists maintable1") http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala index 0b22c56..b3b71a6 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala @@ -31,6 +31,7 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") sql(s"LOAD DATA LOCAL INPATH 
'$resourcesPath/measureinsertintotest.csv' into table mainTable") } + test("test pre agg create table with expression 1") { sql( s""" @@ -99,12 +100,12 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { | """.stripMargin) checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, "maintable_column_0_count") } + test("test pre agg table selection with expression 1") { val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name") preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") } - test("test pre agg table selection with expression 2") { val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name") preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") @@ -112,6 +113,7 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { test("test pre agg table selection with expression 3") { val df = sql("select sum(case when age=35 then id else 0 end) from maintable") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") checkAnswer(df, Seq(Row(6.0))) } @@ -129,10 +131,9 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { /** * Below method will be used to validate the table name is present in the plan or not - * @param plan - * query plan - * @param actualTableName - * table name to be validated + * + * @param plan query plan + * @param actualTableName table name to be validated */ def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={ var isValidPlan = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index 9ab6db8..959da7e 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -18,40 +18,68 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate import org.apache.spark.sql.Row -import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil4Test -import org.scalatest.{BeforeAndAfterAll, Ignore} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException - -import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException +import org.apache.carbondata.spark.util.SparkQueryTest -class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { +class TestPreAggregateLoad extends SparkQueryTest with BeforeAndAfterAll with BeforeAndAfterEach{ val testData = s"$resourcesPath/sample.csv" + val p1 = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD) override def beforeAll(): Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD) + 
SparkUtil4Test.createTaskMockUp(sqlContext) sql("DROP TABLE IF EXISTS maintable") } - private def createAllAggregateTables(parentTableName: String): Unit = { + override protected def afterAll(): Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, p1) + sql("DROP TABLE IF EXISTS y ") + sql("DROP TABLE IF EXISTS maintable") + sql("DROP TABLE IF EXISTS maintbl") + sql("DROP TABLE IF EXISTS main_table") + } + + override protected def beforeEach(): Unit = { + sql("DROP TABLE IF EXISTS main_table") + sql("DROP TABLE IF EXISTS segmaintable") + } + + private def createAllAggregateTables(parentTableName: String, columnName: String = "age"): Unit = { sql( - s"""create datamap preagg_sum on table $parentTableName using 'preaggregate' as select id,sum(age) from $parentTableName group by id""" - .stripMargin) + s""" + | create datamap preagg_sum + | on table $parentTableName + | using 'preaggregate' + | as select id,sum($columnName) + | from $parentTableName + | group by id + """.stripMargin) sql( - s"""create datamap preagg_avg on table $parentTableName using 'preaggregate' as select id,avg(age) from $parentTableName group by id""" + s"""create datamap preagg_avg on table $parentTableName using 'preaggregate' as select id,avg($columnName) from $parentTableName group by id""" .stripMargin) sql( - s"""create datamap preagg_count on table $parentTableName using 'preaggregate' as select id,count(age) from $parentTableName group by id""" + s"""create datamap preagg_count on table $parentTableName using 'preaggregate' as select id,count($columnName) from $parentTableName group by id""" .stripMargin) sql( - s"""create datamap preagg_min on table $parentTableName using 'preaggregate' as select id,min(age) from $parentTableName group by id""" + s"""create datamap preagg_min on table $parentTableName using 
'preaggregate' as select id,min($columnName) from $parentTableName group by id""" .stripMargin) sql( - s"""create datamap preagg_max on table $parentTableName using 'preaggregate' as select id,max(age) from $parentTableName group by id""" + s"""create datamap preagg_max on table $parentTableName using 'preaggregate' as select id,max($columnName) from $parentTableName group by id""" .stripMargin) } @@ -264,24 +292,25 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { sql( s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id,name""" .stripMargin) + sql("reset") checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52, "xyz")) } + test("check load and select for avg double datatype") { sql("drop table if exists maintbl ") sql("create table maintbl(year int,month int,name string,salary double) stored by 'carbondata' tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')") sql("insert into maintbl select 10,11,'babu',12.89") sql("insert into maintbl select 10,11,'babu',12.89") - sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") + sql("create datamap maintbl_double on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") checkAnswer(sql("select name,avg(salary) from maintbl group by name"), Row("babu", 12.89)) } - test("check load and select for avg int datatype") { sql("drop table if exists maintbl ") sql("create table maintbl(year int,month int,name string,salary int) stored by 'carbondata' tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')") sql("insert into maintbl select 10,11,'babu',12") sql("insert into maintbl select 10,11,'babu',12") - sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") 
+ sql("create datamap maintbl_double on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") checkAnswer(sql("select name,avg(salary) from maintbl group by name"), Row("babu", 12.0)) } @@ -290,7 +319,7 @@ test("check load and select for avg double datatype") { sql("create table maintbl(year int,month int,name string,salary bigint) stored by 'carbondata' tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')") sql("insert into maintbl select 10,11,'babu',12") sql("insert into maintbl select 10,11,'babu',12") - sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") + sql("create datamap maintbl_double on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") checkAnswer(sql("select name,avg(salary) from maintbl group by name"), Row("babu", 12.0)) } @@ -299,7 +328,7 @@ test("check load and select for avg double datatype") { sql("create table maintbl(year int,month int,name string,salary short) stored by 'carbondata' tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')") sql("insert into maintbl select 10,11,'babu',12") sql("insert into maintbl select 10,11,'babu',12") - sql("create datamap maintbl_douoble on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") + sql("create datamap maintbl_double on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") checkAnswer(sql("select name,avg(salary) from maintbl group by name"), Row("babu", 12.0)) } @@ -309,7 +338,7 @@ test("check load and select for avg double datatype") { sql("insert into maintbl select 10,11,'babu',12") sql("insert into maintbl select 10,11,'babu',12") val rows = sql("select name,avg(salary) from maintbl group by name").collect() - sql("create datamap maintbl_douoble on table maintbl using 
'preaggregate' as select name,avg(salary) from maintbl group by name") + sql("create datamap maintbl_double on table maintbl using 'preaggregate' as select name,avg(salary) from maintbl group by name") checkAnswer(sql("select name,avg(salary) from maintbl group by name"), rows) } @@ -415,8 +444,524 @@ test("check load and select for avg double datatype") { sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") val rows = sql("select age,avg(age) from maintable group by age").collect() - sql("create datamap maintbl_douoble on table maintable using 'preaggregate' as select avg(age) from maintable group by age") + sql("create datamap maintbl_double on table maintable using 'preaggregate' as select avg(age) from maintable group by age") checkAnswer(sql("select age,avg(age) from maintable group by age"), rows) + sql("drop table if exists maintable ") + } + + test("test load into main table with pre-aggregate table: string") { + sql( + """ + | CREATE TABLE main_table( + | id INT, + | name STRING, + | city STRING, + | age STRING) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + createAllAggregateTables("main_table") + + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + checkAnswer(sql(s"SELECT * FROM main_table_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_avg"), + Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_count"), + Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_min"), + Seq(Row(1, "31"), Row(2, "27"), Row(3, "35"), Row(4, "26"))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_max"), + Seq(Row(1, "31"), Row(2, "27"), Row(3, "35"), Row(4, "29"))) + + // check select and match or not match pre-aggregate table + checkPreAggTable(sql("SELECT id, SUM(age) FROM 
main_table GROUP BY id"), + true, "main_table_preagg_sum") + checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"), + false, "main_table_preagg_avg", "main_table") + + checkPreAggTable(sql("SELECT id, AVG(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_avg") + checkPreAggTable(sql("SELECT id, AVG(age) from main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT id, COUNT(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_count") + checkPreAggTable(sql("SELECT id, COUNT(age) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT id, MIN(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_min") + checkPreAggTable(sql("SELECT id, MIN(age) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT id, MAX(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_max") + checkPreAggTable(sql("SELECT id, MAX(age) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + // sub query should match pre-aggregate table + checkPreAggTable(sql("SELECT SUM(age) FROM main_table"), + true, "main_table_preagg_sum") + checkPreAggTable(sql("SELECT SUM(age) FROM main_table"), + false, "main_table_preagg_avg", "main_table") + + checkPreAggTable(sql("SELECT AVG(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_avg") + checkPreAggTable(sql("SELECT AVG(age) from main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT COUNT(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_count") + checkPreAggTable(sql("SELECT COUNT(age) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT MIN(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_min") + checkPreAggTable(sql("SELECT MIN(age) FROM main_table GROUP BY 
id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT MAX(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_max") + checkPreAggTable(sql("SELECT MAX(age) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + } + + test("test load into main table with pre-aggregate table: sum string column") { + sql( + """ + | CREATE TABLE main_table( + | id INT, + | name STRING, + | city STRING, + | age STRING) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + createAllAggregateTables("main_table", "name") + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + + checkAnswer(sql(s"SELECT * FROM main_table_preagg_sum"), + Seq(Row(1, null), Row(2, null), Row(3, null), Row(4, null))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_avg"), + Seq(Row(1, null, 0), Row(2, null, 0), Row(3, null, 0), Row(4, null, 0))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_count"), + Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_min"), + Seq(Row(1, "david"), Row(2, "eason"), Row(3, "jarry"), Row(4, "kunal"))) + checkAnswer(sql(s"SELECT * FROM main_table_preagg_max"), + Seq(Row(1, "david"), Row(2, "eason"), Row(3, "jarry"), Row(4, "vishal"))) + + // check select and match or not match pre-aggregate table + checkPreAggTable(sql("SELECT id, SUM(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_sum") + checkPreAggTable(sql("SELECT id, SUM(name) FROM main_table GROUP BY id"), + false, "main_table_preagg_avg", "main_table") + + checkPreAggTable(sql("SELECT id, AVG(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_avg") + checkPreAggTable(sql("SELECT id, AVG(name) from main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT id, COUNT(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_count") + checkPreAggTable(sql("SELECT id, COUNT(name) FROM main_table 
GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT id, MIN(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_min") + checkPreAggTable(sql("SELECT id, MIN(name) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT id, MAX(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_max") + checkPreAggTable(sql("SELECT id, MAX(name) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + // sub query should match pre-aggregate table + checkPreAggTable(sql("SELECT SUM(name) FROM main_table"), + true, "main_table_preagg_sum") + checkPreAggTable(sql("SELECT SUM(name) FROM main_table"), + false, "main_table_preagg_avg", "main_table") + + checkPreAggTable(sql("SELECT AVG(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_avg") + checkPreAggTable(sql("SELECT AVG(name) from main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT COUNT(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_count") + checkPreAggTable(sql("SELECT COUNT(name) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT MIN(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_min") + checkPreAggTable(sql("SELECT MIN(name) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + + checkPreAggTable(sql("SELECT MAX(name) FROM main_table GROUP BY id"), + true, "main_table_preagg_max") + checkPreAggTable(sql("SELECT MAX(name) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum", "main_table") + } + + test("test whether all segments are loaded into pre-aggregate table if segments are set on main table 2") { + sql("DROP TABLE IF EXISTS segmaintable") + sql( + """ + | CREATE TABLE segmaintable( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 
'org.apache.carbondata.format' + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + sql("set carbon.input.segments.default.segmaintable=0") + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 26))) + sql( + s""" + | CREATE DATAMAP preagg_sum + | ON TABLE segmaintable + | USING 'preaggregate' + | AS SELECT id, SUM(age) + | FROM segmaintable + | GROUP BY id + """.stripMargin) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + false, "segmaintable_preagg_sum") + + sql("reset") + checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 26))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + } + + test("test whether all segments are loaded into pre-aggregate table if segments are set on main table 3") { + sql("DROP TABLE IF EXISTS segmaintable") + sql( + """ + | CREATE TABLE segmaintable( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + sql("set carbon.input.segments.default.segmaintable=0") + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 26))) + sql( + s""" + | CREATE DATAMAP preagg_sum + | ON TABLE segmaintable + | USING 'preaggregate' + | AS SELECT id, SUM(age) + | FROM segmaintable + | GROUP BY id + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + sql("reset") + checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 26), Row(1, 26))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + } + + test("test whether all segments are loaded into pre-aggregate table if segments are set on main table 4") { + sql("DROP TABLE IF EXISTS segmaintable") + sql( + """ + | CREATE TABLE segmaintable( + | id INT, + 
| name STRING, + | city STRING, + | age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + // check value before set segments + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 52))) + + sql("set carbon.input.segments.default.segmaintable=0") + // check value after set segments + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 26))) + + sql( + s""" + | CREATE DATAMAP preagg_sum + | ON TABLE segmaintable + | USING 'preaggregate' + | AS SELECT id, SUM(age) + | FROM segmaintable + | GROUP BY id + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + checkAnswer(sql("SELECT * FROM segmaintable_preagg_sum"), Seq(Row(1, 52), Row(1, 26))) + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 26))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + false, "segmaintable_preagg_sum") + + // reset + sql("reset") + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 78))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + } + + test("test whether all segments are loaded into pre-aggregate table: auto merge and input segment") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + sql("reset") + sql("DROP TABLE IF EXISTS segmaintable") + sql( + """ + | CREATE TABLE segmaintable( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql("set carbon.input.segments.default.segmaintable=0") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO 
segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + // check value before auto merge + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 26))) + + + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + // check value after set segments and auto merge + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq.empty) + + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + false, "segmaintable_preagg_sum") + + sql( + s""" + | CREATE DATAMAP preagg_sum + | ON TABLE segmaintable + | USING 'preaggregate' + | AS SELECT id, SUM(age) + | FROM segmaintable + | GROUP BY id + """.stripMargin) + + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + // reset + sql("reset") + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 130))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + } + + //TODO: need to check and fix + ignore("test whether all segments are loaded into pre-aggregate table: auto merge and no input segment") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + sql("reset") + sql("DROP TABLE IF EXISTS segmaintable") + sql( + """ + | CREATE TABLE segmaintable( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + sql( + s""" + | CREATE DATAMAP preagg_sum + | ON TABLE segmaintable + | USING 'preaggregate' + | AS SELECT id, SUM(age) + | FROM segmaintable + | GROUP BY id + """.stripMargin) + + // check value before auto merge + 
checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 78))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + // check value after auto merge + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 130))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + } + + test("test whether all segments are loaded into pre-aggregate table: create after auto merge and no input segment") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + sql("reset") + sql("DROP TABLE IF EXISTS segmaintable") + sql( + """ + | CREATE TABLE segmaintable( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + sql( + s""" + | CREATE DATAMAP preagg_sum + | ON TABLE segmaintable + | USING 'preaggregate' + | AS SELECT id, SUM(age) + | FROM segmaintable + | GROUP BY id + """.stripMargin) + + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 130))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + + 
sql(s"INSERT INTO segmaintable VALUES(1, 'xyz', 'bengaluru', 26)") + checkAnswer(sql(s"SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + Seq(Row(1, 156))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM segmaintable GROUP BY id"), + true, "segmaintable_preagg_sum") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + } + + //TODO: need to check and fix + ignore("test whether all segments are loaded into pre-aggregate table: mixed, load, auto merge and input segment") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + sql("reset") + sql("DROP TABLE IF EXISTS main_table") + sql( + """ + | CREATE TABLE main_table( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)") + + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + + createAllAggregateTables("main_table", "age") + sql("set carbon.input.segments.default.main_table=0") + + checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"), + false, "main_table_preagg_sum") + checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"), + Seq(Row(1, 26))) + + sql("reset") + checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_sum") + + sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + + checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"), + Seq(Row(1, 171), Row(2, 81), Row(3, 210), Row(4, 165))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_sum") + + CarbonProperties.getInstance() + 
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + } + + //TODO: need to check and fix + ignore("test whether all segments are loaded into pre-aggregate table: auto merge and check pre-aggregate segment") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + sql("reset") + sql("DROP TABLE IF EXISTS main_table") + sql( + """ + | CREATE TABLE main_table( + | id INT, + | name STRING, + | city STRING, + | age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)") + + sql( + s""" + | CREATE DATAMAP preagg_sum + | ON TABLE main_table + | USING 'preaggregate' + | AS SELECT id, SUM(age) + | FROM main_table + | GROUP BY id + """.stripMargin) + + sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)") + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + + checkExistence(sql("show segments for table main_table_preagg_sum"), false, "Compacted") + sql(s"INSERT INTO main_table VALUES(1, 'xyz', 'bengaluru', 26)") + + // check the data whether auto merge + checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"), + Seq(Row(1, 109), Row(2, 27), Row(3, 70), Row(4, 55))) + checkExistence(sql("show segments for table main_table_preagg_sum"), true, "Compacted") + + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE main_table") + + checkAnswer(sql(s"SELECT id, SUM(age) FROM main_table GROUP BY id"), + Seq(Row(1, 171), Row(2, 81), Row(3, 210), Row(4, 165))) + checkPreAggTable(sql("SELECT id, SUM(age) FROM main_table GROUP BY id"), + true, "main_table_preagg_sum") + + checkExistence(sql("show segments for table main_table_preagg_sum"), true, "Compacted") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala index 02314d7..8241288 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala @@ -30,6 +30,7 @@ class TestPreAggregateMisc extends QueryTest with BeforeAndAfterAll { sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable") } + test("test PreAggregate With Set Segments property") { sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name") sql("SET carbon.input.segments.default.mainTable=0") @@ -40,6 +41,7 @@ class TestPreAggregateMisc extends QueryTest with BeforeAndAfterAll { sql("drop datamap agg1 on table mainTable") } + test("check preagg tbl properties sort columns inherit from main tbl") { sql("drop table if exists y ") sql( @@ -71,8 +73,8 @@ class TestPreAggregateMisc extends QueryTest with BeforeAndAfterAll { assert(sortcolummatch && sortscopematch && blocksizematch) } - override def afterAll: Unit = { - sql("drop table if exists mainTable") + sql("DROP TABLE IF EXISTS mainTable") + sql("DROP TABLE IF EXISTS y") } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index f1a6092..8b98f17 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -20,12 +20,14 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row} -import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES +import org.apache.carbondata.spark.util.SparkQueryTest -class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { +class TestPreAggregateTableSelection extends SparkQueryTest with BeforeAndAfterAll { + + val timeSeries = TIMESERIES.toString override def beforeAll: Unit = { sql("drop table if exists mainTable") @@ -56,12 +58,6 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTableavg") } - test("test sum and avg on same column should give proper results") { - val df = sql("select name, 
sum(id), avg(id) from maintable group by name") - checkAnswer(df, Seq(Row("david",1,1.0), Row("jarry",6,3.0), Row("kunal",4,4.0), Row("eason",2,2.0), Row("vishal",4,4.0))) - } - - test("test PreAggregate table selection 1") { val df = sql("select name from mainTable group by name") preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") @@ -156,7 +152,14 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { val df = sql("select count(id) from mainTable") preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3") } - + + test("test PreAggregate table selection 19: test sum and avg on same column should give proper results") { + val df = sql("select name, sum(id), avg(id) from maintable group by name") + checkAnswer(df, Seq(Row("david",1,1.0), Row("jarry",6,3.0), Row("kunal",4,4.0), Row("eason",2,2.0), Row("vishal",4,4.0))) + checkPreAggTable(df, false, "maintable_agg5", "maintable_agg1") + checkPreAggTable(df, true, "maintable_agg8") + } + test("test PreAggregate table selection 20") { val df = sql("select name from mainTable group by name order by name") preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") @@ -231,34 +234,12 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { } } - test("test if pre-agg table is hit with filter condition") { - sql("drop table if exists filtertable") - sql("CREATE TABLE filtertable(id int, name string, city string, age string) STORED BY" + - " 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='name,age')") - sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table filtertable") - sql("create datamap agg9 on table filtertable using 'preaggregate' as select name, age, sum(age) from filtertable group by name, age") - val df = sql("select name, sum(age) from filtertable where age = '29' group by name, age") - preAggTableValidator(df.queryExecution.analyzed, "filtertable_agg9") - checkAnswer(df, 
Row("vishal", 29)) - } test("test PreAggregate table selection 29") { val df = sql("select sum(id) from mainTable group by name") preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2") } - test("test pre-agg table with group by condition") { - sql("drop table if exists grouptable") - sql("CREATE TABLE grouptable(id int, name string, city string, age string) STORED BY" + - " 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='name,age')") - sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table grouptable") - sql( - "create datamap agg9 on table grouptable using 'preaggregate' as select sum(id) from grouptable group by city") - val df = sql("select sum(id) from grouptable group by city") - preAggTableValidator(df.queryExecution.analyzed, "grouptable_agg9") - checkAnswer(df, Seq(Row(3), Row(3), Row(4), Row(7))) - } - test("test PreAggregate table selection 30") { val df = sql("select a.name from mainTable a group by a.name") preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") @@ -274,14 +255,66 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") } - test("Test query with math operation hitting fact table") { + test("test PreAggregate table selection 33: Test query with math operation hitting fact table") { val df = sql("select sum(id)+count(id) from maintable") preAggTableValidator(df.queryExecution.analyzed, "maintable") } - val timeSeries = TIMESERIES.toString + test("test PreAggregate table selection 34: test if pre-agg table is hit with filter condition") { + sql("DROP TABLE IF EXISTS filtertable") + sql( + """ + | CREATE TABLE filtertable( + | id INT, + | name STRING, + | city STRING, + | age STRING) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('dictionary_include'='name,age') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table 
filtertable") + sql( + """ + | CREATE DATAMAP agg9 + | ON TABLE filtertable + | USING 'preaggregate' + | AS SELECT name, age, SUM(age) + | FROM filtertable + | GROUP BY name, age + """.stripMargin) + val df = sql("SELECT name, SUM(age) FROM filtertable WHERE age = '29' GROUP BY name, age") + preAggTableValidator(df.queryExecution.analyzed, "filtertable_agg9") + checkAnswer(df, Row("vishal", 29)) + } -test("test PreAggregate table selection with timeseries and normal together") { + test("test PreAggregate table selection 35: test pre-agg table with group by condition") { + sql("DROP TABLE IF EXISTS grouptable") + sql( + """ + | CREATE TABLE grouptable( + | id INT, + | name STRING, + | city STRING, + | age STRING) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('dictionary_include'='name,age') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table grouptable") + sql( + """ + | CREATE DATAMAP agg9 + | ON TABLE grouptable + | USING 'preaggregate' + | AS SELECT SUM(id) + | FROM grouptable + | GROUP BY city + """.stripMargin) + val df = sql("SELECT SUM(id) FROM grouptable GROUP BY city") + preAggTableValidator(df.queryExecution.analyzed, "grouptable_agg9") + checkAnswer(df, Seq(Row(3), Row(3), Row(4), Row(7))) + } + + test("test PreAggregate table selection 36: test PreAggregate table selection with timeseries and normal together") { sql("drop table if exists maintabletime") sql( "create table maintabletime(year int,month int,name string,salary int,dob timestamp) stored" + @@ -323,7 +356,7 @@ test("test PreAggregate table selection with timeseries and normal together") { sql("select var_samp(name) from maintabletime where name='Mikka' ") } - test("test PreAggregate table selection For Sum And Avg in aggregate table with bigint") { + test("test PreAggregate table selection 38: for sum and avg in aggregate table with bigint") { val df = sql("select avg(age) from mainTableavg") 
preAggTableValidator(df.queryExecution.analyzed, "mainTableavg_agg0") } @@ -341,7 +374,6 @@ test("test PreAggregate table selection with timeseries and normal together") { checkAnswer(df, Seq(Row(10,10.0))) } - override def afterAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists mainTable_avg") @@ -349,6 +381,8 @@ test("test PreAggregate table selection with timeseries and normal together") { sql("DROP TABLE IF EXISTS maintabletime") sql("DROP TABLE IF EXISTS maintabledict") sql("DROP TABLE IF EXISTS mainTableavg") + sql("DROP TABLE IF EXISTS filtertable") + sql("DROP TABLE IF EXISTS grouptable") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala index 0f87803..af12be9 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala @@ -26,7 +26,8 @@ import org.scalatest.BeforeAndAfterAll class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll { override def beforeAll: Unit = { - sql("drop table if exists mainTable") + sql("DROP TABLE IF EXISTS mainTable") + sql("DROP TABLE IF EXISTS mainTable1") sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") sql("CREATE TABLE mainTable1(id int, name string, 
city string, age string) STORED BY 'org.apache.carbondata.format'") sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name") @@ -35,7 +36,19 @@ class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll { } test("test sub query PreAggregate table selection 1") { - val df = sql("select t2.newnewname as newname from mainTable1 t1 join (select name as newnewname,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.newnewname group by t2.newnewname") + val df = sql( + """ + | SELECT t2.newnewname AS newname + | FROM mainTable1 t1 + | JOIN ( + | select + | name AS newnewname, + | sum(age) AS sum + | FROM mainTable + | GROUP BY name ) t2 + | ON t1.name = t2.newnewname + | GROUP BY t2.newnewname + """.stripMargin) matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0") } @@ -82,7 +95,8 @@ class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll { } override def afterAll: Unit = { - sql("drop table if exists mainTable") + sql("DROP TABLE IF EXISTS mainTable") + sql("DROP TABLE IF EXISTS mainTable1") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala index 3c2fd71..4860b32 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala @@ -18,18 
+18,15 @@ package org.apache.carbondata.spark.testsuite.allqueries import java.io.File -import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.spark.sql.test.util.QueryTest - import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.CarbonProperties class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { var timeStampPropOrig: String = _ http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala new file mode 100644 index 0000000..bad300b --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/util/SparkQueryTest.scala @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * QueryTest with helpers for checking which carbon tables a query plan reads.
+ */
+class SparkQueryTest extends QueryTest {
+
+  /**
+   * Asserts that each of the given tables is (or is not) read by the analyzed plan of df.
+   *
+   * @param df               DataFrame whose analyzed logical plan is inspected
+   * @param exists           true if every table in preAggTableNames must be read by the plan,
+   *                         false if none of them may be read
+   * @param preAggTableNames table names to look for, compared case-insensitively
+   */
+  def checkPreAggTable(df: DataFrame, exists: Boolean, preAggTableNames: String*): Unit = {
+    // Carbon tables show up as LogicalRelation nodes wrapping a CarbonDatasourceHadoopRelation.
+    // collect() only walks the plan once; transform() would also rebuild it for no reason.
+    val scannedTables = df.queryExecution.analyzed.collect {
+      case logicalRelation: LogicalRelation => logicalRelation.relation
+    }.collect {
+      case carbonRelation: CarbonDatasourceHadoopRelation =>
+        carbonRelation.carbonTable.getTableName
+    }
+
+    for (preAggTableName <- preAggTableNames) {
+      val isPresent = scannedTables.exists(_.equalsIgnoreCase(preAggTableName))
+      val expectation = if (exists) "be read by" else "not be read by"
+      assert(isPresent == exists,
+        s"Expected table $preAggTableName to $expectation the plan; " +
+          s"tables read: ${scannedTables.mkString(", ")}")
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index b52d0e7..f4490ec 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -70,7 +70,7 @@ object PreAggregateUtil { /** * Below method will be used to validate the select plan * and get the required fields
from select plan - * Currently only aggregate query is support any other type of query will fail + * Currently only aggregate query is support, any other type of query will fail * * @param plan * @param selectStmt http://git-wip-us.apache.org/repos/asf/carbondata/blob/e43be5e7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index d2ffac7..e8374e3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -63,7 +63,7 @@ case class AggExpToColumnMappingModel( * 1. Check plan is valid plan for updating the parent table plan with child table * 2. Updated the plan based on child schema * - * Rules for Upadating the plan + * Rules for Updating the plan * 1. Grouping expression rules * 1.1 Change the parent attribute reference for of group expression * to child attribute reference
