Repository: carbondata Updated Branches: refs/heads/master 2f4dbb694 -> 5a82232a8
[CI][Streaming] Reduce the execution time of TestStreamingTableOperation test suite Combine test cases to reduce the execution time of the TestStreamingTableOperation test suite without reducing the test coverage. This closes #1863 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5a82232a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5a82232a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5a82232a Branch: refs/heads/master Commit: 5a82232a85abf97bc6fad7c8b0ab036e5d74eab2 Parents: 2f4dbb6 Author: QiangCai <[email protected]> Authored: Fri Jan 26 09:40:46 2018 +0800 Committer: ravipesala <[email protected]> Committed: Sun Jan 28 13:05:32 2018 +0530 ---------------------------------------------------------------------- .../TestStreamingTableOperation.scala | 659 +++++-------------- 1 file changed, 170 insertions(+), 489 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a82232a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 62076bf..3de1391 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -63,42 +63,23 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { createTable(tableName = "batch_table", streaming = false, withBatchLoad = true) // 2. 
streaming table with different input source - // socket source - createTable(tableName = "stream_table_socket", streaming = true, withBatchLoad = true) // file source createTable(tableName = "stream_table_file", streaming = true, withBatchLoad = true) // 3. streaming table with bad records - createTable(tableName = "bad_record_force", streaming = true, withBatchLoad = true) createTable(tableName = "bad_record_fail", streaming = true, withBatchLoad = true) // 4. streaming frequency check createTable(tableName = "stream_table_1s", streaming = true, withBatchLoad = true) - createTable(tableName = "stream_table_10s", streaming = true, withBatchLoad = true) // 5. streaming table execute batch loading - createTable(tableName = "stream_table_batch", streaming = true, withBatchLoad = false) - // 6. detail query - // full scan - createTable(tableName = "stream_table_scan", streaming = true, withBatchLoad = true) - createTableWithComplexType( - tableName = "stream_table_scan_complex", streaming = true, withBatchLoad = true) - // filter scan + // 8. compaction + // full scan + filter scan + aggregate query createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true) - createTableWithComplexType( - tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true) - // 7. aggregate query - createTable(tableName = "stream_table_agg", streaming = true, withBatchLoad = true) createTableWithComplexType( - tableName = "stream_table_agg_complex", streaming = true, withBatchLoad = true) - - // 8. compaction - createTable(tableName = "stream_table_compact", streaming = true, withBatchLoad = true) - - // 9. create new stream segment if current stream segment is full - createTable(tableName = "stream_table_new", streaming = true, withBatchLoad = true) + tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true) // 10. 
fault tolerant createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true) @@ -108,22 +89,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { createTable(tableName = "stream_table_delete_date", streaming = true, withBatchLoad = false) // 12. reject alter streaming properties - createTable(tableName = "stream_table_alter", streaming = false, withBatchLoad = false) - - // 13. handoff streaming segment - createTable(tableName = "stream_table_handoff", streaming = true, withBatchLoad = false) - - // 14. finish streaming - createTable(tableName = "stream_table_finish", streaming = true, withBatchLoad = true) + // 13. handoff streaming segment and finish streaming + createTable(tableName = "stream_table_handoff", streaming = false, withBatchLoad = false) // 15. auto handoff streaming segment - createTable(tableName = "stream_table_auto_handoff", streaming = true, withBatchLoad = false) - // 16. close streaming table - createTable(tableName = "stream_table_close", streaming = true, withBatchLoad = false) - createTable(tableName = "stream_table_close_auto_handoff", streaming = true, withBatchLoad = false) - // 17. reopen streaming table after close + // 9. create new stream segment if current stream segment is full createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false) // 18. 
block drop table while streaming is in progress @@ -193,30 +165,15 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { def dropTable(): Unit = { sql("drop table if exists streaming.batch_table") - sql("drop table if exists streaming.stream_table_socket") sql("drop table if exists streaming.stream_table_file") - sql("drop table if exists streaming.bad_record_force") sql("drop table if exists streaming.bad_record_fail") sql("drop table if exists streaming.stream_table_1s") - sql("drop table if exists streaming.stream_table_10s") - sql("drop table if exists streaming.stream_table_batch") - sql("drop table if exists streaming.stream_table_scan") - sql("drop table if exists streaming.stream_table_scan_complex") - sql("drop table if exists streaming.stream_table_filter") + sql("drop table if exists streaming.stream_table_filter ") sql("drop table if exists streaming.stream_table_filter_complex") - sql("drop table if exists streaming.stream_table_agg") - sql("drop table if exists streaming.stream_table_agg_complex") - sql("drop table if exists streaming.stream_table_compact") - sql("drop table if exists streaming.stream_table_new") sql("drop table if exists streaming.stream_table_tolerant") sql("drop table if exists streaming.stream_table_delete_id") sql("drop table if exists streaming.stream_table_delete_date") - sql("drop table if exists streaming.stream_table_alter") sql("drop table if exists streaming.stream_table_handoff") - sql("drop table if exists streaming.stream_table_finish") - sql("drop table if exists streaming.stream_table_auto_handoff") - sql("drop table if exists streaming.stream_table_close") - sql("drop table if exists streaming.stream_table_close_auto_handoff") sql("drop table if exists streaming.stream_table_reopen") sql("drop table if exists streaming.stream_table_drop") sql("drop table if exists streaming.agg_table_block") @@ -254,26 +211,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { } } 
- // input source: socket - test("streaming ingest from socket source") { - executeStreamingIngest( - tableName = "stream_table_socket", - batchNums = 2, - rowNumsEachBatch = 10, - intervalOfSource = 1, - intervalOfIngest = 1, - continueSeconds = 10, - generateBadRecords = false, - badRecordAction = "force", - autoHandoff = false - ) - - checkAnswer( - sql("select count(*) from streaming.stream_table_socket"), - Seq(Row(25)) - ) - } - // input source: file test("streaming ingest from file source") { val identifier = new TableIdentifier("stream_table_file", Option("streaming")) @@ -288,7 +225,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { thread.start() Thread.sleep(2000) generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir) - Thread.sleep(10000) + Thread.sleep(5000) thread.interrupt() checkAnswer( sql("select count(*) from streaming.stream_table_file"), @@ -297,25 +234,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { } // bad records - test("streaming table with bad records action: force") { - executeStreamingIngest( - tableName = "bad_record_force", - batchNums = 2, - rowNumsEachBatch = 10, - intervalOfSource = 1, - intervalOfIngest = 1, - continueSeconds = 10, - generateBadRecords = true, - badRecordAction = "force", - autoHandoff = false - ) - checkAnswer( - sql("select count(*) from streaming.stream_table_socket"), - Seq(Row(25)) - ) - - } - test("streaming table with bad records action: fail") { executeStreamingIngest( tableName = "bad_record_fail", @@ -323,138 +241,58 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { rowNumsEachBatch = 10, intervalOfSource = 1, intervalOfIngest = 1, - continueSeconds = 10, + continueSeconds = 8, generateBadRecords = true, badRecordAction = "fail", autoHandoff = false ) val result = sql("select count(*) from streaming.bad_record_fail").collect() - assert(result(0).getLong(0) < 25) + assert(result(0).getLong(0) < 10 + 
5) } // ingest with different interval test("1 row per 1 second interval") { executeStreamingIngest( tableName = "stream_table_1s", - batchNums = 20, + batchNums = 3, rowNumsEachBatch = 1, intervalOfSource = 1, intervalOfIngest = 1, - continueSeconds = 20, + continueSeconds = 6, generateBadRecords = false, badRecordAction = "force", autoHandoff = false ) val result = sql("select count(*) from streaming.stream_table_1s").collect() // 20 seconds can't ingest all data, exists data delay - assert(result(0).getLong(0) > 5 + 10) + assert(result(0).getLong(0) > 5) } - test("10000 row per 10 seconds interval") { + test("query on stream table with dictionary, sort_columns") { executeStreamingIngest( - tableName = "stream_table_10s", - batchNums = 5, - rowNumsEachBatch = 10000, + tableName = "stream_table_filter", + batchNums = 2, + rowNumsEachBatch = 25, intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 50, - generateBadRecords = false, - badRecordAction = "force", - autoHandoff = false - ) - checkAnswer( - sql("select count(*) from streaming.stream_table_10s"), - Seq(Row(5 + 10000 * 5))) - } - - // batch loading on streaming table - test("streaming table execute batch loading") { - executeStreamingIngest( - tableName = "stream_table_batch", - batchNums = 5, - rowNumsEachBatch = 100, - intervalOfSource = 3, intervalOfIngest = 5, - continueSeconds = 30, - generateBadRecords = false, - badRecordAction = "force", - autoHandoff = false - ) - checkAnswer( - sql("select count(*) from streaming.stream_table_batch"), - Seq(Row(100 * 5))) - - executeBatchLoad("stream_table_batch") - - checkAnswer( - sql("select count(*) from streaming.stream_table_batch"), - Seq(Row(100 * 5 + 5))) - } - - // detail query on batch and stream segment - test("non-filter query on stream table with dictionary, sort_columns") { - executeStreamingIngest( - tableName = "stream_table_scan", - batchNums = 5, - rowNumsEachBatch = 10, - intervalOfSource = 2, - intervalOfIngest = 4, - 
continueSeconds = 20, - generateBadRecords = false, - badRecordAction = "force", - autoHandoff = false - ) - - val result = sql("select * from streaming.stream_table_scan order by id").collect() - assert(result != null) - assert(result.length == 55) - // check one row of streaming data - assert(result(0).getInt(0) == 1) - assert(result(0).getString(1) == "name_1") - // check one row of batch loading - assert(result(50).getInt(0) == 100000001) - assert(result(50).getString(1) == "batch_1") - } - - test("non-filter query on stream table with dictionary, sort_columns and complex column") { - executeStreamingIngest( - tableName = "stream_table_scan_complex", - batchNums = 5, - rowNumsEachBatch = 10, - intervalOfSource = 2, - intervalOfIngest = 4, continueSeconds = 20, - generateBadRecords = false, + generateBadRecords = true, badRecordAction = "force", autoHandoff = false ) - val result = sql("select * from streaming.stream_table_scan_complex order by id").collect() + // non-filter + val result = sql("select * from streaming.stream_table_filter order by id").collect() assert(result != null) assert(result.length == 55) // check one row of streaming data - assert(result(0).getInt(0) == 1) - assert(result(0).getString(1) == "name_1") - assert(result(0).getStruct(4).getInt(1) == 1) + assert(result(1).getInt(0) == 1) + assert(result(1).getString(1) == "name_1") // check one row of batch loading assert(result(50).getInt(0) == 100000001) assert(result(50).getString(1) == "batch_1") - assert(result(50).getStruct(4).getInt(1) == 20) - } - - test("filter query on stream table with dictionary, sort_columns") { - executeStreamingIngest( - tableName = "stream_table_filter", - batchNums = 5, - rowNumsEachBatch = 10, - intervalOfSource = 2, - intervalOfIngest = 4, - continueSeconds = 20, - generateBadRecords = true, - badRecordAction = "force", - autoHandoff = false - ) + // filter checkAnswer( sql("select * from stream_table_filter where id = 1"), Seq(Row(1, "name_1", "city_1", 
10000.0))) @@ -481,21 +319,77 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("select * from stream_table_filter where city = ''"), Seq(Row(2, "name_2", "", 20000.0))) + // agg + checkAnswer( + sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " + + "from stream_table_filter where id >= 2 and id <= 100000004"), + Seq(Row(52, 100000004, "batch_1", 7692332, 400001278))) + + checkAnswer( + sql("select city, count(id), sum(id), cast(avg(id) as integer), " + + "max(salary), min(salary) " + + "from stream_table_filter " + + "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " + + "and city <> '' " + + "group by city " + + "order by city"), + Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1), + Row("city_2", 1, 100000002, 100000002, 0.2, 0.2), + Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3))) + + // batch loading + for(_ <- 0 to 2) { + executeBatchLoad("stream_table_filter") + } + checkAnswer( + sql("select count(*) from streaming.stream_table_filter"), + Seq(Row(25 * 2 + 5 + 5 * 3))) + + sql("alter table streaming.stream_table_filter compact 'minor'") + Thread.sleep(5000) + val result1 = sql("show segments for table streaming.stream_table_filter").collect() + result1.foreach { row => + if (row.getString(0).equals("1")) { + assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1)) + assertResult(FileFormat.ROW_V1.toString)(row.getString(5)) + } else if (row.getString(0).equals("0.1")) { + assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1)) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + } else { + assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1)) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + } + } + } - test("filter query on stream table with dictionary, sort_columns and complex column") { + test("query on stream table with dictionary, sort_columns and complex column") { 
executeStreamingIngest( tableName = "stream_table_filter_complex", - batchNums = 5, - rowNumsEachBatch = 10, - intervalOfSource = 2, - intervalOfIngest = 4, + batchNums = 2, + rowNumsEachBatch = 25, + intervalOfSource = 5, + intervalOfIngest = 5, continueSeconds = 20, generateBadRecords = true, badRecordAction = "force", autoHandoff = false ) + // non-filter + val result = sql("select * from streaming.stream_table_filter_complex order by id").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(0).isNullAt(0)) + assert(result(0).getString(1) == "name_6") + assert(result(0).getStruct(4).getInt(1) == 6) + // check one row of batch loading + assert(result(50).getInt(0) == 100000001) + assert(result(50).getString(1) == "batch_1") + assert(result(50).getStruct(4).getInt(1) == 20) + + // filter checkAnswer( sql("select * from stream_table_filter_complex where id = 1"), Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1)))) @@ -526,62 +420,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("select * from stream_table_filter_complex where city = ''"), Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2)))) - } - - // aggregation - test("aggregation query") { - executeStreamingIngest( - tableName = "stream_table_agg", - batchNums = 5, - rowNumsEachBatch = 10, - intervalOfSource = 2, - intervalOfIngest = 4, - continueSeconds = 20, - generateBadRecords = true, - badRecordAction = "force", - autoHandoff = false - ) - - checkAnswer( - sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " + - "from stream_table_agg where id >= 2 and id <= 100000004"), - Seq(Row(52, 100000004, "batch_1", 7692332, 400001278))) - - checkAnswer( - sql("select city, count(id), sum(id), cast(avg(id) as integer), " + - "max(salary), min(salary) " + - "from stream_table_agg " + - "where name in ('batch_1', 'batch_2', 
'batch_3', 'name_1', 'name_2', 'name_3') " + - "and city <> '' " + - "group by city " + - "order by city"), - Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1), - Row("city_2", 1, 100000002, 100000002, 0.2, 0.2), - Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3))) - } - - test("aggregation query with complex") { - executeStreamingIngest( - tableName = "stream_table_agg_complex", - batchNums = 5, - rowNumsEachBatch = 10, - intervalOfSource = 2, - intervalOfIngest = 4, - continueSeconds = 20, - generateBadRecords = true, - badRecordAction = "force", - autoHandoff = false - ) - + // agg checkAnswer( sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " + - "from stream_table_agg_complex where id >= 2 and id <= 100000004"), + "from stream_table_filter_complex where id >= 2 and id <= 100000004"), Seq(Row(52, 100000004, "batch_1", 27, 1408))) checkAnswer( sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " + "max(salary), min(salary) " + - "from stream_table_agg_complex " + + "from stream_table_filter_complex " + "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " + "and city <> '' " + "group by city " + @@ -591,70 +439,19 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { Row("city_3", 2, 100000006, 21, 30000.0, 0.3))) } - // compaction - test("test compaction on stream table") { - executeStreamingIngest( - tableName = "stream_table_compact", - batchNums = 5, - rowNumsEachBatch = 10, - intervalOfSource = 2, - intervalOfIngest = 4, - continueSeconds = 20, - generateBadRecords = false, - badRecordAction = "force", - autoHandoff = false - ) - for (_ <- 0 to 3) { - executeBatchLoad("stream_table_compact") - } - - sql("alter table streaming.stream_table_compact compact 'minor'") - - val result = sql("show segments for table streaming.stream_table_compact").collect() - result.foreach { row => - if (row.getString(0).equals("1")) { - 
assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1)) - assertResult(FileFormat.ROW_V1.toString)(row.getString(5)) - } - } - } - - // stream segment max size - test("create new stream segment if current stream segment is full") { - executeStreamingIngest( - tableName = "stream_table_new", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, - generateBadRecords = false, - badRecordAction = "force", - handoffSize = 1024L * 200, - autoHandoff = false - ) - assert(sql("show segments for table streaming.stream_table_new").count() > 1) - - checkAnswer( - sql("select count(*) from streaming.stream_table_new"), - Seq(Row(5 + 10000 * 6)) - ) - } - test("test deleting streaming segment by ID while ingesting") { executeStreamingIngest( tableName = "stream_table_delete_id", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 3, + batchNums = 3, + rowNumsEachBatch = 100, + intervalOfSource = 5, intervalOfIngest = 5, - continueSeconds = 20, + continueSeconds = 18, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200, + handoffSize = 1, autoHandoff = false ) - Thread.sleep(10000) val beforeDelete = sql("show segments for table streaming.stream_table_delete_id").collect() val segmentIds1 = beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",") val msg = intercept[Exception] { @@ -674,17 +471,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { test("test deleting streaming segment by date while ingesting") { executeStreamingIngest( tableName = "stream_table_delete_date", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 3, + batchNums = 3, + rowNumsEachBatch = 100, + intervalOfSource = 5, intervalOfIngest = 5, - continueSeconds = 20, + continueSeconds = 18, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200, + handoffSize = 1, autoHandoff = false ) 
- Thread.sleep(10000) val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect() sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before " + s"'2999-10-01 01:00:00'") @@ -696,230 +492,99 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { } } - test("reject alter streaming properties") { + test("reject alter streaming properties and handoff 'streaming finish' segment to columnar segment") { try { - sql("ALTER TABLE streaming.stream_table_alter UNSET TBLPROPERTIES IF EXISTS ('streaming')") + sql("ALTER TABLE streaming.stream_table_handoff UNSET TBLPROPERTIES IF EXISTS ('streaming')") assert(false, "unsupport to unset streaming property") } catch { case _ => assert(true) } try { - sql("ALTER TABLE streaming.stream_table_alter SET TBLPROPERTIES('streaming'='true')") + sql("ALTER TABLE streaming.stream_table_handoff SET TBLPROPERTIES('streaming'='true')") executeStreamingIngest( - tableName = "stream_table_alter", - batchNums = 6, - rowNumsEachBatch = 10000, + tableName = "stream_table_handoff", + batchNums = 2, + rowNumsEachBatch = 100, intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, + intervalOfIngest = 5, + continueSeconds = 20, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200, + handoffSize = 1L, autoHandoff = false ) - checkAnswer( - sql("select count(*) from streaming.stream_table_alter"), - Seq(Row(6 * 10000)) - ) } catch { case _ => assert(false, "should support set table to streaming") } - - try { - sql("ALTER TABLE stream_table_alter SET TBLPROPERTIES('streaming'='false')") - assert(false, "unsupport disable streaming properties") - } catch { - case _ => - assert(true) - } - } - - test("handoff 'streaming finish' segment to columnar segment") { - executeStreamingIngest( - tableName = "stream_table_handoff", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 5, - intervalOfIngest = 10, - 
continueSeconds = 40, - generateBadRecords = false, - badRecordAction = "force", - handoffSize = 1024L * 200, - autoHandoff = false - ) val segments = sql("show segments for table streaming.stream_table_handoff").collect() - assert(segments.length == 3 || segments.length == 4) + assert(segments.length == 2 || segments.length == 3) assertResult("Streaming")(segments(0).getString(1)) - (1 to segments.length - 1).foreach { index => - assertResult("Streaming Finish")(segments(index).getString(1)) - } + assertResult("Streaming Finish")(segments(1).getString(1)) checkAnswer( sql("select count(*) from streaming.stream_table_handoff"), - Seq(Row(6 * 10000)) + Seq(Row(2 * 100)) ) sql("alter table streaming.stream_table_handoff compact 'streaming'") - Thread.sleep(10000) + Thread.sleep(5000) val newSegments = sql("show segments for table streaming.stream_table_handoff").collect() - assertResult(5)(newSegments.length) - assertResult("Success")(newSegments(0).getString(1)) - assertResult("Success")(newSegments(1).getString(1)) - assertResult("Streaming")(newSegments(2).getString(1)) - assertResult("Compacted")(newSegments(3).getString(1)) - assertResult("Compacted")(newSegments(4).getString(1)) - checkAnswer( - sql("select count(*) from streaming.stream_table_handoff"), - Seq(Row(6 * 10000)) - ) - } - - test("auto handoff 'streaming finish' segment to columnar segment") { - executeStreamingIngest( - tableName = "stream_table_auto_handoff", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, - generateBadRecords = false, - badRecordAction = "force", - handoffSize = 1024L * 200, - autoHandoff = true - ) - Thread.sleep(10000) - val segments = sql("show segments for table streaming.stream_table_auto_handoff").collect() - assertResult(5)(segments.length) - assertResult(2)(segments.filter(_.getString(1).equals("Success")).length) - assertResult(2)(segments.filter(_.getString(1).equals("Compacted")).length) - 
assertResult(1)(segments.filter(_.getString(1).equals("Streaming")).length) - checkAnswer( - sql("select count(*) from streaming.stream_table_auto_handoff"), - Seq(Row(6 * 10000)) - ) - } - - test("alter table finish streaming") { - executeStreamingIngest( - tableName = "stream_table_finish", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, - generateBadRecords = false, - badRecordAction = "force", - handoffSize = 1024L * 200, - autoHandoff = false - ) - sql("alter table streaming.stream_table_finish finish streaming") - - val segments = sql("show segments for table streaming.stream_table_finish").collect() - assert(segments.length == 4 || segments.length == 5) - (0 to segments.length -2).foreach { index => - assertResult("Streaming Finish")(segments(index).getString(1)) + assert(newSegments.length == 3 || newSegments.length == 5) + assertResult("Streaming")(newSegments((newSegments.length - 1) / 2).getString(1)) + (0 until (newSegments.length - 1) / 2).foreach{ i => + assertResult("Success")(newSegments(i).getString(1)) + } + ((newSegments.length + 1) / 2 until newSegments.length).foreach{ i => + assertResult("Compacted")(newSegments(i).getString(1)) } - assertResult("Success")(segments(segments.length - 1).getString(1)) - checkAnswer( - sql("select count(*) from streaming.stream_table_finish"), - Seq(Row(5 + 6 * 10000)) - ) - } - - test("alter table close streaming") { - executeStreamingIngest( - tableName = "stream_table_close", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, - generateBadRecords = false, - badRecordAction = "force", - handoffSize = 1024L * 200, - autoHandoff = false - ) - - val table1 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark) - assertResult(true)(table1.isStreamingTable) - sql("alter table streaming.stream_table_close compact 'close_streaming'") - - val segments = sql("show 
segments for table streaming.stream_table_close").collect() - assertResult(6)(segments.length) - assertResult(3)(segments.filter(_.getString(1).equals("Success")).length) - assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length) - checkAnswer( - sql("select count(*) from streaming.stream_table_close"), - Seq(Row(6 * 10000)) - ) - val table2 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark) - assertResult(false)(table2.isStreamingTable) - } - - test("alter table close streaming with auto handoff") { - executeStreamingIngest( - tableName = "stream_table_close_auto_handoff", - batchNums = 6, - rowNumsEachBatch = 10000, - intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, - generateBadRecords = false, - badRecordAction = "force", - handoffSize = 1024L * 200, - autoHandoff = true - ) - Thread.sleep(10000) - val table1 = - CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark) - assertResult(true)(table1.isStreamingTable) + sql("alter table streaming.stream_table_handoff finish streaming") + val newSegments1 = sql("show segments for table streaming.stream_table_handoff").collect() + assertResult("Streaming Finish")(newSegments1((newSegments.length - 1) / 2).getString(1)) - sql("alter table streaming.stream_table_close_auto_handoff compact 'close_streaming'") - val segments = - sql("show segments for table streaming.stream_table_close_auto_handoff").collect() - assertResult(6)(segments.length) - assertResult(3)(segments.filter(_.getString(1).equals("Success")).length) - assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length) checkAnswer( - sql("select count(*) from streaming.stream_table_close_auto_handoff"), - Seq(Row(6 * 10000)) + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) ) - val table2 = - CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark) - 
assertResult(false)(table2.isStreamingTable) + try { + sql("ALTER TABLE stream_table_handoff SET TBLPROPERTIES('streaming'='false')") + assert(false, "unsupport disable streaming properties") + } catch { + case _ => + assert(true) + } } - test("reopen streaming table") { + test("auto hand off, close and reopen streaming table") { executeStreamingIngest( tableName = "stream_table_reopen", - batchNums = 6, - rowNumsEachBatch = 10000, + batchNums = 2, + rowNumsEachBatch = 100, intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, + intervalOfIngest = 5, + continueSeconds = 20, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200, - autoHandoff = true + handoffSize = 1L, + autoHandoff = false ) - Thread.sleep(10000) - val table1 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark) assertResult(true)(table1.isStreamingTable) sql("alter table streaming.stream_table_reopen compact 'close_streaming'") + val segments = sql("show segments for table streaming.stream_table_reopen").collect() - assertResult(6)(segments.length) - assertResult(3)(segments.filter(_.getString(1).equals("Success")).length) - assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length) + assert(segments.length == 4 || segments.length == 6) + assertResult(segments.length / 2)(segments.filter(_.getString(1).equals("Success")).length) + assertResult(segments.length / 2)(segments.filter(_.getString(1).equals("Compacted")).length) + checkAnswer( sql("select count(*) from streaming.stream_table_reopen"), - Seq(Row(6 * 10000)) + Seq(Row(2 * 100)) ) val table2 = @@ -934,21 +599,34 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { executeStreamingIngest( tableName = "stream_table_reopen", - batchNums = 6, - rowNumsEachBatch = 10000, + batchNums = 2, + rowNumsEachBatch = 100, intervalOfSource = 5, - intervalOfIngest = 10, - continueSeconds = 40, + intervalOfIngest = 5, + continueSeconds = 20, 
generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200, + handoffSize = 1L, autoHandoff = true ) Thread.sleep(10000) + val newSegments1 = + sql("show segments for table streaming.stream_table_reopen").collect() + assert(newSegments1.length == 7 || newSegments1.length == 9 || newSegments1.length == 11) + assertResult(1)(newSegments1.filter(_.getString(1).equals("Streaming")).length) + assertResult((newSegments1.length - 1) / 2)(newSegments1.filter(_.getString(1).equals("Success")).length) + assertResult((newSegments1.length - 1) / 2)(newSegments1.filter(_.getString(1).equals("Compacted")).length) + + sql("alter table streaming.stream_table_reopen compact 'close_streaming'") + val newSegments = + sql("show segments for table streaming.stream_table_reopen").collect() + assert(newSegments.length == 8 || newSegments.length == 10 || newSegments.length == 12) + assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length) + assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length) checkAnswer( sql("select count(*) from streaming.stream_table_reopen"), - Seq(Row(6 * 10000 * 2)) + Seq(Row(2 * 100 * 2)) ) } @@ -960,7 +638,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { var server: ServerSocket = null try { server = getServerSocket - val thread1 = createWriteSocketThread(server, 2, 10, 5) + val thread1 = createWriteSocketThread(server, 2, 10, 3) val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false) thread1.start() thread2.start() @@ -1021,30 +699,33 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { var index = 0 for (_ <- 1 to writeNums) { // write 5 records per iteration + val stringBuilder = new StringBuilder() for (_ <- 1 to rowNums) { index = index + 1 if (badRecords) { if (index == 2) { // null value - 
socketWriter.println(index.toString + ",name_" + index + stringBuilder.append(index.toString + ",name_" + index + ",," + (10000.00 * index).toString + ",school_" + index + ":school_" + index + index + "$" + index) } else if (index == 6) { // illegal number - socketWriter.println(index.toString + "abc,name_" + index + stringBuilder.append(index.toString + "abc,name_" + index + ",city_" + index + "," + (10000.00 * index).toString + ",school_" + index + ":school_" + index + index + "$" + index) } else { - socketWriter.println(index.toString + ",name_" + index + stringBuilder.append(index.toString + ",name_" + index + ",city_" + index + "," + (10000.00 * index).toString + ",school_" + index + ":school_" + index + index + "$" + index) } } else { - socketWriter.println(index.toString + ",name_" + index + stringBuilder.append(index.toString + ",name_" + index + ",city_" + index + "," + (10000.00 * index).toString + ",school_" + index + ":school_" + index + index + "$" + index) } + stringBuilder.append("\n") } + socketWriter.append(stringBuilder.toString()) socketWriter.flush() Thread.sleep(1000 * intervalSecond) } @@ -1137,8 +818,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { thread1.start() thread2.start() Thread.sleep(continueSeconds * 1000) - thread1.interrupt() thread2.interrupt() + thread1.interrupt() } finally { if (null != server) { server.close()
