Repository: carbondata
Updated Branches:
  refs/heads/master 7978b974d -> de52da40f


[HOTFIX] Fix streaming CI issue

Fix streaming CI issue

This closes #2093


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de52da40
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de52da40
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de52da40

Branch: refs/heads/master
Commit: de52da40fcfc934e179395c3922966f80dea0fa2
Parents: 7978b97
Author: QiangCai <[email protected]>
Authored: Fri Mar 23 14:49:57 2018 +0800
Committer: chenliang613 <[email protected]>
Committed: Fri Mar 23 16:45:04 2018 +0800

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala           | 74 +++++++++++++-------
 1 file changed, 49 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de52da40/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 00e1140..3719be9 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -998,31 +998,55 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
       Seq(Row(2 * 100))
     )
     try{
-      sql("set carbon.input.segments.streaming.stream_table_handoff = 1")
-      checkAnswer(
-        sql("select count(*) from streaming.stream_table_handoff"),
-        Seq(Row(100))
-      )
-      sql("set carbon.input.segments.streaming.stream_table_handoff = *")
-      checkAnswer(
-        sql("select count(*) from streaming.stream_table_handoff"),
-        Seq(Row(2 * 100))
-      )
-      sql("set carbon.input.segments.streaming.stream_table_handoff = 2")
-      checkAnswer(
-        sql("select count(*) from streaming.stream_table_handoff"),
-        Seq(Row(1 * 100))
-      )
-      sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2")
-      checkAnswer(
-        sql("select count(*) from streaming.stream_table_handoff"),
-        Seq(Row(2 * 100))
-      )
-      sql("set carbon.input.segments.streaming.stream_table_handoff = 3")
-      checkAnswer(
-        sql("select count(*) from streaming.stream_table_handoff"),
-        Seq(Row(0))
-      )
+      if (newSegments1.length == 3 ) {
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 1")
+        val segment1 = sql("select * from 
streaming.stream_table_handoff").count()
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 2")
+        val segment2 = sql("select * from 
streaming.stream_table_handoff").count()
+        assertResult(2 * 100) (segment1 + segment2)
+
+        sql("set carbon.input.segments.streaming.stream_table_handoff = *")
+        checkAnswer(
+          sql("select count(*) from streaming.stream_table_handoff"),
+          Seq(Row(2 * 100))
+        )
+
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2")
+        checkAnswer(
+          sql("select count(*) from streaming.stream_table_handoff"),
+          Seq(Row(2 * 100))
+        )
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 0,3")
+        checkAnswer(
+          sql("select count(*) from streaming.stream_table_handoff"),
+          Seq(Row(0))
+        )
+      } else if (newSegments1.length == 5) {
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 2")
+        val segment2 = sql("select * from 
streaming.stream_table_handoff").count()
+
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 3,4")
+        val segment34 = sql("select * from 
streaming.stream_table_handoff").count()
+        assertResult(2 * 100) (segment2 + segment34)
+
+        sql("set carbon.input.segments.streaming.stream_table_handoff = *")
+        checkAnswer(
+          sql("select count(*) from streaming.stream_table_handoff"),
+          Seq(Row(2 * 100))
+        )
+
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 2,3,4")
+        checkAnswer(
+          sql("select count(*) from streaming.stream_table_handoff"),
+          Seq(Row(2 * 100))
+        )
+
+        sql("set carbon.input.segments.streaming.stream_table_handoff = 0,1,5")
+        checkAnswer(
+          sql("select count(*) from streaming.stream_table_handoff"),
+          Seq(Row(0))
+        )
+      }
     }
     finally {
       sql("set carbon.input.segments.streaming.stream_table_handoff = *")

Reply via email to