[CARBONDATA-2704] Index file size in describe formatted command is not updated correctly with the segment file
Problem: Describe formatted command is not showing correct index files size after index files merge. Solution: Segment file should be updated with the actual index files size of that segment after index files merge. This closes #2462 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb604fdb Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb604fdb Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb604fdb Branch: refs/heads/carbonstore Commit: eb604fdb73983dfe9396d488a51907d90ed51d3e Parents: cdee81d Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Mon Jul 9 11:19:51 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Sun Jul 15 20:34:32 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/SegmentFileStore.java | 4 +- .../apache/carbondata/core/util/CarbonUtil.java | 48 ++++++++++++------- .../core/writer/CarbonIndexFileMergeWriter.java | 17 +++---- .../CarbonIndexFileMergeTestCase.scala | 50 ++++++++++++++++++++ .../spark/rdd/CarbonDataRDDFactory.scala | 4 +- 5 files changed, 96 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 3d3b245..ce79e65 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -281,7 +281,7 @@ public class SegmentFileStore { * @throws IOException */ public static boolean updateSegmentFile(String tablePath, String 
segmentId, String segmentFile, - String tableId) throws IOException { + String tableId, SegmentFileStore segmentFileStore) throws IOException { boolean status = false; String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); if (!FileFactory.isFileExist(tableStatusPath)) { @@ -308,6 +308,8 @@ public class SegmentFileStore { // if the segments is in the list of marked for delete then update the status. if (segmentId.equals(detail.getLoadName())) { detail.setSegmentFile(segmentFile); + detail.setIndexSize(String.valueOf(CarbonUtil + .getCarbonIndexSize(segmentFileStore, segmentFileStore.getLocationMap()))); break; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index e87e52c..9796696 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2647,23 +2647,7 @@ public final class CarbonUtil { fileStore.readIndexFiles(); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); // get the size of carbonindex file - for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : locationMap.entrySet()) { - SegmentFileStore.FolderDetails folderDetails = entry.getValue(); - Set<String> carbonindexFiles = folderDetails.getFiles(); - String mergeFileName = folderDetails.getMergeFileName(); - if (null != mergeFileName) { - String mergeIndexPath = - fileStore.getTablePath() + entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR - + mergeFileName; - carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize(); - } - for (String indexFile : carbonindexFiles) { - String indexPath = - fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR - + indexFile; - carbonIndexSize += FileFactory.getCarbonFile(indexPath).getSize(); - } - } + carbonIndexSize = getCarbonIndexSize(fileStore, locationMap); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { // get the size of carbondata files for (String blockFile : entry.getValue()) { @@ -2676,6 +2660,36 @@ public final class CarbonUtil { return dataAndIndexSize; } + /** + * Calculate the index files size of the segment + * + * @param fileStore + * @param locationMap + * @return + */ + public static long getCarbonIndexSize(SegmentFileStore fileStore, + Map<String, SegmentFileStore.FolderDetails> locationMap) { + long carbonIndexSize = 0L; + for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : locationMap.entrySet()) { + SegmentFileStore.FolderDetails folderDetails = entry.getValue(); + Set<String> carbonindexFiles = folderDetails.getFiles(); + String mergeFileName = folderDetails.getMergeFileName(); + if (null != mergeFileName) { + String mergeIndexPath = + fileStore.getTablePath() + entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + + mergeFileName; + carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize(); + } + for (String indexFile : carbonindexFiles) { + String indexPath = + fileStore.getTablePath() + entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + + indexFile; + carbonIndexSize += FileFactory.getCarbonFile(indexPath).getSize(); + } + } + return carbonIndexSize; + } + // Get the total size of carbon data and the total size of carbon index public static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath, Segment segment) throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index c293064..80a46cb 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -114,11 +114,11 @@ public class CarbonIndexFileMergeWriter { private String writeMergeIndexFileBasedOnSegmentFile( String segmentId, List<String> indexFileNamesTobeAdded, - SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException { + SegmentFileStore segmentFileStore, CarbonFile[] indexFiles) throws IOException { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); fileStore - .readAllIIndexOfSegment(sfs.getSegmentFile(), sfs.getTablePath(), SegmentStatus.SUCCESS, - true); + .readAllIIndexOfSegment(segmentFileStore.getSegmentFile(), segmentFileStore.getTablePath(), + SegmentStatus.SUCCESS, true); Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath(); Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>(); for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) { @@ -133,11 +133,12 @@ public class CarbonIndexFileMergeWriter { for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) { String mergeIndexFile = writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue(), segmentId); - for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : sfs.getLocationMap() - .entrySet()) { + for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : segmentFileStore + .getLocationMap().entrySet()) { String location = segentry.getKey(); if (segentry.getValue().isRelative()) { - location = sfs.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location; + location = + segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location; } if (new Path(entry.getKey()).equals(new Path(location))) { segentry.getValue().setMergeFileName(mergeIndexFile); @@ -153,9 
+154,9 @@ public class CarbonIndexFileMergeWriter { + CarbonTablePath.SEGMENT_EXT; String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; - SegmentFileStore.writeSegmentFile(sfs.getSegmentFile(), path); + SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path); SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName, - table.getCarbonTableIdentifier().getTableId()); + table.getCarbonTableIdentifier().getTableId(), segmentFileStore); for (CarbonFile file : indexFiles) { file.delete(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index b4937e6..51e46f7 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -17,15 +17,20 @@ package org.apache.carbondata.spark.testsuite.datacompaction +import org.junit.Assert + import scala.collection.JavaConverters._ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter @@ -193,6 +198,34 @@ class CarbonIndexFileMergeTestCase sql("select * from mitable").show() } + // CARBONDATA-2704, test the index file size after merge + test("Verify the size of the index file after merge") { + sql("DROP TABLE IF EXISTS fileSize") + sql( + """ + | CREATE TABLE fileSize(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE fileSize OPTIONS('header'='false')") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "fileSize") + var loadMetadataDetails = SegmentStatusManager + .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath)) + var segment0 = loadMetadataDetails.filter(x=> x.getLoadName.equalsIgnoreCase("0")) + Assert + .assertEquals(getIndexOrMergeIndexFileSize(table, "0", CarbonTablePath.INDEX_FILE_EXT), + segment0.head.getIndexSize.toLong) + new CarbonIndexFileMergeWriter(table) + .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + loadMetadataDetails = SegmentStatusManager + .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath)) + segment0 = loadMetadataDetails.filter(x=> x.getLoadName.equalsIgnoreCase("0")) + Assert + .assertEquals(getIndexOrMergeIndexFileSize(table, "0", CarbonTablePath.MERGE_INDEX_FILE_EXT), + segment0.head.getIndexSize.toLong) + sql("DROP TABLE IF EXISTS fileSize") + } + private def getIndexFileCount(tableName: String, segmentNo: String): Int = { val carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(tableName) val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo) @@ -222,4 +255,21 @@ class CarbonIndexFileMergeTestCase } } + private def getIndexOrMergeIndexFileSize(carbonTable: CarbonTable, + segmentId: String, + fileExtension: String): Long = { + var size = 0L; + val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + val segmentFile = FileFactory.getCarbonFile(segmentPath) + val carbonFiles = segmentFile.listFiles(new CarbonFileFilter() { + override def accept(file: CarbonFile): Boolean = { + (file.getName.endsWith(fileExtension)) + } + }) + carbonFiles.toList.foreach(carbonFile => { + size += FileFactory.getCarbonFile(carbonFile.getPath).getSize + }) + size + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 543ba30..40d5c0d 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -522,7 +522,9 @@ object CarbonDataRDDFactory { carbonTable.getTablePath, carbonLoadModel.getSegmentId, segmentFileName, - carbonTable.getCarbonTableIdentifier.getTableId) + carbonTable.getCarbonTableIdentifier.getTableId, + new SegmentFileStore(carbonTable.getTablePath, segmentFileName)) + operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment", carbonLoadModel.getSegmentId) val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =