[CARBONDATA-2704] Index file size in describe formatted command is not updated 
correctly with the segment file

Problem:
Describe formatted command is not showing correct index files size after index 
files merge.
Solution:
Segment file should be updated with the actual index files size of that segment 
after index files merge.

This closes #2462


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb604fdb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb604fdb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb604fdb

Branch: refs/heads/carbonstore
Commit: eb604fdb73983dfe9396d488a51907d90ed51d3e
Parents: cdee81d
Author: dhatchayani <dhatcha.offic...@gmail.com>
Authored: Mon Jul 9 11:19:51 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Sun Jul 15 20:34:32 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/SegmentFileStore.java         |  4 +-
 .../apache/carbondata/core/util/CarbonUtil.java | 48 ++++++++++++-------
 .../core/writer/CarbonIndexFileMergeWriter.java | 17 +++----
 .../CarbonIndexFileMergeTestCase.scala          | 50 ++++++++++++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  4 +-
 5 files changed, 96 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 3d3b245..ce79e65 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -281,7 +281,7 @@ public class SegmentFileStore {
    * @throws IOException
    */
   public static boolean updateSegmentFile(String tablePath, String segmentId, 
String segmentFile,
-      String tableId) throws IOException {
+      String tableId, SegmentFileStore segmentFileStore) throws IOException {
     boolean status = false;
     String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
     if (!FileFactory.isFileExist(tableStatusPath)) {
@@ -308,6 +308,8 @@ public class SegmentFileStore {
           // if the segments is in the list of marked for delete then update 
the status.
           if (segmentId.equals(detail.getLoadName())) {
             detail.setSegmentFile(segmentFile);
+            detail.setIndexSize(String.valueOf(CarbonUtil
+                .getCarbonIndexSize(segmentFileStore, 
segmentFileStore.getLocationMap())));
             break;
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e87e52c..9796696 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2647,23 +2647,7 @@ public final class CarbonUtil {
       fileStore.readIndexFiles();
       Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
       // get the size of carbonindex file
-      for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : 
locationMap.entrySet()) {
-        SegmentFileStore.FolderDetails folderDetails = entry.getValue();
-        Set<String> carbonindexFiles = folderDetails.getFiles();
-        String mergeFileName = folderDetails.getMergeFileName();
-        if (null != mergeFileName) {
-          String mergeIndexPath =
-              fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
-                  + mergeFileName;
-          carbonIndexSize += 
FileFactory.getCarbonFile(mergeIndexPath).getSize();
-        }
-        for (String indexFile : carbonindexFiles) {
-          String indexPath =
-              fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
-                  + indexFile;
-          carbonIndexSize += FileFactory.getCarbonFile(indexPath).getSize();
-        }
-      }
+      carbonIndexSize = getCarbonIndexSize(fileStore, locationMap);
       for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
         // get the size of carbondata files
         for (String blockFile : entry.getValue()) {
@@ -2676,6 +2660,36 @@ public final class CarbonUtil {
     return dataAndIndexSize;
   }
 
+  /**
+   * Calcuate the index files size of the segment
+   *
+   * @param fileStore
+   * @param locationMap
+   * @return
+   */
+  public static long getCarbonIndexSize(SegmentFileStore fileStore,
+      Map<String, SegmentFileStore.FolderDetails> locationMap) {
+    long carbonIndexSize = 0L;
+    for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : 
locationMap.entrySet()) {
+      SegmentFileStore.FolderDetails folderDetails = entry.getValue();
+      Set<String> carbonindexFiles = folderDetails.getFiles();
+      String mergeFileName = folderDetails.getMergeFileName();
+      if (null != mergeFileName) {
+        String mergeIndexPath =
+            fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
+                + mergeFileName;
+        carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize();
+      }
+      for (String indexFile : carbonindexFiles) {
+        String indexPath =
+            fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
+                + indexFile;
+        carbonIndexSize += FileFactory.getCarbonFile(indexPath).getSize();
+      }
+    }
+    return carbonIndexSize;
+  }
+
   // Get the total size of carbon data and the total size of carbon index
   public static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
       Segment segment) throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index c293064..80a46cb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -114,11 +114,11 @@ public class CarbonIndexFileMergeWriter {
   private String writeMergeIndexFileBasedOnSegmentFile(
       String segmentId,
       List<String> indexFileNamesTobeAdded,
-      SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException {
+      SegmentFileStore segmentFileStore, CarbonFile[] indexFiles) throws 
IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
     fileStore
-        .readAllIIndexOfSegment(sfs.getSegmentFile(), sfs.getTablePath(), 
SegmentStatus.SUCCESS,
-            true);
+        .readAllIIndexOfSegment(segmentFileStore.getSegmentFile(), 
segmentFileStore.getTablePath(),
+            SegmentStatus.SUCCESS, true);
     Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
     Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
     for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) {
@@ -133,11 +133,12 @@ public class CarbonIndexFileMergeWriter {
     for (Map.Entry<String, Map<String, byte[]>> entry : 
indexLocationMap.entrySet()) {
       String mergeIndexFile =
           writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), 
entry.getValue(), segmentId);
-      for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : 
sfs.getLocationMap()
-          .entrySet()) {
+      for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : 
segmentFileStore
+          .getLocationMap().entrySet()) {
         String location = segentry.getKey();
         if (segentry.getValue().isRelative()) {
-          location = sfs.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR 
+ location;
+          location =
+              segmentFileStore.getTablePath() + 
CarbonCommonConstants.FILE_SEPARATOR + location;
         }
         if (new Path(entry.getKey()).equals(new Path(location))) {
           segentry.getValue().setMergeFileName(mergeIndexFile);
@@ -153,9 +154,9 @@ public class CarbonIndexFileMergeWriter {
             + CarbonTablePath.SEGMENT_EXT;
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
-    SegmentFileStore.writeSegmentFile(sfs.getSegmentFile(), path);
+    SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
     SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, 
newSegmentFileName,
-        table.getCarbonTableIdentifier().getTableId());
+        table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
 
     for (CarbonFile file : indexFiles) {
       file.delete();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index b4937e6..51e46f7 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,15 +17,20 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
+import org.junit.Assert
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
@@ -193,6 +198,34 @@ class CarbonIndexFileMergeTestCase
     sql("select * from mitable").show()
   }
 
+  // CARBONDATA-2704, test the index file size after merge
+  test("Verify the size of the index file after merge") {
+    sql("DROP TABLE IF EXISTS fileSize")
+    sql(
+      """
+        | CREATE TABLE fileSize(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE fileSize 
OPTIONS('header'='false')")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"fileSize")
+    var loadMetadataDetails = SegmentStatusManager
+      
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath))
+    var segment0 = loadMetadataDetails.filter(x=> 
x.getLoadName.equalsIgnoreCase("0"))
+    Assert
+      .assertEquals(getIndexOrMergeIndexFileSize(table, "0", 
CarbonTablePath.INDEX_FILE_EXT),
+        segment0.head.getIndexSize.toLong)
+    new CarbonIndexFileMergeWriter(table)
+      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
+    loadMetadataDetails = SegmentStatusManager
+      
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath))
+    segment0 = loadMetadataDetails.filter(x=> 
x.getLoadName.equalsIgnoreCase("0"))
+    Assert
+      .assertEquals(getIndexOrMergeIndexFileSize(table, "0", 
CarbonTablePath.MERGE_INDEX_FILE_EXT),
+        segment0.head.getIndexSize.toLong)
+    sql("DROP TABLE IF EXISTS fileSize")
+  }
+
   private def getIndexFileCount(tableName: String, segmentNo: String): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
segmentNo)
@@ -222,4 +255,21 @@ class CarbonIndexFileMergeTestCase
     }
   }
 
+  private def getIndexOrMergeIndexFileSize(carbonTable: CarbonTable,
+      segmentId: String,
+      fileExtension: String): Long = {
+    var size = 0L;
+    val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
segmentId)
+    val segmentFile = FileFactory.getCarbonFile(segmentPath)
+    val carbonFiles = segmentFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        (file.getName.endsWith(fileExtension))
+      }
+    })
+    carbonFiles.toList.foreach(carbonFile => {
+      size += FileFactory.getCarbonFile(carbonFile.getPath).getSize
+    })
+    size
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb604fdb/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 543ba30..40d5c0d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -522,7 +522,9 @@ object CarbonDataRDDFactory {
         carbonTable.getTablePath,
         carbonLoadModel.getSegmentId,
         segmentFileName,
-        carbonTable.getCarbonTableIdentifier.getTableId)
+        carbonTable.getCarbonTableIdentifier.getTableId,
+        new SegmentFileStore(carbonTable.getTablePath, segmentFileName))
+
       operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
         carbonLoadModel.getSegmentId)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =

Reply via email to