This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new e3d02ec [CARBONDATA-3754]avoid listing index files during SI rebuild
e3d02ec is described below
commit e3d02ec9b25ee7ca542c4bf23d6e9f283dcbeffb
Author: akashrn5 <[email protected]>
AuthorDate: Fri Apr 17 13:17:24 2020 +0530
[CARBONDATA-3754]avoid listing index files during SI rebuild
Why is this PR needed?
Listing files was done during the rebuild of SI, which can be costly in the case of
S3 and OBS
What changes were proposed in this PR?
Avoided listing files; deletion of stale files is now handled through SegmentFileStore
This closes #3719
---
.../carbondata/core/metadata/SegmentFileStore.java | 18 +++++++++++++++
.../secondaryindex/rdd/CarbonSIRebuildRDD.scala | 14 -----------
.../secondaryindex/util/SecondaryIndexUtil.scala | 27 +++++++++++++++++++++-
3 files changed, 44 insertions(+), 15 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 72e0425..3bf562a 100644
---
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1245,6 +1245,24 @@ public class SegmentFileStore {
}
/**
+ * This method returns the list of index/merge index files for a segment in
carbonTable.
+ */
+ public static Set<String> getIndexFilesListForSegment(Segment segment,
String tablePath)
+ throws IOException {
+ Set<String> indexFiles;
+ if (segment.getSegmentFileName() == null) {
+ String segmentPath = CarbonTablePath.getSegmentPath(tablePath,
segment.getSegmentNo());
+ indexFiles =
+ new
SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
+ } else {
+ SegmentFileStore segmentFileStore =
+ new SegmentFileStore(tablePath, segment.getSegmentFileName());
+ indexFiles = segmentFileStore.getIndexOrMergeFiles().keySet();
+ }
+ return indexFiles;
+ }
+
+ /**
* It contains the segment information like location, partitions and related
index files
*/
public static class SegmentFile implements Serializable {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index 93a6838..f00dac5 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -331,20 +331,6 @@ class CarbonSIRebuildRDD[K, V](
val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
carbonFile.delete()
}
-
- // delete the indexfile/merge index carbonFile of old data files
- val segmentPath =
FileFactory.getCarbonFile(indexTable.getSegmentPath(segmentId))
- val indexFiles = segmentPath.listFiles(new CarbonFileFilter {
- override def accept(carbonFile: CarbonFile): Boolean = {
- (carbonFile.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
-
carbonFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) &&
-
DataFileUtil.getTimeStampFromFileName(carbonFile.getAbsolutePath).toLong <
- carbonLoadModelCopy.getFactTimeStamp
- }
- })
- indexFiles.foreach { indexFile =>
- indexFile.delete()
- }
}
private def deleteLocalDataFolders(): Unit = {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 9b2ac55..298be7a 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.secondaryindex.util
import java.io.IOException
import java.util
-import java.util.{Collections, Comparator, List}
+import java.util.{Collections, Comparator}
import scala.collection.JavaConverters._
import scala.util.control.Breaks
@@ -35,6 +35,7 @@ import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.{TableBlockInfo,
TaskBlockInfo}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -44,6 +45,7 @@ import
org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.indexserver.IndexServer
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
@@ -197,6 +199,10 @@ object SecondaryIndexUtil {
}
if (finalMergeStatus) {
if (null != mergeStatus && mergeStatus.length != 0) {
+ deleteOldIndexOrMergeIndexFiles(
+ carbonLoadModel.getFactTimeStamp,
+ validSegments,
+ indexCarbonTable)
mergedSegments.asScala.map { seg =>
val file = SegmentFileStore.writeSegmentFile(
indexCarbonTable,
@@ -281,6 +287,25 @@ object SecondaryIndexUtil {
}
/**
+ * This method deletes the old index files or merge index file after data
files merge
+ */
+ private def deleteOldIndexOrMergeIndexFiles(
+ factTimeStamp: Long,
+ validSegments: util.List[Segment],
+ indexCarbonTable: CarbonTable): Unit = {
+ // delete the index/merge index carbonFile of old data files
+ validSegments.asScala.foreach { segment =>
+ SegmentFileStore.getIndexFilesListForSegment(segment,
indexCarbonTable.getTablePath)
+ .asScala
+ .foreach { indexFile =>
+ if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong <
factTimeStamp) {
+ FileFactory.getCarbonFile(indexFile).delete()
+ }
+ }
+ }
+ }
+
+ /**
* Identifies the group of blocks to be merged based on the merge size.
* This should be per segment grouping.
*