vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r516800475



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -80,40 +112,96 @@ case class CarbonCleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // if insert overwrite in progress, do not allow delete segment
-    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
-    }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+    if (!isDryRun) {
+      // if insert overwrite in progress, do not allow delete segment
+      if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      }
+      val operationContext = new OperationContext
+      val cleanFilesPreEvent: CleanFilesPreEvent = CleanFilesPreEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
+      if (tableName.isDefined) {
+        Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+        if (forceTrashClean) {
+          CleanFilesUtil.deleteDataFromTrashFolder(carbonTable, sparkSession)
+        } else {
+          // clear trash based on timestamp
+          CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp(carbonTable, sparkSession)
+        }
+        if (forceTableClean) {
+          deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        } else {
+          cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        }
+        // delete partial loads and move them to the trash folder
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonTable, false)
+        // clean stale segment files in the metadata folder too
+        deleteStashInMetadataFolder(carbonTable)
       } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        cleanGarbageDataInAllTables(sparkSession)
+      }
+      if (cleanFileCommands != null) {
+        cleanFileCommands.foreach(_.processData(sparkSession))
       }
+      val cleanFilesPostEvent: CleanFilesPostEvent =
+        CleanFilesPostEvent(carbonTable, sparkSession)
+      OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
+      Seq.empty
+    } else if (isDryRun && tableName.isDefined) {
+      // dry run: do not clean anything and do not empty the trash either
+      CleanFilesUtil.cleanFilesDryRun(carbonTable, sparkSession)
     } else {
-      cleanGarbageDataInAllTables(sparkSession)
+      Seq.empty
     }
-    if (cleanFileCommands != null) {
-      cleanFileCommands.foreach(_.processData(sparkSession))
+  }
+
+  // This method deletes the stale segment files in the segment folder.
+  def deleteStashInMetadataFolder(carbonTable: CarbonTable): Unit = {

Review comment:
       done
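
The hunk above ends before the body of deleteStashInMetadataFolder. As a rough illustration of what its doc comment describes (deleting segment files whose segment id is no longer valid), something along these lines would work; the directory layout, the ".segment" suffix, and the "<segmentId>_<timestamp>" naming are assumptions made for the sketch, not CarbonData's actual implementation:

    import java.io.File

    object StaleSegmentSketch {
      // A segment file is considered stale when its segment id no longer appears
      // in the set of valid ids (in the real code this would be derived from the
      // table status file; here it is simply passed in).
      def deleteStaleSegmentFiles(segmentsDir: File, validSegmentIds: Set[String]): Unit = {
        Option(segmentsDir.listFiles()).getOrElse(Array.empty[File])
          .filter(f => f.isFile && f.getName.endsWith(".segment"))
          .foreach { f =>
            // assumed naming scheme: "<segmentId>_<timestamp>.segment"
            val segmentId = f.getName.takeWhile(_ != '_')
            if (!validSegmentIds.contains(segmentId)) f.delete()
          }
      }
    }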




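Similarly, the timestamp-based branch (CleanFilesUtil.deleteDataFromTrashFolderByTimeStamp) reduces to a retention check on file age. A minimal standalone sketch, assuming a fixed seven-day retention window and plain java.io file access (the real implementation would go through CarbonData's file abstraction and a configurable retention property):

    import java.io.File
    import java.util.concurrent.TimeUnit

    object TrashRetentionSketch {
      // assumed retention window for the example; not CarbonData's actual default
      private val retentionMillis: Long = TimeUnit.DAYS.toMillis(7)

      // Recursively delete files under the trash folder whose last-modified time
      // is older than the retention window; directories themselves are kept.
      def cleanByTimestamp(trashDir: File, now: Long = System.currentTimeMillis()): Unit = {
        Option(trashDir.listFiles()).getOrElse(Array.empty[File]).foreach { f =>
          if (f.isDirectory) cleanByTimestamp(f, now)
          else if (now - f.lastModified() > retentionMillis) f.delete()
        }
      }
    }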
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

