ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r515750468



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override
       public boolean accept(CarbonFile file) {
-        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
+        return (!oldIndexAndMergeIndexFiles.contains(file.getAbsolutePath()) && (
+            file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+                .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
       }
     });
   }
 
+  public static List<String> getOldIndexAndMergeIndexFiles() {
+    return oldIndexAndMergeIndexFiles;
+  }
+
+  public static void setOldIndexAndMergeIndexFiles(List<String> oldIndexAndMergeIndexFiles) {

Review comment:
       While writing the merge index file / segment file, it gets the index files from the segment directory. During the SI small-files merge step we will have both `old.index` and `new.index` files, and only `new.index` is valid for the merge step and for writing the segment file. For the main table we could also store and use the list this way for a normal load, but in a few cases, such as add segment or reading from an external location, we have to read the merge index file to identify the invalid index files. For clean files, I get all the index files from the segment directory and eliminate those that are not present in the segment file.
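
       To make the clean-files step concrete, here is a minimal sketch of that elimination, assuming the index file names recorded in the segment file are already available; the helper name and its parameters are hypothetical, only `CarbonFile` is the real type:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;

public class StaleIndexFileFilter {
  /**
   * Hypothetical helper: given the index files found on disk for a segment and
   * the index file names recorded in the segment file, return the stale ones
   * (e.g. an old.index left behind by the SI small-files merge step).
   */
  public static List<CarbonFile> findStaleIndexFiles(CarbonFile[] filesInSegmentDir,
      Set<String> indexFilesInSegmentFile) {
    List<CarbonFile> stale = new ArrayList<>();
    for (CarbonFile file : filesInSegmentDir) {
      // Anything on disk that the segment file does not reference is stale.
      if (!indexFilesInSegmentFile.contains(file.getName())) {
        stale.add(file);
      }
    }
    return stale;
  }
}
```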

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -368,12 +373,21 @@ public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
     return carbonFile.listFiles(new CarbonFileFilter() {

Review comment:
       ok removed

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,19 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);
+    List<String> mergedIndexFiles =
+        SegmentFileStore.getInvalidAndMergedIndexFiles(carbonIndexFiles);
     for (String indexPath : index) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
  @Override
  public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());
+    CarbonFile[] carbonIndexFiles =
+        index.stream().map(FileFactory::getCarbonFile).toArray(CarbonFile[]::new);

Review comment:
       Done. `getInvalidAndMergedIndexFiles` takes a `List<String>` as its argument and returns a `List<String>`.

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
+      }
+      if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            readFileFooterFromCarbonDataFile, segmentPath,
+            indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);
       } else {
-        indexFiles =
+        CarbonFile[] indexFilesFromFile =

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -95,22 +97,20 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
+      }
+      if (indexFiles.isEmpty() || indexFileNamesTobeAdded != null) {
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            readFileFooterFromCarbonDataFile, segmentPath,
+            indexFiles.toArray(new CarbonFile[indexFiles.size()]), segmentId);

Review comment:
       done

##########
File path: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -188,8 +188,22 @@ class PrestoInsertIntoTableTestCase
     val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteIdentifier("testdb",
       "testtable")
     val carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
-    val segmentFoldersLocation = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
-    assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size() == 8)
+    val segment0 = CarbonTablePath
+      .getSegmentPath(carbonTable.getAbsoluteTableIdentifier.getTablePath, "0")

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
   /**
   * This method deletes the old index files or merge index file after data files merge
    */
-  private def deleteOldIndexOrMergeIndexFiles(
+  private def collectOldIndexOrMergeIndexFiles(
       factTimeStamp: Long,
       validSegments: util.List[Segment],
       indexCarbonTable: CarbonTable): Unit = {
+    val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
     // delete the index/merge index carbonFile of old data files

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -310,20 +284,22 @@ object SecondaryIndexUtil {
   /**
   * This method deletes the old index files or merge index file after data files merge
    */
-  private def deleteOldIndexOrMergeIndexFiles(
+  private def collectOldIndexOrMergeIndexFiles(
       factTimeStamp: Long,
       validSegments: util.List[Segment],
       indexCarbonTable: CarbonTable): Unit = {
+    val oldIndexAndMergeIndexFiles = new util.ArrayList[String]
     // delete the index/merge index carbonFile of old data files
     validSegments.asScala.foreach { segment =>
      SegmentFileStore.getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
         .asScala
         .foreach { indexFile =>
          if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong < factTimeStamp) {
-            FileFactory.getCarbonFile(indexFile).delete()
+            oldIndexAndMergeIndexFiles.add(indexFile)
           }
         }
     }
+    SegmentIndexFileStore.setOldIndexAndMergeIndexFiles(oldIndexAndMergeIndexFiles)

Review comment:
       Modified one existing test case and added one to check data-file merge without enabling the index merge property.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       `deleteLoadsAndUpdateMetadata` reads loadmetadata and is called from 
multiple places. so I think it's better to read inside method.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -352,27 +343,20 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
           segmentToLoadStartTimeMap,
           indexCarbonTable.getTablePath,
           indexCarbonTable, mergeIndexProperty = false)
-
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
          .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
-
+        for (loadMetadata <- loadMetadataDetails) {

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -464,6 +472,38 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  public static List<String> getMergedIndexFiles(CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    List<String> mergedIndexFiles = new ArrayList<>();
+    long lastModifiedTime = 0L;
+    long length = 0L;
+    CarbonFile validMergeIndexFile = null;
+    List<CarbonFile> mergeIndexCarbonFiles = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(file.getCanonicalPath());
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+            indexFileStore.getCarbonMergeFileToIndexFilesMap();
+        mergedIndexFiles.addAll(carbonMergeFileToIndexFilesMap.get(file.getCanonicalPath()));
+        // In case there are more than 1 mergeindex files present, get the latest one.
+        if (file.getLastModifiedTime() > lastModifiedTime || file.getLength() > length) {

Review comment:
       done
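
       The hunk above picks one valid merge index file when several exist. Distilled into a standalone sketch (the `IndexFile` type is a hypothetical stand-in for `CarbonFile`), the rule is: keep the merge index file with the newest modification time.

```java
import java.util.Arrays;
import java.util.List;

public class LatestMergeIndexExample {
  // Hypothetical stand-in for CarbonFile, holding only what the rule needs.
  static class IndexFile {
    final String name;
    final long lastModifiedTime;
    IndexFile(String name, long lastModifiedTime) {
      this.name = name;
      this.lastModifiedTime = lastModifiedTime;
    }
  }

  public static void main(String[] args) {
    List<IndexFile> mergeIndexFiles = Arrays.asList(
        new IndexFile("0_1605000000000.carbonindexmerge", 1000L),
        new IndexFile("0_1605000001000.carbonindexmerge", 2000L));
    IndexFile validMergeIndexFile = null;
    long lastModifiedTime = 0L;
    for (IndexFile file : mergeIndexFiles) {
      // When more than one merge index file is present, the latest one wins.
      if (file.lastModifiedTime > lastModifiedTime) {
        lastModifiedTime = file.lastModifiedTime;
        validMergeIndexFile = file;
      }
    }
    System.out.println("valid merge index: " + validMergeIndexFile.name);
  }
}
```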

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList
+    validSegments.foreach( segment => {
+      val sfs: SegmentFileStore = new SegmentFileStore(carbonTable.getTablePath,
+        segment.getSegmentFileName)
+      var indexFiles = List[CarbonFile]()
+      if (carbonTable.isHivePartitionTable) {
+        val partitionSpecs = sfs.getPartitionSpecs.asScala.toList
+        val segmentName = segment.getSegmentFileName.replace(CarbonTablePath.SEGMENT_EXT, "")
+        for (partitionSpec <- partitionSpecs) {
+          var carbonIndexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(partitionSpec.getLocation.toString, FileFactory.getConfiguration)
+            .toList
+          carbonIndexFiles = carbonIndexFiles.filter(x => x.getAbsolutePath
+            .contains(segmentName.substring(
+              segmentName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1, segmentName.length)))
+          indexFiles = indexFiles ++ carbonIndexFiles
+          cleanUpIndexFilesForSegment(sfs, indexFiles)
+        }
+      } else {
+        val segmentPath: String = carbonTable.getSegmentPath(segment.getSegmentNo)
+        val carbonIndexFiles = SegmentIndexFileStore
+          .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration)

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -327,6 +315,21 @@ public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
     return false;
   }
 
+  public static void setIndexFileNamesToFolderDetails(FolderDetails folderDetails,
+      CarbonFile[] indexFiles) throws IOException {
+    List<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(indexFiles);
+    for (CarbonFile file : indexFiles) {
+      // do not include already merged index files details in segment file.
+      if (!mergedAndInvalidIndexFiles.contains(file.getName())) {

Review comment:
       I have changed the method from `getInvalidAndMergedIndexFiles` to `getValidCarbonIndexFiles`, and in `getValidCarbonIndexFiles` I have added the empty-list check.
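
       Roughly, the renamed helper could look like the sketch below; this is only an illustration, the exact signature in the PR may differ, and the exclusion list is assumed to hold plain file names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;

public class ValidIndexFileFilter {
  /**
   * Hypothetical sketch of getValidCarbonIndexFiles: returns the index files
   * that are not in the merged/invalid list, guarded by an empty-list check.
   */
  public static List<CarbonFile> getValidCarbonIndexFiles(CarbonFile[] indexFiles,
      List<String> mergedAndInvalidIndexFiles) {
    List<CarbonFile> validFiles = new ArrayList<>();
    if (indexFiles == null) {
      return validFiles;
    }
    // Empty-list check: with nothing to exclude, every index file is valid.
    if (mergedAndInvalidIndexFiles == null || mergedAndInvalidIndexFiles.isEmpty()) {
      validFiles.addAll(Arrays.asList(indexFiles));
      return validFiles;
    }
    for (CarbonFile file : indexFiles) {
      // Do not include index files that a merge index already covers.
      if (!mergedAndInvalidIndexFiles.contains(file.getName())) {
        validFiles.add(file);
      }
    }
    return validFiles;
  }
}
```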

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
##########
@@ -79,18 +83,30 @@ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
  @Override
  public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore =
+        new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    if (fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
       }
     }
+    List<String> index = new ArrayList<>(indexFiles.keySet());

Review comment:
       without a list, getting concurrent modification exception. made the list 
to null at the end.
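
       For context, a self-contained illustration of that exception and the snapshot workaround (the map contents here are made up):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SnapshotIterationExample {
  public static void main(String[] args) {
    Map<String, String> indexFiles = new HashMap<>();
    indexFiles.put("0_1.carbonindex", "0_1.carbonindexmerge");
    indexFiles.put("0_1.carbonindexmerge", null);

    // Removing entries while iterating indexFiles.keySet() directly would
    // throw ConcurrentModificationException, so snapshot the keys first.
    List<String> index = new ArrayList<>(indexFiles.keySet());
    for (String indexPath : index) {
      if (indexPath.endsWith(".carbonindex")) {
        indexFiles.remove(indexPath);
      }
    }
    index = null; // release the snapshot at the end, as noted above
    System.out.println(indexFiles);
  }
}
```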

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1366,18 +1412,10 @@ public static void removeTempFolder(Map<String, FolderDetails> locationMap, Stri
   /**
   * This method returns the list of index/merge index files for a segment in carbonTable.
    */
-  public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)
-      throws IOException {
+  public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath) {
     Set<String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
-      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
-      indexFiles =
-          new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
-    } else {
-      SegmentFileStore segmentFileStore =

Review comment:
       Reverted, and modified it to collect invalid index files as well.

##########
File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -361,6 +361,11 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_INDEX_SCHEMA_STORAGE_DATABASE = "DATABASE";
 
+  @CarbonProperty
+  public static final String CARBON_INDEXFILES_DELETETIME = "carbon.index.delete.time";

Review comment:
       Done. Removed `CARBON_INDEXFILES_DELETETIME` and modified to use 
`max.query.execution.time`
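
       For reference, a sketch of reading the reused property; the default of 60 minutes here is my assumption based on the property's usual default, not something this PR defines:

```java
import org.apache.carbondata.core.util.CarbonProperties;

public class IndexFileExpiryExample {
  public static void main(String[] args) {
    // The expiry window now comes from the existing query-execution property
    // (value in minutes) instead of a new carbon.index.delete.time key.
    String expiryMinutes = CarbonProperties.getInstance()
        .getProperty("max.query.execution.time", "60");
    System.out.println("Index files older than " + expiryMinutes
        + " minutes are eligible for cleanup");
  }
}
```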

##########
File path: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
##########
@@ -139,11 +141,20 @@ private void prepareLoadMetadata() {
     if (null == index) {
       index = new LinkedList<>();
     }
+    CarbonFile[] indexFiles = new CarbonFile[index.size()];
+    for (int i = 0; i < index.size(); i++) {
+      indexFiles[i] = FileFactory.getCarbonFile(index.get(i));
+    }
+    List<String> mergedIndexFiles = SegmentFileStore.getMergedIndexFiles(indexFiles);
     for (String indexPath : index) {
       if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {

Review comment:
       Done.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -353,6 +356,79 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Clean invalid and expired index files of carbon table.
+   *
+   * @param carbonTable CarbonTable
+   */
+  def cleanUpIndexFiles(carbonTable: CarbonTable): Unit = {
+    val validSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+      .getValidAndInvalidSegments.getValidSegments.asScala.toList

Review comment:
       Done. Similarly, I made changes to use the same `metadataDetails` for `cleanUpDeltaFiles`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

