Repository: carbondata Updated Branches: refs/heads/master f5cdd5ca9 -> 67581cfe6
[Pre-Agg Test] Added SDV TestCase of preaggregate 1. Added test cases for pre-aggregate create, load and compaction. 2. Added test cases for time series in pre-aggregate. This closes #1981 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/67581cfe Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/67581cfe Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/67581cfe Branch: refs/heads/master Commit: 67581cfe6432ca1540916983ace24aa3645d8af4 Parents: f5cdd5c Author: Jatin <[email protected]> Authored: Thu Feb 15 20:25:20 2018 +0530 Committer: kunal642 <[email protected]> Committed: Wed Mar 21 12:06:42 2018 +0530 ---------------------------------------------------------------------- .../src/test/resources/testdatafileslist.txt | 3 +- .../sdv/generated/PreAggregateTestCase.scala | 216 +++++++++++++++++++ .../TimeSeriesPreAggregateTestCase.scala | 189 ++++++++++++++++ .../cluster/sdv/suite/SDVSuites.scala | 8 +- 4 files changed, 413 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/67581cfe/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt b/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt index 959ff6b..331a89d 100644 --- a/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt +++ b/integration/spark-common-cluster-test/src/test/resources/testdatafileslist.txt @@ -233,4 +233,5 @@ Data/emptyLoad.csv Data/splchar.csv source.csv Data/v1_version/metastore_db.zip -Data/v1_version/store.zip \ No newline at end of file +Data/v1_version/store.zip +Data/timeseriestest.csv \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/67581cfe/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala new file mode 100644 index 0000000..3148cf3 --- /dev/null +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PreAggregateTestCase.scala @@ -0,0 +1,216 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.cluster.sdv.generated
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.{Include, QueryTest}
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+
+/**
+ * SDV tests for pre-aggregate datamaps: creation, load, incremental load, insert overwrite and
+ * query results.
+ *
+ * Before each test two tables with the same schema are created: `PreAggMain`, which carries five
+ * pre-aggregate datamaps (sum, avg, count, min and max of `salary` grouped by `country`), and
+ * `AggMain`, which has no datamaps. Query results on `AggMain` serve as the expected answers.
+ */
+class PreAggregateTestCase extends QueryTest with BeforeAndAfterEach {
+  val csvPath = s"$resourcesPath/source.csv"
+
+  /** Drops both tables used by this suite, if they exist. */
+  private def dropTables(): Unit = {
+    sql("drop table if exists PreAggMain")
+    sql("drop table if exists AggMain")
+  }
+
+  /** Loads `source.csv` once into `PreAggMain` and once into `AggMain`. */
+  private def loadIntoBothTables(): Unit = {
+    sql(s"LOAD DATA INPATH '$csvPath' into table PreAggMain").collect
+    sql(s"LOAD DATA INPATH '$csvPath' into table AggMain").collect
+  }
+
+  override def beforeEach: Unit = {
+    dropTables()
+    // The insert tests (TC004, TC005) write their date literals as yyyy/MM/dd.
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+    sql("CREATE TABLE PreAggMain (id Int, date date, country string, phonetype string, " +
+        "serialname String,salary int ) STORED BY 'org.apache.carbondata.format' " +
+        "tblproperties('dictionary_include'='country')")
+    // Same schema but no datamaps: queries on it give the expected answers.
+    sql("CREATE TABLE AggMain (id Int, date date, country string, phonetype string, " +
+        "serialname String,salary int ) STORED BY 'org.apache.carbondata.format' " +
+        "tblproperties('dictionary_include'='country')")
+    sql("create datamap PreAggSum on table PreAggMain using 'preaggregate' as " +
+        "select country,sum(salary) as sum from PreAggMain group by country")
+    sql("create datamap PreAggAvg on table PreAggMain using 'preaggregate' as " +
+        "select country,avg(salary) as avg from PreAggMain group by country")
+    sql("create datamap PreAggCount on table PreAggMain using 'preaggregate' as " +
+        "select country,count(salary) as count from PreAggMain group by country")
+    sql("create datamap PreAggMin on table PreAggMain using 'preaggregate' as " +
+        "select country,min(salary) as min from PreAggMain group by country")
+    sql("create datamap PreAggMax on table PreAggMain using 'preaggregate' as " +
+        "select country,max(salary) as max from PreAggMain group by country")
+  }
+
+  // All five datamaps exist, and the child table keeps the dictionary column.
+  test("PreAggregateTestCase_TC001", Include) {
+    Assert.assertEquals(5, sql("show datamap on table PreAggMain").count())
+    checkExistence(sql("Describe formatted PreAggMain_PreAggSum"), true, "DICTIONARY")
+  }
+
+  // A load into the main table is reflected in every pre-aggregate table.
+  test("PreAggregateTestCase_TC002", Include) {
+    loadIntoBothTables()
+
+    val expectedSum = sql("select country,sum(salary) as sum from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggSum"), expectedSum)
+
+    // The avg datamap table is compared against a (sum, count) pair.
+    val expectedAvg = sql("select country,sum(salary),count(salary) from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggAvg"), expectedAvg)
+
+    val expectedCount = sql("select country,count(salary) as count from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggCount"), expectedCount)
+
+    val expectedMin = sql("select country,min(salary) as min from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggMin"), expectedMin)
+
+    val expectedMax = sql("select country,max(salary) as max from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggMax"), expectedMax)
+  }
+
+  // Incremental load: two loads into PreAggMain produce one set of aggregate rows per load.
+  test("PreAggregateTestCase_TC003", Include) {
+    sql(s"LOAD DATA INPATH '$csvPath' into table PreAggMain").collect
+    sql(s"LOAD DATA INPATH '$csvPath' into table PreAggMain").collect
+    sql(s"LOAD DATA INPATH '$csvPath' into table AggMain").collect
+
+    val expectedSum = sql("select country,sum(salary) as sum from " +
+        "AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggSum"), expectedSum.union(expectedSum))
+
+    val expectedAvg = sql("select country,sum(salary),count(salary) from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggAvg"), expectedAvg.union(expectedAvg))
+
+    val expectedCount = sql("select country,count(salary) as count from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggCount"), expectedCount.union(expectedCount))
+
+    val expectedMin = sql("select country,min(salary) as min from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggMin"), expectedMin.union(expectedMin))
+
+    val expectedMax = sql("select country,max(salary) as max from AggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_PreAggMax"), expectedMax.union(expectedMax))
+  }
+
+  // A datamap created after several inserts aggregates data from all existing segments.
+  test("PreAggregateTestCase_TC004", Include) {
+    sql("insert into PreAggMain values(1,'2015/7/23','country1','phone197','ASD69643',15000)")
+    sql("insert into PreAggMain values(2,'2015/8/23','country2','phone197','ASD69643',10000)")
+    sql("insert into PreAggMain values(3,'2005/7/28','country1','phone197','ASD69643',5000)")
+    sql("create datamap testDataMap on table PreAggMain using 'preaggregate' as " +
+        "select country,sum(salary) as sum from PreAggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_testDataMap"),
+      Seq(Row("country1", 20000), Row("country2", 10000)))
+  }
+
+  // After insert overwrite, a new datamap only sees the overwritten data.
+  test("PreAggregateTestCase_TC005", Include) {
+    sql("insert into PreAggMain values(1,'2015/7/23','country1','phone197','ASD696',15000)")
+    sql("insert into PreAggMain values(2,'2003/8/13','country2','phone197','AD6943',10000)")
+    sql("insert overwrite table PreAggMain " +
+        "values(3,'2005/7/28','country3','phone197','ASD69643',5000)")
+    sql("create datamap testDataMap on table PreAggMain using 'preaggregate' as " +
+        "select country,sum(salary) as sum from PreAggMain group by country")
+    checkAnswer(sql("select * from PreAggMain_testDataMap"), Seq(Row("country3", 5000)))
+  }
+
+  // A join query gives the same result with and without pre-aggregate datamaps.
+  test("PreAggregateTestCase_TC006", Include) {
+    loadIntoBothTables()
+    val actual = sql("select t1.country,sum(id) from PreAggMain t1 join " +
+        "(select country as newcountry,sum(salary) as sum from PreAggMain group by " +
+        "country )t2 on t1.country=t2.newcountry group by country")
+
+    val expected = sql("select t1.country,sum(id) from AggMain t1 join " +
+        "(select country as newcountry,sum(salary) as sum from AggMain group by " +
+        "country )t2 on t1.country=t2.newcountry group by country")
+    checkAnswer(actual, expected)
+  }
+
+  // A join over a count sub-query gives the same result with and without datamaps.
+  test("PreAggregateTestCase_TC007", Include) {
+    loadIntoBothTables()
+    val actual = sql("select t1.country,count(t1.country) from PreAggMain t1 join " +
+        " (select country,count(salary) as count from PreAggMain group by country )" +
+        "t2 on t1.country=t2.country group by t1.country")
+
+    val expected = sql("select t1.country,count(t1.country) from AggMain t1 join " +
+        "(select country,count(salary) as count from AggMain group by country )t2 " +
+        "on t1.country=t2.country group by t1.country")
+    checkAnswer(actual, expected)
+  }
+
+  // A datamap over a CASE WHEN aggregate stores the correct values.
+  test("PreAggregateTestCase_TC008", Include) {
+    loadIntoBothTables()
+    sql("create datamap testDatamap on table PreAggMain using 'preaggregate' as " +
+        "select sum(CASE WHEN country='china' THEN id ELSE 0 END) as sum,country from " +
+        "PreAggMain group by country")
+    val actual = sql("select * from PreAggMain_testDatamap")
+    val expected = sql(
+      "select sum(CASE WHEN country='china' THEN id ELSE 0 END) as sum,country " +
+      "from AggMain group by country")
+    checkAnswer(actual, expected)
+  }
+
+  // A datamap over a CASE WHEN with an IN clause stores the correct values.
+  test("PreAggregateTestCase_TC009", Include) {
+    loadIntoBothTables()
+    // NOTE(review): the datamap's select list omits `country` while `expected` projects it;
+    // this only matches if the group-by column is stored in the datamap table — confirm.
+    sql("create datamap testDatamap on table PreAggMain using 'preaggregate' as " +
+        "select sum(CASE WHEN id in (10,11,12) THEN salary ELSE 0 END) as sum from PreAggMain " +
+        "group by country")
+    val actual = sql("select * from PreAggMain_testDatamap")
+    val expected = sql(
+      "select sum(CASE WHEN id in (10,11,12) THEN salary ELSE 0 END) as sum,country from AggMain " +
+      "group by country")
+    checkAnswer(actual, expected)
+  }
+
+  // A CASE WHEN count query gives the same result with and without datamaps.
+  test("PreAggregateTestCase_TC010", Include) {
+    loadIntoBothTables()
+
+    val actual = sql(
+      "select count(CASE WHEN country='usa' THEN id ELSE 0 END) as count,country from PreAggMain " +
+      "group by country")
+    val expected = sql(
+      "select count(CASE WHEN country='usa' THEN id ELSE 0 END) as count,country from AggMain group" +
+      " by country")
+    checkAnswer(actual, expected)
+  }
+
+  // A CASE WHEN ... IN query gives the same result with and without a matching datamap.
+  test("PreAggregateTestCase_TC011", Include) {
+    loadIntoBothTables()
+    sql("create datamap testDatamap on table PreAggMain using 'preaggregate' as " +
+        "select sum(CASE WHEN id in (12,13,14) THEN salary ELSE 0 END) as sum from PreAggMain " +
+        "group by country")
+    val actual = sql(
+      "select sum(CASE WHEN id in (12,13,14) THEN salary ELSE 0 END) as sum,country from " +
+      "PreAggMain group by country")
+    val expected = sql(
+      "select sum(CASE WHEN id in (12,13,14) THEN salary ELSE 0 END) as sum,country from AggMain " +
+      "group by country")
+    checkAnswer(actual, expected)
+  }
+
+  override def afterEach: Unit = {
+    // Drop the tables this suite creates.
+    dropTables()
+    // Undo the global date-format override from beforeEach so later suites see the default.
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/67581cfe/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala new file mode 100644 index 0000000..78ea7d4 --- /dev/null +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/TimeSeriesPreAggregateTestCase.scala @@ -0,0 +1,189 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.cluster.sdv.generated
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * SDV tests for timeseries pre-aggregate datamaps.
+ *
+ * Before each test `mainTable` is recreated with one timeseries datamap per granularity
+ * (second, minute, hour, day, month, year) on `mytime`, and `Data/timeseriestest.csv` is loaded
+ * once. Each rollup table is compared with the equivalent `date_format` aggregation of
+ * `mainTable`.
+ */
+class TimeSeriesPreAggregateTestCase extends QueryTest with BeforeAndAfterEach {
+
+  val timeSeries = TIMESERIES.toString
+
+  val csvPath = s"$resourcesPath/Data/timeseriestest.csv"
+
+  /**
+   * Datamap name suffix -> granularity DMPROPERTY key, in creation order. The keys keep the
+   * exact (mixed) casing used when this suite was written.
+   */
+  private val granularities: Seq[(String, String)] = Seq(
+    "second" -> "SECOND_GRANULARITY",
+    "minute" -> "minute_granularity",
+    "hour" -> "HOUR_GRANULARITY",
+    "day" -> "DAY_GRANULARITY",
+    "month" -> "MONTH_GRANULARITY",
+    "year" -> "year_granularity")
+
+  override def beforeEach: Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    sql("drop table if exists mainTable")
+    sql(
+      "CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache" +
+      ".carbondata.format'")
+    granularities.foreach { case (suffix, granularityKey) =>
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_$suffix ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           |   'EVENT_TIME'='mytime',
+           |   '$granularityKey'='1')
+           | AS SELECT mytime, SUM(age) FROM mainTable
+           | GROUP BY mytime
+         """.stripMargin)
+    }
+
+    sql(s"LOAD DATA LOCAL INPATH '$csvPath' into table mainTable")
+  }
+
+  /**
+   * Checks that a timeseries rollup table holds the same rows as aggregating `mainTable` with
+   * `mytime` truncated by the given `date_format` pattern.
+   *
+   * Patterns must use `yyyy` (calendar year), not `YYYY`. In Java's SimpleDateFormat, `YYYY` is
+   * the week-based year, which puts late-December/early-January timestamps in the wrong year.
+   *
+   * @param dataMapTable rollup table to verify, e.g. `maintable_agg0_year`
+   * @param pattern      `date_format` pattern for the rollup granularity
+   */
+  private def checkRollup(dataMapTable: String, pattern: String): Unit = {
+    val expected = sql(
+      s"select cast(date_format(mytime, '$pattern') as timestamp) as mytime,sum(age) " +
+      s"from mainTable group by date_format(mytime , '$pattern')")
+    val actual = sql(s"select * from $dataMapTable")
+    checkAnswer(actual, expected)
+  }
+
+  test("TimeSeriesPreAggregateTestCase_001") {
+    checkRollup("maintable_agg0_year", "yyyy")
+  }
+
+  test("TimeSeriesPreAggregateTestCase_002") {
+    checkRollup("maintable_agg0_month", "yyyy-MM")
+  }
+
+  test("TimeSeriesPreAggregateTestCase_003") {
+    checkRollup("maintable_agg0_day", "yyyy-MM-dd")
+  }
+
+  test("TimeSeriesPreAggregateTestCase_004") {
+    checkRollup("maintable_agg0_hour", "yyyy-MM-dd HH")
+  }
+
+  test("TimeSeriesPreAggregateTestCase_005") {
+    checkRollup("maintable_agg0_minute", "yyyy-MM-dd HH:mm")
+  }
+
+  test("TimeSeriesPreAggregateTestCase_006") {
+    checkRollup("maintable_agg0_second", "yyyy-MM-dd HH:mm:ss")
+  }
+
+  // Minor compaction of the main table is expected to compact every timeseries datamap table.
+  test("TimeSeriesPreAggregateTestCase_007") {
+    // beforeEach has already created the tables and loaded segment 0; add segments 1-3.
+    sql(s"LOAD DATA LOCAL INPATH '$csvPath' into table mainTable")
+    sql(s"LOAD DATA LOCAL INPATH '$csvPath' into table mainTable")
+    sql(s"LOAD DATA LOCAL INPATH '$csvPath' into table mainTable")
+    sql("alter table maintable compact 'minor'")
+
+    val expectedSegments = Array("3", "2", "1", "0.1", "0")
+    granularities.foreach { case (suffix, _) =>
+      val segmentNames = sql(s"show segments for table maintable_agg0_$suffix").collect()
+        .map(_.get(0).toString)
+      withClue(s"segments of maintable_agg0_$suffix: ") {
+        segmentNames should equal(expectedSegments)
+      }
+    }
+  }
+
+  override def afterEach: Unit = {
+    sql("drop table if exists mainTable")
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/67581cfe/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala index da6e829..9e15193 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala @@ -57,7 +57,9 @@ class SDVSuites extends Suites with BeforeAndAfterAll { new TimestamptypesTestCase :: new V3offheapvectorTestCase :: new Vector1TestCase :: - new Vector2TestCase :: Nil + new Vector2TestCase :: + new PreAggregateTestCase :: + new TimeSeriesPreAggregateTestCase :: Nil override val nestedSuites = suites.toIndexedSeq @@ -140,7 +142,9 @@ class SDVSuites3 extends Suites with BeforeAndAfterAll { new QueriesIncludeDictionaryTestCase :: new TestRegisterCarbonTable :: new TableCommentAlterTableTestCase :: - new StandardPartitionTestCase :: Nil + new StandardPartitionTestCase :: + new PreAggregateTestCase :: + new TimeSeriesPreAggregateTestCase :: Nil override val nestedSuites = suites.toIndexedSeq
