Repository: carbondata
Updated Branches:
  refs/heads/master 3ec7b3ffa -> 9d9415101


[HOTFIX][Streaming] Enhance streaming aggregate table testcase

Previously, the StreamingAgg test cases ran their queries without verifying the 
results. This PR adds that verification.

This closes #2238


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9d941510
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9d941510
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9d941510

Branch: refs/heads/master
Commit: 9d9415101c1882add8efdc57203e654b73877a59
Parents: 3ec7b3f
Author: Jacky Li <[email protected]>
Authored: Fri Apr 27 15:42:54 2018 +0800
Committer: chenliang613 <[email protected]>
Committed: Sun Apr 29 15:09:18 2018 +0800

----------------------------------------------------------------------
 .../preaggregate/TestPreAggStreaming.scala      | 30 ++++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d941510/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
index 9377108..262c8b8 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
@@ -27,8 +27,7 @@ class TestPreAggStreaming extends QueryTest with 
BeforeAndAfterAll {
 
 
   override def beforeAll: Unit = {
-    sql("drop table if exists mainTable")
-    sql("drop table if exists mainTableStreamingOne")
+    dropAll
     sql("CREATE TABLE mainTable(id int, name string, city string, age string) 
STORED BY 'org.apache.carbondata.format' tblproperties('streaming'='true')")
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
name from mainTable group by name")
     sql("create datamap agg1 on table mainTable using 'preaggregate' as select 
name,sum(age) from mainTable group by name")
@@ -38,60 +37,62 @@ class TestPreAggStreaming extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap aggStreamingAvg on table mainTableStreamingOne using 
'preaggregate' as select name,avg(age) from mainTableStreamingOne group by 
name")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table mainTable")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table mainTableStreamingOne")
+    sql("CREATE TABLE origin(id int, name string, city string, age string) 
STORED BY 'org.apache.carbondata.format' tblproperties('streaming'='true')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table origin")
   }
 
   test("Test Pre Agg Streaming with project column and group by") {
     val df = sql("select name from maintable group by name")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select name from origin group by name"))
   }
 
   test("Test Pre Agg Streaming table Agg Sum Aggregation") {
     val df = sql("select name, sum(age) from maintable group by name")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select name, sum(age) from origin group by name"))
   }
 
   test("Test Pre Agg Streaming table with UDF") {
     val df = sql("select substring(name,1,1), sum(age) from maintable group by 
substring(name,1,1)")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select substring(name,1,1), sum(age) from origin 
group by substring(name,1,1)"))
   }
 
   test("Test Pre Agg Streaming table with UDF Only in group by") {
     val df = sql("select sum(age) from maintable group by substring(name,1,1)")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select sum(age) from origin group by 
substring(name,1,1)"))
   }
 
   test("Test Pre Agg Streaming table With Sum Aggregation And Order by") {
     val df = sql("select name, sum(age) from maintable group by name order by 
name")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select name, sum(age) from origin group by name order 
by name"))
   }
 
   test("Test Pre Agg Streaming table With Avg Aggregation") {
     val df = sql("select name, avg(age) from maintable group by name order by 
name")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select name, avg(age) from origin group by name order 
by name"))
   }
 
   test("Test Pre Agg Streaming table With Expression Aggregation") {
     val df = sql("select name, sum(CASE WHEN age=35 THEN id ELSE 0 END) from 
maintable group by name order by name")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select name, sum(CASE WHEN age=35 THEN id ELSE 0 END) 
from origin group by name order by name"))
   }
 
   test("Test Pre Agg Streaming table With only aggregate expression and group 
by") {
     val df = sql("select sum(age) from maintable group by name")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select sum(age) from origin group by name"))
   }
 
   test("Test Pre Agg Streaming table With small int and avg") {
     val df = sql("select name, avg(age) from mainTableStreamingOne group by 
name")
-    df.collect()
     assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+    checkAnswer(df, sql("select name, avg(age) from origin group by name"))
   }
 
   /**
@@ -119,8 +120,13 @@ class TestPreAggStreaming extends QueryTest with 
BeforeAndAfterAll {
     isChildTableExists
   }
 
-  override def afterAll: Unit = {
+  private def dropAll: Unit = {
     sql("drop table if exists mainTable")
     sql("drop table if exists mainTableStreamingOne")
+    sql("drop table if exists origin")
+  }
+
+  override def afterAll: Unit = {
+    dropAll
   }
 }

Reply via email to