VenuReddy2103 commented on a change in pull request #4072:
URL: https://github.com/apache/carbondata/pull/4072#discussion_r579263541



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -150,48 +151,92 @@ public static void copyFilesToTrash(List<String> filesToCopy,
 
   /**
    * The below method deletes timestamp subdirectories in the trash folder which have expired as
-   * per the user defined retention time
+   * per the user defined retention time. It return an array where the first element has the size
+   * freed from the trash folder and the second element has the remaining size in the trash folder
    */
-  public static void deleteExpiredDataFromTrash(String tablePath) {
+  public static long[] deleteExpiredDataFromTrash(String tablePath, Boolean isDryRun,
+      Boolean showStats) {
     CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
         .getTrashFolderPath(tablePath));
+    long sizeFreed = 0;
+    long trashFolderSize = 0;
     // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
     try {
       if (trashFolder.isFileExist()) {
+        if (isDryRun || showStats) {
+          trashFolderSize = FileFactory.getDirectorySize(trashFolder.getAbsolutePath());
+        }
         CarbonFile[] timestampFolderList = trashFolder.listFiles();
+        List<CarbonFile> filesToDelete = new ArrayList<>();
         for (CarbonFile timestampFolder : timestampFolderList) {
           // If the timeStamp at which the timeStamp subdirectory has expired as per the user
           // defined value, delete the complete timeStamp subdirectory
-          if (timestampFolder.isDirectory() && isTrashRetentionTimeoutExceeded(Long
-              .parseLong(timestampFolder.getName()))) {
-            FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
-            LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + timestampFolder
+          if (isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+            if (timestampFolder.isDirectory()) {

Review comment:
       Would `if (timestampFolder.isDirectory())` ever be false?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -150,48 +151,92 @@ public static void copyFilesToTrash(List<String> filesToCopy,
 
   /**
    * The below method deletes timestamp subdirectories in the trash folder which have expired as
-   * per the user defined retention time
+   * per the user defined retention time. It return an array where the first element has the size
+   * freed from the trash folder and the second element has the remaining size in the trash folder
    */
-  public static void deleteExpiredDataFromTrash(String tablePath) {
+  public static long[] deleteExpiredDataFromTrash(String tablePath, Boolean isDryRun,
+      Boolean showStats) {
     CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
         .getTrashFolderPath(tablePath));
+    long sizeFreed = 0;
+    long trashFolderSize = 0;
     // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
     try {
       if (trashFolder.isFileExist()) {
+        if (isDryRun || showStats) {
+          trashFolderSize = FileFactory.getDirectorySize(trashFolder.getAbsolutePath());
+        }
         CarbonFile[] timestampFolderList = trashFolder.listFiles();
+        List<CarbonFile> filesToDelete = new ArrayList<>();
         for (CarbonFile timestampFolder : timestampFolderList) {
           // If the timeStamp at which the timeStamp subdirectory has expired as per the user
           // defined value, delete the complete timeStamp subdirectory
-          if (timestampFolder.isDirectory() && isTrashRetentionTimeoutExceeded(Long
-              .parseLong(timestampFolder.getName()))) {
-            FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
-            LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + timestampFolder
+          if (isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+            if (timestampFolder.isDirectory()) {
+              // only calculate size in case of dry run or in case clean files is with show stats
+              if (isDryRun || showStats) {
+                sizeFreed += FileFactory.getDirectorySize(timestampFolder.getAbsolutePath());
+              }
+              filesToDelete.add(timestampFolder);
+            }
+          }
+        }
+        if (!isDryRun) {
+          for (CarbonFile carbonFile : filesToDelete) {
+            LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + carbonFile
                 .getAbsolutePath());
+            FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
          }
        }
      }
    } catch (IOException e) {
      LOGGER.error("Error during deleting expired timestamp folder from the trash folder", e);
    }
+    return new long[] {sizeFreed, trashFolderSize - sizeFreed};
  }
 
   /**
    * The below method deletes all the files and folders in the trash folder of a carbon table.
+   * Returns an array in which the first element contains the size freed in case of clean files
+   * operation or size that can be freed in case of dry run and the second element contains the
+   * remaining size.
    */
-  public static void emptyTrash(String tablePath) {
+  public static long[] emptyTrash(String tablePath, Boolean isDryRun, Boolean showStats) {
     CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
         .getTrashFolderPath(tablePath));
     // if the trash folder exists delete the contents of the trash folder
+    long sizeFreed = 0;
+    long[] sizeStatistics = new long[]{0, 0};
     try {
       if (trashFolder.isFileExist()) {
         CarbonFile[] carbonFileList = trashFolder.listFiles();
+        List<CarbonFile> filesToDelete = new ArrayList<>();
         for (CarbonFile carbonFile : carbonFileList) {
-          FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+          //Only calculate size when it is dry run operation or when show statistics is
+          // true with actual operation
+          if (isDryRun || showStats) {
+            sizeFreed += FileFactory.getDirectorySize(carbonFile.getAbsolutePath());
+          }
+          filesToDelete.add(carbonFile);
+        }
+        sizeStatistics[0] = sizeFreed;
+        if (!isDryRun) {
+          for (CarbonFile carbonFile : filesToDelete) {
+            FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+          }
+          LOGGER.info("Trash Folder has been emptied for table: " + tablePath);
+          if (showStats) {
+            sizeStatistics[1] = FileFactory.getDirectorySize(trashFolder.getAbsolutePath());
+          }
+        } else {
+          sizeStatistics[1] = FileFactory.getDirectorySize(trashFolder.getAbsolutePath()) -

Review comment:
       Do we need `sizeStatistics[1]` when `isDryRun` is true and `showStats` is false?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -121,6 +176,78 @@ object DataTrashManager {
     }
   }
 
+  /**
+   * Does Clean files dry run operation on the expired segments. Returns the size freed
+   * during that clean files operation and also shows the remaining trash size, which can be
+   * cleaned after those segments are expired
+   */
+  private def dryRunOnExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean): Seq[Long] = {
+    var sizeFreed: Long = 0
+    var trashSizeRemaining: Long = 0
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    if (SegmentStatusManager.isLoadDeletionRequired(loadMetadataDetails)) {
+      loadMetadataDetails.foreach { oneLoad =>
+        val segmentFilePath = CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
+          oneLoad.getSegmentFile)
+        if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
+          // No need to consider physical data for external segments, only consider metadata.
+          if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+            sizeFreed += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, loadMetadataDetails)
+          }
+          sizeFreed += FileFactory.getCarbonFile(segmentFilePath).getSize
+        } else {
+          if (SegmentStatusManager.isExpiredSegment(oneLoad, carbonTable
+              .getAbsoluteTableIdentifier)) {
+            trashSizeRemaining += calculateSegmentSizeForOneLoad(carbonTable, oneLoad,
+                loadMetadataDetails)
+            trashSizeRemaining += FileFactory.getCarbonFile(segmentFilePath).getSize
+          }
+        }
+      }
+    }
+    Seq(sizeFreed, trashSizeRemaining)
+  }
+
+  /**
+   * calculates the segment size based of a segment
+   */
+  def calculateSegmentSizeForOneLoad( carbonTable: CarbonTable, oneLoad: LoadMetadataDetails,
+        loadMetadataDetails: Array[LoadMetadataDetails]) : Long = {
+    var size : Long = 0
+    if (oneLoad.getDataSize!= null && !oneLoad.getDataSize.isEmpty) {

Review comment:
       A space is probably missing before `!=`. Same at line 223.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -87,13 +104,51 @@ object DataTrashManager {
     }
   }
 
-  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+  /**
+   * Checks the size of the segment files as well as datafiles, this method is used before and after
+   * clean files operation to check how much space is actually freed, during the operation.
+   */
+  def getSizeScreenshot(carbonTable: CarbonTable): Long = {
+    val metadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    var size: Long = 0
+    val segmentFileLocation = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
+    if (FileFactory.isFileExist(segmentFileLocation)) {
+      size += FileFactory.getDirectorySize(segmentFileLocation)
+    }
+    metadataDetails.foreach(oneLoad =>
+      if (oneLoad.getVisibility.toBoolean) {
+        size += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, metadataDetails)
+      }
+    )
+    size
+  }
+
+  /**
+   * Method to handle the Clean files dry run operation
+   */
+  def cleanFilesDryRunOperation (
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      showStats: Boolean): Seq[Long] = {
+    // get size freed from the trash folder
+    val trashFolderSizeStats = checkAndCleanTrashFolder(carbonTable, isForceDelete,
+        isDryRun = true, showStats)
+    // get size that will be deleted (MFD, COmpacted, Inprogress segments)
+    val expiredSegmentsSizeStats = dryRunOnExpiredSegments(carbonTable, isForceDelete,
+      cleanStaleInProgress)
+    Seq(trashFolderSizeStats.head + expiredSegmentsSizeStats.head, trashFolderSizeStats(1) +
+        expiredSegmentsSizeStats(1))
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean,
+      isDryRun: Boolean, showStats: Boolean): Seq[Long] = {

Review comment:
       This returns a sequence of two elements (freed size, remaining size). Wouldn't returning a tuple be more meaningful and readable in Scala?
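
       For instance, a rough sketch of what I mean (the local names here are illustrative; the body would stay as in this PR, only the return type changes):

           private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean,
               isDryRun: Boolean, showStats: Boolean): (Long, Long) = {
             // ... same logic as in this PR, ending with:
             (sizeFreed, sizeRemaining)
           }

           // callers can then destructure by name instead of indexing into a Seq:
           val (sizeFreed, sizeRemaining) = checkAndCleanTrashFolder(carbonTable, isForceDelete,
             isDryRun = true, showStats)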

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -121,6 +176,78 @@ object DataTrashManager {
     }
   }
 
+  /**
+   * Does Clean files dry run operation on the expired segments. Returns the size freed
+   * during that clean files operation and also shows the remaining trash size, which can be
+   * cleaned after those segments are expired
+   */
+  private def dryRunOnExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean): Seq[Long] = {

Review comment:
       Same as the above comment regarding the return type.
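
       i.e. something like:

           private def dryRunOnExpiredSegments(
               carbonTable: CarbonTable,
               isForceDelete: Boolean,
               cleanStaleInProgress: Boolean): (Long, Long) = {
             // ... body unchanged ...
             (sizeFreed, trashSizeRemaining)
           }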

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1161,10 +1167,12 @@ private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
           FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+          LOGGER.info("Deleted the mergeindex file: " + location.toString());

Review comment:
       This could be an index file or a merge index file? If so, how about we add `LOGGER.info("Deleting files:")` before the loop and, inside the loop, just have `LOGGER.info(location.toString());`?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -41,6 +43,26 @@ case class CarbonCleanFilesCommand(
   extends DataCommand {
 
   val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val isDryRun: Boolean = options.getOrElse("dryrun", "false").toBoolean
+  var showStats: Boolean = options.getOrElse("statistics", "true").toBoolean
+  if (isInternalCleanCall) {
+    showStats = false
+  }
+
+  override def output: Seq[AttributeReference] = {
+    if (isDryRun) {
+      // dry run operation
+      Seq(
+        AttributeReference("Size Freed", StringType, nullable = false)(),
+        AttributeReference("Trash Data Remaining", StringType, nullable = false)())
+    } else if (!isDryRun && showStats) {

Review comment:
       `!isDryRun` would always evaluate to true here, since this branch is only reached when `isDryRun` is false.
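
       i.e. the branch can simply be (keeping its body as it is in the PR):

           } else if (showStats) {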

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -121,6 +176,78 @@ object DataTrashManager {
     }
   }
 
+  /**
+   * Does Clean files dry run operation on the expired segments. Returns the size freed
+   * during that clean files operation and also shows the remaining trash size, which can be
+   * cleaned after those segments are expired
+   */
+  private def dryRunOnExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean): Seq[Long] = {
+    var sizeFreed: Long = 0
+    var trashSizeRemaining: Long = 0
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    if (SegmentStatusManager.isLoadDeletionRequired(loadMetadataDetails)) {
+      loadMetadataDetails.foreach { oneLoad =>
+        val segmentFilePath = CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
+          oneLoad.getSegmentFile)
+        if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
+          // No need to consider physical data for external segments, only consider metadata.
+          if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+            sizeFreed += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, loadMetadataDetails)
+          }
+          sizeFreed += FileFactory.getCarbonFile(segmentFilePath).getSize
+        } else {
+          if (SegmentStatusManager.isExpiredSegment(oneLoad, carbonTable
+              .getAbsoluteTableIdentifier)) {
+            trashSizeRemaining += calculateSegmentSizeForOneLoad(carbonTable, oneLoad,
+                loadMetadataDetails)
+            trashSizeRemaining += FileFactory.getCarbonFile(segmentFilePath).getSize
+          }
+        }
+      }
+    }
+    Seq(sizeFreed, trashSizeRemaining)
+  }
+
+  /**
+   * calculates the segment size based of a segment
+   */
+  def calculateSegmentSizeForOneLoad( carbonTable: CarbonTable, oneLoad: LoadMetadataDetails,
+        loadMetadataDetails: Array[LoadMetadataDetails]) : Long = {
+    var size : Long = 0
+    if (oneLoad.getDataSize!= null && !oneLoad.getDataSize.isEmpty) {

Review comment:
       You would want to use `!StringUtils.isEmpty(oneLoad.getDataSize)` here? Can check for similar patterns at other places as well.
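
       For instance, a sketch (assuming Apache Commons Lang is available in this module):

           import org.apache.commons.lang3.StringUtils

           // StringUtils.isEmpty covers both the null check and the empty-string check in one call
           if (!StringUtils.isEmpty(oneLoad.getDataSize)) {
             // ... existing body ...
           }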

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -41,6 +43,26 @@ case class CarbonCleanFilesCommand(
   extends DataCommand {
 
   val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val isDryRun: Boolean = options.getOrElse("dryrun", "false").toBoolean
+  var showStats: Boolean = options.getOrElse("statistics", "true").toBoolean

Review comment:
       showStats can still be immutable. You might want to do something like this:

           val showStats: Boolean = if (isInternalCleanCall) {
             false
           } else {
             options.getOrElse("statistics", "true").toBoolean
           }

       That way the separate `if (isInternalCleanCall)` reassignment is not needed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

