Repository: carbondata Updated Branches: refs/heads/master 7978b974d -> de52da40f
[HOTFIX] Fix streaming CI issue Fix streaming CI issue This closes #2093 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de52da40 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de52da40 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de52da40 Branch: refs/heads/master Commit: de52da40fcfc934e179395c3922966f80dea0fa2 Parents: 7978b97 Author: QiangCai <[email protected]> Authored: Fri Mar 23 14:49:57 2018 +0800 Committer: chenliang613 <[email protected]> Committed: Fri Mar 23 16:45:04 2018 +0800 ---------------------------------------------------------------------- .../TestStreamingTableOperation.scala | 74 +++++++++++++------- 1 file changed, 49 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/de52da40/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 00e1140..3719be9 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -998,31 +998,55 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { Seq(Row(2 * 100)) ) try{ - sql("set carbon.input.segments.streaming.stream_table_handoff = 1") - checkAnswer( - sql("select count(*) from streaming.stream_table_handoff"), - Seq(Row(100)) - ) - sql("set carbon.input.segments.streaming.stream_table_handoff = *") - checkAnswer( - sql("select count(*) from streaming.stream_table_handoff"), - Seq(Row(2 * 100)) - ) - sql("set carbon.input.segments.streaming.stream_table_handoff = 2") - checkAnswer( - sql("select count(*) from streaming.stream_table_handoff"), - Seq(Row(1 * 100)) - ) - sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2") - checkAnswer( - sql("select count(*) from streaming.stream_table_handoff"), - Seq(Row(2 * 100)) - ) - sql("set carbon.input.segments.streaming.stream_table_handoff = 3") - checkAnswer( - sql("select count(*) from streaming.stream_table_handoff"), - Seq(Row(0)) - ) + if (newSegments1.length == 3 ) { + sql("set carbon.input.segments.streaming.stream_table_handoff = 1") + val segment1 = sql("select * from streaming.stream_table_handoff").count() + sql("set carbon.input.segments.streaming.stream_table_handoff = 2") + val segment2 = sql("select * from streaming.stream_table_handoff").count() + assertResult(2 * 100) (segment1 + segment2) + + sql("set carbon.input.segments.streaming.stream_table_handoff = *") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + + sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = 0,3") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(0)) + ) + } else if (newSegments1.length == 5) { + sql("set carbon.input.segments.streaming.stream_table_handoff = 2") + val segment2 = sql("select * from streaming.stream_table_handoff").count() + + sql("set carbon.input.segments.streaming.stream_table_handoff = 3,4") + val segment34 = sql("select * from streaming.stream_table_handoff").count() + assertResult(2 * 100) (segment2 + segment34) + + sql("set carbon.input.segments.streaming.stream_table_handoff = *") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + + sql("set carbon.input.segments.streaming.stream_table_handoff = 2,3,4") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + + sql("set carbon.input.segments.streaming.stream_table_handoff = 0,1,5") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(0)) + ) + } } finally { sql("set carbon.input.segments.streaming.stream_table_handoff = *")
