Repository: carbondata Updated Branches: refs/heads/master 400395a8d -> cac619555
[CARBONDATA-1771] During segment_index compaction, .carbonindex files of invalid segments are also getting merged. Scenario: Disable the merge-index feature (CARBON_MERGE_INDEX_IN_SEGMENT = false), do loads, execute MINOR compaction, then execute SEGMENT_INDEX compaction. SEGMENT_INDEX compaction also merges the .carbonindex files of the compacted (now invalid) segments. Solution: Merge index files of valid segments only. This closes #1535 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cac61955 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cac61955 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cac61955 Branch: refs/heads/master Commit: cac6195553ec68b111c886fb27dc7ac3808d4502 Parents: 400395a Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Sun Nov 19 20:53:50 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Mon Nov 20 16:51:25 2017 +0530 ---------------------------------------------------------------------- .../CarbonIndexFileMergeTestCase.scala | 33 ++++++++++++++++++++ .../AlterTableCompactionCommand.scala | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cac61955/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index f06994c..c66107f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -148,6 +148,39 @@ class CarbonIndexFileMergeTestCase checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) } + test("Verify index index merge for compacted segments") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") + sql("DROP TABLE IF EXISTS nonindexmerge") + sql( + """ + | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " + + s"'GLOBAL_SORT_PARTITIONS'='100')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " + + s"'GLOBAL_SORT_PARTITIONS'='100')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " + + s"'GLOBAL_SORT_PARTITIONS'='100')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " + + s"'GLOBAL_SORT_PARTITIONS'='100')") + val rows = sql("""Select count(*) from nonindexmerge""").collect() + assert(getIndexFileCount("default_nonindexmerge", "0") == 100) + assert(getIndexFileCount("default_nonindexmerge", "1") == 100) + assert(getIndexFileCount("default_nonindexmerge", "2") == 100) + assert(getIndexFileCount("default_nonindexmerge", "3") == 100) + sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() + sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect() + assert(getIndexFileCount("default_nonindexmerge", "0") == 100) + assert(getIndexFileCount("default_nonindexmerge", "1") == 100) + assert(getIndexFileCount("default_nonindexmerge", "2") == 100) + assert(getIndexFileCount("default_nonindexmerge", "3") == 100) + assert(getIndexFileCount("default_nonindexmerge", 
"0.1") == 0) + checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) + } + private def getIndexFileCount(tableName: String, segment: String): Int = { val table = CarbonMetadata.getInstance().getCarbonTable(tableName) val path = CarbonTablePath http://git-wip-us.apache.org/repos/asf/carbondata/blob/cac61955/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala index 826d35a..6e11fe4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala @@ -142,7 +142,7 @@ case class AlterTableCompactionCommand( if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) { // Just launch job to merge index and return CommonUtil.mergeIndexFiles(sqlContext.sparkContext, - carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName), + CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala, carbonLoadModel.getTablePath, carbonTable, true) return