Repository: carbondata Updated Branches: refs/heads/master 3ec7b3ffa -> 9d9415101
[HOTFIX][Streaming] Enhance streaming aggregate table testcase Earlier in StreamingAgg testcase the query result is not verified. This PR adds verification for it. This closes #2238 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9d941510 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9d941510 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9d941510 Branch: refs/heads/master Commit: 9d9415101c1882add8efdc57203e654b73877a59 Parents: 3ec7b3f Author: Jacky Li <[email protected]> Authored: Fri Apr 27 15:42:54 2018 +0800 Committer: chenliang613 <[email protected]> Committed: Sun Apr 29 15:09:18 2018 +0800 ---------------------------------------------------------------------- .../preaggregate/TestPreAggStreaming.scala | 30 ++++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d941510/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala index 9377108..262c8b8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala @@ -27,8 +27,7 @@ class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll { override def beforeAll: Unit = { - sql("drop table if exists mainTable") - sql("drop table if exists mainTableStreamingOne") + dropAll sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format' tblproperties('streaming'='true')") sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name") sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name") @@ -38,60 +37,62 @@ class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll { sql("create datamap aggStreamingAvg on table mainTableStreamingOne using 'preaggregate' as select name,avg(age) from mainTableStreamingOne group by name") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTableStreamingOne") + sql("CREATE TABLE origin(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format' tblproperties('streaming'='true')") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table origin") } test("Test Pre Agg Streaming with project column and group by") { val df = sql("select name from maintable group by name") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select name from origin group by name")) } test("Test Pre Agg Streaming table Agg Sum Aggregation") { val df = sql("select name, sum(age) from maintable group by name") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select name, sum(age) from origin group by name")) } test("Test Pre Agg Streaming table with UDF") { val df = sql("select substring(name,1,1), sum(age) from maintable group by substring(name,1,1)") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select substring(name,1,1), sum(age) from origin group by substring(name,1,1)")) } test("Test Pre Agg Streaming table with UDF Only in group by") { val df = sql("select sum(age) from maintable group by substring(name,1,1)") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select sum(age) from origin group by substring(name,1,1)")) } test("Test Pre Agg Streaming table With Sum Aggregation And Order by") { val df = sql("select name, sum(age) from maintable group by name order by name") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select name, sum(age) from origin group by name order by name")) } test("Test Pre Agg Streaming table With Avg Aggregation") { val df = sql("select name, avg(age) from maintable group by name order by name") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select name, avg(age) from origin group by name order by name")) } test("Test Pre Agg Streaming table With Expression Aggregation") { val df = sql("select name, sum(CASE WHEN age=35 THEN id ELSE 0 END) from maintable group by name order by name") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select name, sum(CASE WHEN age=35 THEN id ELSE 0 END) from origin group by name order by name")) } test("Test Pre Agg Streaming table With only aggregate expression and group by") { val df = sql("select sum(age) from maintable group by name") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select sum(age) from origin group by name")) } test("Test Pre Agg Streaming table With small int and avg") { val df = sql("select name, avg(age) from mainTableStreamingOne group by name") - df.collect() assert(validateStreamingTablePlan(df.queryExecution.analyzed)) + checkAnswer(df, sql("select name, avg(age) from origin group by name")) } /** @@ -119,8 +120,13 @@ class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll { isChildTableExists } - override def afterAll: Unit = { + private def dropAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists mainTableStreamingOne") + sql("drop table if exists origin") + } + + override def afterAll: Unit = { + dropAll } }
