This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e3d02ec  [CARBONDATA-3754]avoid listing index files during SI rebuild
e3d02ec is described below

commit e3d02ec9b25ee7ca542c4bf23d6e9f283dcbeffb
Author: akashrn5 <[email protected]>
AuthorDate: Fri Apr 17 13:17:24 2020 +0530

    [CARBONDATA-3754]avoid listing index files during SI rebuild
    
    Why is this PR needed?
    List files was done during rebuild of SI, which will be costly in case of 
S3 and OBS
    
    What changes were proposed in this PR?
    avoided list files and delete files handled through segmentFileStore
    
    This closes #3719
---
 .../carbondata/core/metadata/SegmentFileStore.java | 18 +++++++++++++++
 .../secondaryindex/rdd/CarbonSIRebuildRDD.scala    | 14 -----------
 .../secondaryindex/util/SecondaryIndexUtil.scala   | 27 +++++++++++++++++++++-
 3 files changed, 44 insertions(+), 15 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 72e0425..3bf562a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1245,6 +1245,24 @@ public class SegmentFileStore {
   }
 
   /**
+   * This method returns the list of indx/merge index files for a segment in 
carbonTable.
+   */
+  public static Set<String> getIndexFilesListForSegment(Segment segment, 
String tablePath)
+      throws IOException {
+    Set<String> indexFiles;
+    if (segment.getSegmentFileName() == null) {
+      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, 
segment.getSegmentNo());
+      indexFiles =
+          new 
SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
+    } else {
+      SegmentFileStore segmentFileStore =
+          new SegmentFileStore(tablePath, segment.getSegmentFileName());
+      indexFiles = segmentFileStore.getIndexOrMergeFiles().keySet();
+    }
+    return indexFiles;
+  }
+
+  /**
    * It contains the segment information like location, partitions and related 
index files
    */
   public static class SegmentFile implements Serializable {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index 93a6838..f00dac5 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -331,20 +331,6 @@ class CarbonSIRebuildRDD[K, V](
           val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
           carbonFile.delete()
         }
-
-        // delete the indexfile/merge index carbonFile of old data files
-        val segmentPath = 
FileFactory.getCarbonFile(indexTable.getSegmentPath(segmentId))
-        val indexFiles = segmentPath.listFiles(new CarbonFileFilter {
-          override def accept(carbonFile: CarbonFile): Boolean = {
-            (carbonFile.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
-             
carbonFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) &&
-            
DataFileUtil.getTimeStampFromFileName(carbonFile.getAbsolutePath).toLong <
-            carbonLoadModelCopy.getFactTimeStamp
-          }
-        })
-        indexFiles.foreach { indexFile =>
-          indexFile.delete()
-        }
       }
 
       private def deleteLocalDataFolders(): Unit = {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 9b2ac55..298be7a 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.secondaryindex.util
 
 import java.io.IOException
 import java.util
-import java.util.{Collections, Comparator, List}
+import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
 import scala.util.control.Breaks
@@ -35,6 +35,7 @@ import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.{TableBlockInfo, 
TaskBlockInfo}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -44,6 +45,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
@@ -197,6 +199,10 @@ object SecondaryIndexUtil {
       }
       if (finalMergeStatus) {
         if (null != mergeStatus && mergeStatus.length != 0) {
+          deleteOldIndexOrMergeIndexFiles(
+            carbonLoadModel.getFactTimeStamp,
+            validSegments,
+            indexCarbonTable)
           mergedSegments.asScala.map { seg =>
             val file = SegmentFileStore.writeSegmentFile(
               indexCarbonTable,
@@ -281,6 +287,25 @@ object SecondaryIndexUtil {
   }
 
   /**
+   * This method deletes the old index files or merge index file after data 
files merge
+   */
+  private def deleteOldIndexOrMergeIndexFiles(
+      factTimeStamp: Long,
+      validSegments: util.List[Segment],
+      indexCarbonTable: CarbonTable): Unit = {
+    // delete the index/merge index carbonFile of old data files
+    validSegments.asScala.foreach { segment =>
+      SegmentFileStore.getIndexFilesListForSegment(segment, 
indexCarbonTable.getTablePath)
+        .asScala
+        .foreach { indexFile =>
+          if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong < 
factTimeStamp) {
+            FileFactory.getCarbonFile(indexFile).delete()
+          }
+        }
+    }
+  }
+
+  /**
    * Identifies the group of blocks to be merged based on the merge size.
    * This should be per segment grouping.
    *

Reply via email to