http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index ae99800..d34f7a2 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -32,12 +32,9 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter case class FileElement(school: Array[String], age: Integer) case class StreamData(id: Integer, name: String, city: String, salary: java.lang.Float, @@ -411,53 +408,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { } } - test("query on stream table with dictionary, sort_columns, with merge index applied") { - executeStreamingIngest( - tableName = "stream_table_with_mi", - batchNums = 2, - rowNumsEachBatch = 25, - intervalOfSource = 5, - intervalOfIngest = 5, - continueSeconds = 20, - generateBadRecords = true, - badRecordAction = "force", - autoHandoff = false - ) - val carbonTable: CarbonTable = CarbonMetadata.getInstance - .getCarbonTable("streaming1", "stream_table_with_mi") - new 
CarbonIndexFileMergeWriter(carbonTable) - .mergeCarbonIndexFilesOfSegment("1", carbonTable.getTablePath, false, String.valueOf(System.currentTimeMillis())) - // non-filter - val result = sql("select * from streaming1.stream_table_with_mi order by id, name").collect() - assert(result != null) - assert(result.length == 55) - // check one row of streaming data - assert(result(1).isNullAt(0)) - assert(result(1).getString(1) == "name_6") - // check one row of batch loading - assert(result(50).getInt(0) == 100000001) - assert(result(50).getString(1) == "batch_1") - - // filter - checkAnswer( - sql("select * from stream_table_with_mi where id = 1"), - Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) - - checkAnswer( - sql("select * from stream_table_with_mi where id > 49 and id < 100000002"), - Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), - Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) - - checkAnswer( - sql("select * from stream_table_with_mi where id between 50 and 100000001"), - Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), - Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) - - checkAnswer( - sql("select * from stream_table_with_mi where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"), - Seq(Row(9, "name_9", 
"city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) - - } test("query on stream table with dictionary, sort_columns and complex column") { executeStreamingIngest(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala index c6c647c..7ef86a5 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala @@ -43,9 +43,9 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA assertResult(2)(result.length) assertResult("table_info1")(result(0).getString(0)) // 2147 is the size of carbon table - assertResult(2098)(result(0).getLong(1)) + assertResult(2147)(result(0).getLong(1)) assertResult("table_info2")(result(1).getString(0)) - assertResult(2098)(result(1).getLong(1)) + assertResult(2147)(result(1).getLong(1)) } override def afterAll: Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java index bf8602c..f50f416 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java @@ -30,5 +30,6 @@ public enum CompactionType { STREAMING, CLOSE_STREAMING, CUSTOM, + SEGMENT_INDEX, NONE }
