Github user gvramana commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2128#discussion_r180109707
  
    --- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
 ---
    @@ -151,13 +153,82 @@ object CarbonStore {
             }
           }
         } finally {
    +      if (currentTablePartitions.equals(None)) {
    +        cleanUpPartitionFoldersRecurssively(carbonTable, 
List.empty[PartitionSpec])
    +      } else {
    +        cleanUpPartitionFoldersRecurssively(carbonTable, 
currentTablePartitions.get.toList)
    +      }
    +
           if (carbonCleanFilesLock != null) {
             CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
           }
         }
         LOGGER.audit(s"Clean files operation is success for 
$dbName.$tableName.")
       }
     
    +  /**
    +   * delete partition folders recurssively
    +   *
    +   * @param carbonTable
    +   * @param partitionSpecList
    +   */
    +  def cleanUpPartitionFoldersRecurssively(carbonTable: CarbonTable,
    +      partitionSpecList: List[PartitionSpec]): Unit = {
    +    if (carbonTable != null) {
    +      val loadMetadataDetails = SegmentStatusManager
    +        .readLoadMetadata(carbonTable.getMetadataPath)
    +
    +      val fileType = FileFactory.getFileType(carbonTable.getTablePath)
    +      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath, 
fileType)
    +
    +      // list all files from table path
    +      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
    +      loadMetadataDetails.foreach { metadataDetail =>
    +        if 
(metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
    +            metadataDetail.getSegmentFile == null) {
    +          val loadStartTime: Long = metadataDetail.getLoadStartTime
    +          // delete all files of @loadStartTime from tablepath
    +          cleanPartitionFolder(listOfDefaultPartFilesIterator, 
loadStartTime)
    +          partitionSpecList.foreach {
    +            partitionSpec =>
    +              val partitionLocation = partitionSpec.getLocation
    +              // For partition folder outside the tablePath
    +              if 
(!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
    +                val fileType = 
FileFactory.getFileType(partitionLocation.toString)
    +                val partitionCarbonFile = FileFactory
    +                  .getCarbonFile(partitionLocation.toString, fileType)
    +                // list all files from partitionLoacation
    +                val listOfExternalPartFilesIterator = 
partitionCarbonFile.listFiles(true)
    +                // delete all files of @loadStartTime from externalPath
    +                cleanPartitionFolder(listOfExternalPartFilesIterator, 
loadStartTime)
    +              }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   *
    +   * @param carbonFiles
    +   * @param timestamp
    +   */
    +  private def cleanPartitionFolder(carbonFiles: Array[CarbonFile],
    +      timestamp: Long): Unit = {
    +    carbonFiles.foreach {
    +      carbonFile =>
    +        val filePath = carbonFile.getPath
    +        val fileName = carbonFile.getName
    +        if (fileName.lastIndexOf("-") > 0 && fileName.lastIndexOf(".") > 
0) {
    +          if (fileName.substring(fileName.lastIndexOf("-") + 1, 
fileName.lastIndexOf("."))
    --- End diff --
    
    The getCarbonFileTimeStamp function can be moved to CarbonTablePath.
    Change the function name to cleanCarbonFilesInFolder.


---

Reply via email to