This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new ace4e60 [CARBONDATA-3754] Clean up the data file and index files after SI rebuild
ace4e60 is described below
commit ace4e60e3f3af1ead7744607ffbb78385d59a598
Author: akashrn5 <[email protected]>
AuthorDate: Tue Mar 24 15:17:18 2020 +0530
[CARBONDATA-3754] Clean up the data file and index files after SI rebuild
Why is this PR needed?
Clean-up of the old data files and index files does not happen after an SI rebuild.
What changes were proposed in this PR?
Each task now clears the old data files and index files once it finishes, as sketched below.
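For context, the clean-up each task performs can be sketched as a small self-contained helper. The object and method names (SIRebuildCleanupSketch, cleanUpOldFiles) and its parameters are hypothetical, chosen for illustration; the CarbonData calls are the same ones used in the diff below.

    import java.util

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.util.path.CarbonTablePath
    import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
    import org.apache.carbondata.hadoop.CarbonInputSplit

    object SIRebuildCleanupSketch {
      // Hypothetical helper sketching the task-completion clean-up added here.
      def cleanUpOldFiles(splits: util.List[CarbonInputSplit], segmentPath: String,
          factTimeStamp: Long): Unit = {
        // the data files that served as input to the merge are now stale
        splits.asScala.foreach { split =>
          FileFactory.getCarbonFile(split.getFilePath).delete()
        }
        // index/merge-index files whose file-name timestamp predates the
        // rebuild belong to the old data files and can be removed as well
        val staleIndexFiles = FileFactory.getCarbonFile(segmentPath)
          .listFiles(new CarbonFileFilter {
            override def accept(file: CarbonFile): Boolean = {
              (file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
                file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) &&
                DataFileUtil.getTimeStampFromFileName(file.getAbsolutePath).toLong <
                  factTimeStamp
            }
          })
        staleIndexFiles.foreach(_.delete())
      }
    }

The filter on the fact timestamp is what spares the index files written by the rebuild itself: only files whose file-name timestamp predates the rebuild are deleted.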
This closes #3676
---
 .../CarbonDataFileMergeTestCaseOnSI.scala | 7 -------
.../secondaryindex/rdd/CarbonSIRebuildRDD.scala | 27 ++++++++++++++++++++--
2 files changed, 25 insertions(+), 9 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index b6e3360..9eced78 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -81,7 +81,6 @@ class CarbonDataFileMergeTestCaseOnSI
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge
OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
val rows = sql("""Select count(*) from indexmerge where name='n164419'""").collect()
- sql("clean files for table indexmerge_index1")
checkAnswer(sql("""Select count(*) from indexmerge where name='n164419'"""), rows)
assert(getDataFileCount("indexmerge_index1", "0") < 7)
}
@@ -108,7 +107,6 @@ class CarbonDataFileMergeTestCaseOnSI
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
- sql("clean files for table nonindexmerge_index1")
assert(getDataFileCount("nonindexmerge_index1", "0") < 7)
assert(getDataFileCount("nonindexmerge_index1", "1") < 7)
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
@@ -136,14 +134,11 @@ class CarbonDataFileMergeTestCaseOnSI
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE
SEGMENT.ID IN(0)").collect()
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
- sql("clean files for table nonindexmerge_index2")
assert(getDataFileCount("nonindexmerge_index2", "0") < 7)
assert(getDataFileCount("nonindexmerge_index2", "1") == 100)
sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE
SEGMENT.ID IN(1)").collect()
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
- sql("clean files for table nonindexmerge_index2")
assert(getDataFileCount("nonindexmerge_index2", "1") < 7)
- sql("clean files for table nonindexmerge_index2")
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
}
@@ -192,7 +187,6 @@ class CarbonDataFileMergeTestCaseOnSI
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- sql("clean files for table nonindexmerge_index3")
assert(getDataFileCount("nonindexmerge_index3", "0.1") < 11)
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
CarbonProperties.getInstance()
@@ -224,7 +218,6 @@ class CarbonDataFileMergeTestCaseOnSI
sql(
"CREATE INDEX nonindexmerge_index4 on table nonindexmerge (name) AS
'carbondata' " +
"properties('table_blocksize'='1')")
- sql("clean files for table nonindexmerge_index4")
assert(getDataFileCount("nonindexmerge_index4", "0.2") < 15)
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
CarbonProperties.getInstance()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index 05c6e96..2399a45 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -37,12 +37,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions}
import org.apache.carbondata.core.datastore.block.{SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.CarbonInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -252,7 +255,7 @@ class CarbonSIRebuildRDD[K, V](
// add task completion listener to clean up the resources
context.addTaskCompletionListener { _ =>
- close()
+ close(splitList)
}
try {
// fire a query and get the results.
@@ -308,7 +311,7 @@ class CarbonSIRebuildRDD[K, V](
throw e
}
- private def close(): Unit = {
+ private def close(splits: util.List[CarbonInputSplit]): Unit = {
deleteLocalDataFolders()
// close all the query executor service and clean up memory acquired during query processing
if (null != exec) {
@@ -321,6 +324,26 @@ class CarbonSIRebuildRDD[K, V](
LOGGER.info("Closing compaction processor instance to clean up loading resources")
processor.close()
}
+
+ // delete all the old data files that were used for merging
+ splits.asScala.foreach { split =>
+ val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
+ carbonFile.delete()
+ }
+
+ // delete the index files / merge index files of the old data files
+ val segmentPath = FileFactory.getCarbonFile(indexTable.getSegmentPath(segmentId))
+ val indexFiles = segmentPath.listFiles(new CarbonFileFilter {
+ override def accept(carbonFile: CarbonFile): Boolean = {
+ (carbonFile.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+ carbonFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) &&
+ DataFileUtil.getTimeStampFromFileName(carbonFile.getAbsolutePath).toLong <
+ carbonLoadModelCopy.getFactTimeStamp
+ }
+ })
+ indexFiles.foreach { indexFile =>
+ indexFile.delete()
+ }
}
private def deleteLocalDataFolders(): Unit = {