Github user rahulforallp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2128#discussion_r180156365
--- Diff:
integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
---
@@ -151,13 +153,82 @@ object CarbonStore {
}
}
} finally {
+ if (currentTablePartitions.equals(None)) {
+ cleanUpPartitionFoldersRecurssively(carbonTable,
List.empty[PartitionSpec])
+ } else {
+ cleanUpPartitionFoldersRecurssively(carbonTable,
currentTablePartitions.get.toList)
+ }
+
if (carbonCleanFilesLock != null) {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock,
LockUsage.CLEAN_FILES_LOCK)
}
}
LOGGER.audit(s"Clean files operation is success for
$dbName.$tableName.")
}
+ /**
+ * delete partition folders recurssively
+ *
+ * @param carbonTable
+ * @param partitionSpecList
+ */
+ def cleanUpPartitionFoldersRecurssively(carbonTable: CarbonTable,
+ partitionSpecList: List[PartitionSpec]): Unit = {
+ if (carbonTable != null) {
+ val loadMetadataDetails = SegmentStatusManager
+ .readLoadMetadata(carbonTable.getMetadataPath)
+
+ val fileType = FileFactory.getFileType(carbonTable.getTablePath)
+ val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath,
fileType)
+
+ // list all files from table path
+ val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+ loadMetadataDetails.foreach { metadataDetail =>
+ if
(metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+ metadataDetail.getSegmentFile == null) {
+ val loadStartTime: Long = metadataDetail.getLoadStartTime
+ // delete all files of @loadStartTime from tablepath
+ cleanPartitionFolder(listOfDefaultPartFilesIterator,
loadStartTime)
+ partitionSpecList.foreach {
+ partitionSpec =>
+ val partitionLocation = partitionSpec.getLocation
+ // For partition folder outside the tablePath
+ if
(!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+ val fileType =
FileFactory.getFileType(partitionLocation.toString)
+ val partitionCarbonFile = FileFactory
+ .getCarbonFile(partitionLocation.toString, fileType)
+ // list all files from partitionLoacation
+ val listOfExternalPartFilesIterator =
partitionCarbonFile.listFiles(true)
+ // delete all files of @loadStartTime from externalPath
+ cleanPartitionFolder(listOfExternalPartFilesIterator,
loadStartTime)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param carbonFiles
+ * @param timestamp
+ */
+ private def cleanPartitionFolder(carbonFiles: Array[CarbonFile],
+ timestamp: Long): Unit = {
+ carbonFiles.foreach {
+ carbonFile =>
+ val filePath = carbonFile.getPath
+ val fileName = carbonFile.getName
+ if (fileName.lastIndexOf("-") > 0 && fileName.lastIndexOf(".") >
0) {
+ if (fileName.substring(fileName.lastIndexOf("-") + 1,
fileName.lastIndexOf("."))
--- End diff --
done
Advertising
---