This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d9f69ae  [CARBONDATA-4110] Support clean files dry run operation and show statistics after clean files operation
d9f69ae is described below

commit d9f69aef5affad6d687820c0642d4f3e1655f217
Author: Vikram Ahuja <[email protected]>
AuthorDate: Thu Jan 7 11:19:54 2021 +0530

    [CARBONDATA-4110] Support clean files dry run operation and show statistics after clean files operation
    
    Why is this PR needed?
    Currently, in the clean files operation, the user does not know how much space will be freed.
    The idea is to add support for a dry run in clean files, which can tell the user how much
    space will be freed by the clean files operation without cleaning the actual data.
    
    What changes were proposed in this PR?
    This PR has the following changes:

    1. Support dry run in clean files: it shows the user how much space will be freed by the
       clean files operation and how much space is left (which can be released after the
       expiration time) after the clean files operation.
    2. Clean files output: the total size released during the clean files operation.
    3. An option to disable clean files statistics, in case the user does not want them.
    4. Clean files log: enhance the clean files log to print the name of every file that is
       being deleted in the info log. (A usage sketch follows this list.)
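
    For illustration, a minimal usage sketch (hypothetical table name; the reported
    sizes depend on the data being cleaned):

      CLEAN FILES FOR TABLE my_table OPTIONS('dryrun'='true')
      -- returns 'Size To Be Freed' and 'Trash Data Remaining' without deleting data
      CLEAN FILES FOR TABLE my_table
      -- deletes eligible data and returns 'Size Freed'; 'statistics'='false' skips it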
    
    This closes #4072
---
 .../carbondata/core/metadata/SegmentFileStore.java |  11 +-
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |   2 +-
 .../core/statusmanager/SegmentStatusManager.java   |  22 ++-
 .../carbondata/core/util/CleanFilesUtil.java       |  10 ++
 .../carbondata/core/util/DeleteLoadFolders.java    |   2 +-
 .../org/apache/carbondata/core/util/TrashUtil.java |  56 +++++++-
 docs/clean-files.md                                |  39 +++++-
 .../TestCreateIndexForCleanAndDeleteSegment.scala  |  16 ++-
 .../apache/carbondata/trash/DataTrashManager.scala | 154 +++++++++++++++++++--
 .../management/CarbonCleanFilesCommand.scala       |  55 ++++++--
 .../scala/org/apache/spark/util/CleanFiles.scala   |   1 +
 .../cleanfiles/TestCleanFileCommand.scala          |  76 ++++++++--
 .../TestCleanFilesCommandPartitionTable.scala      |  20 ++-
 .../FlatFolderTableLoadingTestCase.scala           |   8 +-
 14 files changed, 428 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 0a26b5e..869cc5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1125,17 +1125,23 @@ public class SegmentFileStore {
     List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
         FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+    List<String> deletedFiles = new ArrayList<>();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
       FileFactory.deleteFile(entry.getKey());
+      deletedFiles.add(entry.getKey());
       for (String file : entry.getValue()) {
         String[] deltaFilePaths =
             updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo());
         for (String deltaFilePath : deltaFilePaths) {
           FileFactory.deleteFile(deltaFilePath);
+          deletedFiles.add(deltaFilePath);
         }
         FileFactory.deleteFile(file);
+        deletedFiles.add(file);
       }
     }
+    LOGGER.info("Deleted the files: " + String.join(",", deletedFiles) + " on clean" +
+        " files operation");
     deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles, tablePath);
   }
 
@@ -1147,16 +1153,19 @@ public class SegmentFileStore {
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
       Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) {
+    LOGGER.info("Deleting files: ");
     for (String indexOrMergeFile : indexOrMergeFiles) {
       if (null != partitionSpecs) {
         Path location = new Path(indexOrMergeFile);
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
           FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+          LOGGER.info(location.toString());
         }
       } else {
         Path location = new Path(indexOrMergeFile);
         FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+        LOGGER.info(location.toString());
       }
     }
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
@@ -1199,7 +1208,7 @@ public class SegmentFileStore {
     }
   }
 
-  private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
+  public static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
       Path partitionPath) {
     for (PartitionSpec spec : partitionSpecs) {
       if (spec.getLocation().equals(partitionPath)) {
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index e78b630..41c4b2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -517,7 +517,7 @@ public class CarbonUpdateUtil {
 
     long minutesElapsed = (difference / (1000 * 60));
 
-    return minutesElapsed > maxTime;
+    return minutesElapsed >= maxTime;
 
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 5b800a2..38d9e56 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -983,7 +983,27 @@ public class SegmentStatusManager {
     }
   }
 
-  private static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) {
+  public static boolean isExpiredSegment(LoadMetadataDetails oneLoad, AbsoluteTableIdentifier
+      absoluteTableIdentifier) {
+    boolean isExpiredSegment = false;
+    if (oneLoad.getSegmentStatus() == SegmentStatus.COMPACTED || oneLoad.getSegmentStatus() ==
+        SegmentStatus.MARKED_FOR_DELETE) {
+      isExpiredSegment = true;
+    } else if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS || oneLoad
+        .getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+      // check if lock can be acquired
+      ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+          CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+      try {
+        isExpiredSegment = segmentLock.lockWithRetries();
+      } finally {
+        CarbonLockUtil.fileUnlock(segmentLock, LockUsage.LOCK);
+      }
+    }
+    return isExpiredSegment;
+  }
+
+  public static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) {
     if (details != null && details.length > 0) {
       for (LoadMetadataDetails oneRow : details) {
         if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus()
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index 9889fe3..44d1d8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -74,13 +74,19 @@ public class CleanFilesUtil {
               // delete the segment file as well
               FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
                   staleSegmentFile));
+              List<String> deletedFiles = new ArrayList<>();
+              deletedFiles.add(staleSegmentFile);
               for (String duplicateStaleSegmentFile : redundantSegmentFile) {
                 if (DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
                     .equals(segmentNumber)) {
                   FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable
                       .getTablePath(), duplicateStaleSegmentFile));
+                  deletedFiles.add(duplicateStaleSegmentFile);
                 }
               }
+              LOGGER.info("Deleted the Segment :" + segmentPath.getName() + " after"
+                  + " moving it to the trash folder");
+              LOGGER.info("Deleted stale segment files: " + String.join(",", deletedFiles));
             } catch (IOException | InterruptedException e) {
               LOGGER.error("Unable to delete the segment: " + segmentPath + " from after moving" +
                   " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
@@ -122,16 +128,20 @@ public class CleanFilesUtil {
       try {
         for (String file : filesToProcess) {
           FileFactory.deleteFile(file);
+          LOGGER.info("Deleted file :" + file + " after moving it to the trash folder");
         }
         // Delete the segment file too
         FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
             staleSegmentFile));
+        LOGGER.info("Deleted stale segment file after moving it to the trash folder :"
+            + staleSegmentFile);
         // remove duplicate segment files if any
         for (String duplicateStaleSegmentFile : redundantSegmentFile) {
           if (DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
               .equals(segmentNumber)) {
             FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
                 duplicateStaleSegmentFile));
+            LOGGER.info("Deleted redundant segment file :" + duplicateStaleSegmentFile);
           }
         }
       } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index f8f2607..49db220 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -187,7 +187,7 @@ public final class DeleteLoadFolders {
     return false;
   }
 
-  private static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
+  public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
       boolean isForceDelete, boolean cleanStaleInProgress) {
     /*
      * if cleanStaleInProgress == false and  isForceDelete == false, clean MFD and Compacted
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index ae5476d..47d196b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.util;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -150,48 +151,91 @@ public final class TrashUtil {
 
   /**
    * The below method deletes timestamp subdirectories in the trash folder which have expired as
-   * per the user defined retention time
+   * per the user defined retention time. It returns an array where the first element has the size
+   * freed from the trash folder and the second element has the remaining size in the trash folder
    */
-  public static void deleteExpiredDataFromTrash(String tablePath) {
+  public static long[] deleteExpiredDataFromTrash(String tablePath, Boolean isDryRun,
+      Boolean showStats) {
     CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
         .getTrashFolderPath(tablePath));
+    long sizeFreed = 0;
+    long trashFolderSize = 0;
     // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
     try {
       if (trashFolder.isFileExist()) {
+        if (isDryRun || showStats) {
+          trashFolderSize = FileFactory.getDirectorySize(trashFolder.getAbsolutePath());
+        }
         CarbonFile[] timestampFolderList = trashFolder.listFiles();
+        List<CarbonFile> filesToDelete = new ArrayList<>();
         for (CarbonFile timestampFolder : timestampFolderList) {
           // If the timeStamp at which the timeStamp subdirectory has expired as per the user
           // defined value, delete the complete timeStamp subdirectory
           if (timestampFolder.isDirectory() && isTrashRetentionTimeoutExceeded(Long
               .parseLong(timestampFolder.getName()))) {
-            FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
-            LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + timestampFolder
+            // only calculate size in case of dry run or when clean files runs with show stats
+            if (isDryRun || showStats) {
+              sizeFreed += FileFactory.getDirectorySize(timestampFolder.getAbsolutePath());
+            }
+            filesToDelete.add(timestampFolder);
+          }
+        }
+        if (!isDryRun) {
+          for (CarbonFile carbonFile : filesToDelete) {
+            LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + carbonFile
                 .getAbsolutePath());
+            FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
           }
         }
       }
     } catch (IOException e) {
       LOGGER.error("Error during deleting expired timestamp folder from the trash folder", e);
     }
+    return new long[] {sizeFreed, trashFolderSize - sizeFreed};
   }
 
   /**
    * The below method deletes all the files and folders in the trash folder of a carbon table.
+   * Returns an array in which the first element contains the size freed in case of the clean
+   * files operation (or the size that can be freed in case of a dry run) and the second
+   * element contains the remaining size.
    */
-  public static void emptyTrash(String tablePath) {
+  public static long[] emptyTrash(String tablePath, Boolean isDryRun, Boolean showStats) {
     CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
         .getTrashFolderPath(tablePath));
     // if the trash folder exists delete the contents of the trash folder
+    long sizeFreed = 0;
+    long[] sizeStatistics = new long[]{0, 0};
     try {
       if (trashFolder.isFileExist()) {
         CarbonFile[] carbonFileList = trashFolder.listFiles();
+        List<CarbonFile> filesToDelete = new ArrayList<>();
         for (CarbonFile carbonFile : carbonFileList) {
-          FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+          // only calculate size when it is a dry run operation or when show statistics is
+          // true with the actual operation
+          if (isDryRun || showStats) {
+            sizeFreed += FileFactory.getDirectorySize(carbonFile.getAbsolutePath());
+          }
+          filesToDelete.add(carbonFile);
+        }
+        sizeStatistics[0] = sizeFreed;
+        if (!isDryRun) {
+          for (CarbonFile carbonFile : filesToDelete) {
+            FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+          }
+          LOGGER.info("Trash Folder has been emptied for table: " + tablePath);
+          if (showStats) {
+            sizeStatistics[1] = FileFactory.getDirectorySize(trashFolder.getAbsolutePath());
+          }
+        } else {
+          sizeStatistics[1] = FileFactory.getDirectorySize(trashFolder.getAbsolutePath()) -
+              sizeFreed;
         }
       }
     } catch (IOException e) {
       LOGGER.error("Error while emptying the trash folder", e);
     }
+    return sizeStatistics;
   }
 
   /**
diff --git a/docs/clean-files.md b/docs/clean-files.md
index dbe611b..e5bde49 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -64,4 +64,41 @@ The stale_inprogress option with force option will delete Marked for delete, Com
 
   ```
   CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true', 'force'='true')
-  ```
\ No newline at end of file
+  ```
+### DRY RUN OPTION
+Clean files also supports a dry run option which will let the user know how much space will be freed
+during the actual clean files operation. The dry run operation will not delete any data; it will just give
+size-based statistics on the data which would be cleaned by clean files. The dry run operation returns two columns, where the first
+shows how much space will be freed by that clean files operation and the second shows the
+remaining stale data (data which can be deleted but has not yet expired as per the ```max.query.execution.time``` and ```carbon.trash.retention.days``` values).
+By default the value of the ```dryrun``` option is ```false```.
+
+Dry Run Operation is supported with four types of commands:
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('dryrun'='true')
+  ```
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('force'='true', 'dryrun'='true')
+  ```
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true','dryrun'='true')
+  ```
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true', 'force'='true','dryrun'='true')
+  ```
+
+**NOTE**:
+  * Since the dry run operation calculates sizes and accesses file-level APIs, it can
+  be a costly and time-consuming operation for tables with a large number of segments.
+  * When dry run is true, the statistics option does not matter. An example of the dry run
+  output follows.
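+
+  For illustration, a dry run might look like the following (hypothetical table name and
+  sizes; the two column names come from the dry run output schema):
+  ```
+  CLEAN FILES FOR TABLE my_table OPTIONS('dryrun'='true')
+  +----------------+--------------------+
+  |Size To Be Freed|Trash Data Remaining|
+  +----------------+--------------------+
+  |12.5MB          |2.0MB               |
+  +----------------+--------------------+
+  ```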
+  
+### SHOW STATISTICS
+The clean files operation tells the user how much space was freed during that operation. By default, the clean files operation
+will show the size-freed statistics. Since calculating and showing statistics can be a costly operation and can reduce the performance of the
+clean files operation, the user can disable that option by using ```statistics = false``` in the clean files options.
+  
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME options('statistics'='false')
+   ```
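+
+   For illustration, with statistics enabled (the default) the operation returns a single
+   "Size Freed" column (hypothetical value shown):
+   ```
+   CLEAN FILES FOR TABLE my_table
+   +----------+
+   |Size Freed|
+   +----------+
+   |12.5MB    |
+   +----------+
+   ```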
+  
\ No newline at end of file
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
index 32183ab..ded9c87 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
@@ -19,6 +19,9 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 /**
  * test cases for testing clean and delete segment functionality for index tables
  */
@@ -60,7 +63,10 @@ class TestCreateIndexForCleanAndDeleteSegment extends QueryTest with BeforeAndAf
       "SEGMENT.STARTTIME BEFORE '2025-06-01 12:05:06'")
     sql("create materialized view mv1 as select empname, deptname, " +
       "avg(salary) from  delete_segment_by_id group by empname, deptname")
-    sql("clean files for table delete_segment_by_id")
+    var dryRun = sql("clean files for table delete_segment_by_id OPTIONS('dryrun'='true')")
+      .collect()
+    var cleanFiles = sql("clean files for table delete_segment_by_id").collect()
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
     checkAnswer(sql("select count(*) from delete_segment_by_id"),
       sql("select count(*) from index_no_dictionary"))
     val postDeleteSegmentsByDate = sql("SHOW SEGMENTS FOR TABLE delete_segment_by_id").count()
@@ -69,6 +75,14 @@ class TestCreateIndexForCleanAndDeleteSegment extends QueryTest with BeforeAndAf
     assert(result.get(0).get(2).toString.equalsIgnoreCase("ENABLED"))
     assert(result.get(0).get(3).toString.equalsIgnoreCase("full"))
     assert(result.get(0).get(4).toString.equalsIgnoreCase("on_commit"))
+    dryRun = sql("clean files for table delete_segment_by_id" +
+      " OPTIONS('dryrun'='true', 'force'='true')").collect()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    cleanFiles = sql("clean files for table delete_segment_by_id OPTIONS('force'='true')").collect()
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
     sql("drop materialized view if exists mv1 ")
     sql("drop table if exists delete_segment_by_id")
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
index cadc43b..1d0bd3b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
@@ -19,16 +19,19 @@ package org.apache.carbondata.trash
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang.StringUtils
+
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, DeleteLoadFolders, TrashUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 object DataTrashManager {
@@ -48,7 +51,8 @@ object DataTrashManager {
       carbonTable: CarbonTable,
       isForceDelete: Boolean,
       cleanStaleInProgress: Boolean,
-      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+      showStatistics: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None) : Long = {
     // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
     if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
       LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
@@ -72,11 +76,26 @@ object DataTrashManager {
       carbonDeleteSegmentLock = CarbonLockUtil.getLockObject(carbonTable
         .getAbsoluteTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK, deleteSegmentErrorMsg)
       // step 1: check and clean trash folder
-      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // trashFolderSizeStats(0) contains the size that is freed (or can be freed) and
+      // trashFolderSizeStats(1) contains the size of remaining data in the trash folder
+      val trashFolderSizeStats = checkAndCleanTrashFolder(carbonTable, isForceDelete,
+          isDryRun = false, showStatistics)
       // step 2: move stale segments which are not exists in metadata into .Trash
       moveStaleSegmentsToTrash(carbonTable)
       // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
-      checkAndCleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+      // Since calculating the size before and after clean files can be a costly operation
+      // have exposed an option where user can change this behaviour.
+      if (showStatistics) {
+        val sizeBeforeCleaning = getSizeSnapshot(carbonTable)
+        checkAndCleanExpiredSegments(carbonTable, isForceDelete,
+          cleanStaleInProgress, partitionSpecs)
+        val sizeAfterCleaning = getSizeSnapshot(carbonTable)
+        sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1
+      } else {
+        checkAndCleanExpiredSegments(carbonTable, isForceDelete,
+          cleanStaleInProgress, partitionSpecs)
+        0
+      }
     } finally {
       if (carbonCleanFilesLock != null) {
         CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
@@ -87,13 +106,54 @@ object DataTrashManager {
     }
   }
 
-  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+  /**
+   * Checks the size of the segment files as well as data files. This method is used before and
+   * after the clean files operation to check how much space is actually freed during the operation.
+   */
+  def getSizeSnapshot(carbonTable: CarbonTable): Long = {
+    val metadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    var size: Long = 0
+    val segmentFileLocation = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
+    if (FileFactory.isFileExist(segmentFileLocation)) {
+      size += FileFactory.getDirectorySize(segmentFileLocation)
+    }
+    metadataDetails.foreach(oneLoad =>
+      if (oneLoad.getVisibility.toBoolean) {
+        size += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, metadataDetails)
+      }
+    )
+    size
+  }
+
+  /**
+   * Method to handle the Clean files dry run operation
+   */
+  def cleanFilesDryRunOperation (
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      showStats: Boolean): (Long, Long) = {
+    // get size freed from the trash folder
+    val trashFolderSizeStats = checkAndCleanTrashFolder(carbonTable, isForceDelete,
+        isDryRun = true, showStats)
+    // get size that will be deleted (MFD, Compacted, In Progress segments)
+    val expiredSegmentsSizeStats = dryRunOnExpiredSegments(carbonTable, isForceDelete,
+      cleanStaleInProgress)
+    (trashFolderSizeStats._1 + expiredSegmentsSizeStats._1, trashFolderSizeStats._2 +
+        expiredSegmentsSizeStats._2)
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean,
+      isDryRun: Boolean, showStats: Boolean): (Long, Long) = {
     if (isForceDelete) {
       // empty the trash folder
-      TrashUtil.emptyTrash(carbonTable.getTablePath)
+      val sizeStatistics = TrashUtil.emptyTrash(carbonTable.getTablePath, isDryRun, showStats)
+      (sizeStatistics.head, sizeStatistics(1))
     } else {
       // clear trash based on timestamp
-      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+      val sizeStatistics = TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath,
+          isDryRun, showStats)
+      (sizeStatistics.head, sizeStatistics(1))
     }
   }
 
@@ -122,6 +182,84 @@ object DataTrashManager {
   }
 
   /**
+   * Does the clean files dry run operation on the expired segments. Returns the size freed
+   * during that clean files operation and also the remaining trash size, which can be
+   * cleaned after those segments have expired.
+   */
+  private def dryRunOnExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean): (Long, Long) = {
+    var sizeFreed: Long = 0
+    var trashSizeRemaining: Long = 0
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    if (SegmentStatusManager.isLoadDeletionRequired(loadMetadataDetails)) {
+      loadMetadataDetails.foreach { oneLoad =>
+        if (!oneLoad.getVisibility.equalsIgnoreCase("false")) {
+          val segmentFilePath = CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
+              oneLoad.getSegmentFile)
+          if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
+            // No need to consider physical data for external segments, only consider metadata.
+            if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+              sizeFreed += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, loadMetadataDetails)
+            }
+            if (FileFactory.isFileExist(segmentFilePath)) {
+              sizeFreed += FileFactory.getCarbonFile(segmentFilePath).getSize
+            }
+          } else {
+            if (SegmentStatusManager.isExpiredSegment(oneLoad, carbonTable
+                .getAbsoluteTableIdentifier)) {
+              trashSizeRemaining += calculateSegmentSizeForOneLoad(carbonTable, oneLoad,
+                  loadMetadataDetails)
+              if (FileFactory.isFileExist(segmentFilePath)) {
+                trashSizeRemaining += FileFactory.getCarbonFile(segmentFilePath).getSize
+              }
+            }
+          }
+        }
+      }
+    }
+    (sizeFreed, trashSizeRemaining)
+  }
+
+  /**
+   * calculates the size of a segment based on its load metadata
+   */
+  def calculateSegmentSizeForOneLoad( carbonTable: CarbonTable, oneLoad: LoadMetadataDetails,
+        loadMetadataDetails: Array[LoadMetadataDetails]) : Long = {
+    var size : Long = 0
+    if (!StringUtils.isEmpty(oneLoad.getDataSize)) {
+      size += oneLoad.getDataSize.toLong
+    }
+    if (!StringUtils.isEmpty(oneLoad.getIndexSize)) {
+      size += oneLoad.getIndexSize.toLong
+    }
+    if (!oneLoad.getUpdateDeltaStartTimestamp.isEmpty && !oneLoad.getUpdateDeltaEndTimestamp
+        .isEmpty) {
+      size += calculateDeltaFileSize(carbonTable, oneLoad, loadMetadataDetails)
+    }
+    size
+  }
+
+  /**
+   * calculates the size of delta files for one segment
+   */
+  def calculateDeltaFileSize( carbonTable: CarbonTable, oneLoad: LoadMetadataDetails,
+      loadMetadataDetails: Array[LoadMetadataDetails]) : Long = {
+    var size: Long = 0
+    val segmentUpdateStatusManager = new SegmentUpdateStatusManager(carbonTable,
+        loadMetadataDetails)
+    segmentUpdateStatusManager.getBlockNameFromSegment(oneLoad.getLoadName).asScala.foreach {
+      block =>
+      segmentUpdateStatusManager.getDeleteDeltaFilesList(Segment
+          .toSegment(oneLoad.getLoadName), block).asScala.foreach{ deltaFile =>
+        size += FileFactory.getCarbonFile(deltaFile).getSize
+      }
+    }
+    size
+  }
+
+  /**
    * clean the stale compact segment immediately after compaction failure
    */
   def cleanStaleCompactionSegment(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index c51efef..6461604 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -19,14 +19,16 @@ package org.apache.spark.sql.execution.command.management
 
 import org.apache.log4j.Logger
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.types.StringType
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.ByteUtil
 import org.apache.carbondata.events._
 import org.apache.carbondata.trash.DataTrashManager
 
@@ -41,6 +43,27 @@ case class CarbonCleanFilesCommand(
   extends DataCommand {
 
   val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val isDryRun: Boolean = options.getOrElse("dryrun", "false").toBoolean
+  val showStats: Boolean = if (isInternalCleanCall) {
+    false
+  } else {
+    options.getOrElse("statistics", "true").toBoolean
+  }
+
+  override def output: Seq[AttributeReference] = {
+    if (isDryRun) {
+      // dry run operation
+      Seq(
+        AttributeReference("Size To Be Freed", StringType, nullable = false)(),
+        AttributeReference("Trash Data Remaining", StringType, nullable = false)())
+    } else if (showStats) {
+      Seq(
+        AttributeReference("Size Freed", StringType, nullable = false)())
+      // actual clean files operation
+    } else {
+      Seq.empty
+    }
+  }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
@@ -58,18 +81,32 @@ case class CarbonCleanFilesCommand(
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
-
-    val preEvent = CleanFilesPreEvent(carbonTable, sparkSession)
-    val postEvent = CleanFilesPostEvent(carbonTable, sparkSession, options)
-    withEvents(preEvent, postEvent) {
-      DataTrashManager.cleanGarbageData(
+    var sizeCleaned : Long = 0
+    if (isDryRun) {
+      val result = DataTrashManager.cleanFilesDryRunOperation(
         carbonTable,
         options.getOrElse("force", "false").toBoolean,
         options.getOrElse("stale_inprogress", "false").toBoolean,
-        CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, carbonTable))
+        showStats)
+      Seq(Row(ByteUtil.convertByteToReadable(result._1), ByteUtil
+          .convertByteToReadable(result._2)))
+    } else {
+      val preEvent = CleanFilesPreEvent(carbonTable, sparkSession)
+      val postEvent = CleanFilesPostEvent(carbonTable, sparkSession, options)
+      withEvents(preEvent, postEvent) {
+         sizeCleaned = DataTrashManager.cleanGarbageData(
+          carbonTable,
+          options.getOrElse("force", "false").toBoolean,
+          options.getOrElse("stale_inprogress", "false").toBoolean,
+          showStats,
+          CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, carbonTable))
+      }
+      if (showStats) {
+        Seq(Row(ByteUtil.convertByteToReadable(sizeCleaned)))
+      } else {
+        Seq.empty
+      }
     }
-
-    Seq.empty
   }
 
   override protected def opName: String = "CLEAN FILES"
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 16898b2..40bd805 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -40,6 +40,7 @@ object CleanFiles {
       carbonTable,
       isForceDeletion,
       cleanStaleInProgress,
+      false,
       CarbonFilters.getPartitions(Seq.empty[Expression], spark, carbonTable))
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
index 8812253..c5589fe 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -31,7 +31,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonTestUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
 
@@ -53,7 +53,11 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
 
     val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
     assert(segmentNumber1 == 4)
-    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true')").show
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest " +
+      s"OPTIONS('stale_inprogress'='true','dryrun'='true')").collect()
+    val cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest" +
+      s" OPTIONS('stale_inprogress'='true')").collect()
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
     assert(4 == segmentNumber2)
     assert(!FileFactory.isFileExist(trashFolderPath))
@@ -76,7 +80,11 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     assert(segmentNumber1 == 4)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
-    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true','force'='true')").show
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS" +
+      s"('stale_inprogress'='true','force'='true','dryrun'='true')").collect()
+    val cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS" +
+      s"('stale_inprogress'='true','force'='true')").collect()
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
@@ -100,10 +108,18 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"""Delete from table cleantest where segment.id in(4)""")
+
+    var dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+    var cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('statistics'='false')").show()
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
+    dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true','force'='true')")
+        .collect()
     val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
-    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").collect()
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     sql(s"""show segments for table cleantest""").show()
@@ -130,7 +146,9 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
         sqlContext.sparkSession), "4")
     assert(!FileFactory.isFileExist(trashFolderPath))
-    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    var dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+    var cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(4)))
     count = 0
@@ -149,17 +167,19 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(5)))
 
-    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+    cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     count = 0
     list = getFileCountInTrashFolder(trashFolderPath)
     assert(list == 2)
 
-    intercept[RuntimeException] {
-      sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
-    }
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
-    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true', 'dryrun'='true')")
+        .collect()
+    cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
 
@@ -466,6 +486,42 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
         CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
     }
 
+  test("Test clean files after delete command") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("drop table if exists cleantest")
+    sql(
+      """
+        | CREATE TABLE cleantest (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE cleantest OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(None, "cleantest") (sqlContext.sparkSession)
+    sql("delete from cleantest where deptno='10'")
+    sql(s"""Delete from table cleantest where segment.id in(0)""")
+    val segmentSize = FileFactory.getDirectorySize(CarbonTablePath.getSegmentPath(table
+        .getTablePath, "0")) + FileFactory.getDirectorySize(CarbonTablePath
+        .getSegmentFilesLocation(table.getTablePath))
+    var dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+    var cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
+    dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true','force'='true')")
+      .collect()
+    cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").collect()
+    assert(cleanFiles(0).get(0) == dryRun(0).get(0))
+    assert(ByteUtil.convertByteToReadable(segmentSize) == cleanFiles(0).get(0))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+        CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+    sql("drop table if exists cleantest")
+  }
+
+
   def editTableStatusFile(carbonTablePath: String) : Unit = {
     // original table status file
     val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index c494404..7f2a0e1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -65,15 +65,25 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     loadData()
     sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
     loadData()
+    var dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+    var cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
     val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"""Delete from table cleantest where segment.id in(4)""")
+    dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest options('dryrun'='true')").collect()
+    cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
+
     val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
-    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true','dryrun'='true')")
+      .collect()
+    cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
@@ -139,7 +149,9 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
       sqlContext.sparkSession), "2")
 
-    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    val dryRunRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST OPTIONS('dryrun'='true')").collect()
+    val cleanFilesRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(2)))
 
@@ -186,7 +198,9 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
       sqlContext.sparkSession), "2")
 
-    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    val dryRunRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST OPTIONS('dryrun'='true')").collect()
+    val cleanFilesRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
 
     val timeStamp = getTimestampFolderName(trashFolderPath)
     // test recovery from partition table
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index cde2f88..a32f352 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -101,7 +101,9 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
             .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 5)
-    sql("clean files for table t1 options('force'='true')")
+    var dryRunRes = sql("clean files for table t1 options('force'='true', 'dryrun'='true')").collect()
+    var cleanFilesRes = sql("clean files for table t1 options('force'='true')").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
              .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 4)
@@ -110,7 +112,9 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
              .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 5)
-    sql("clean files for table t1 options('force'='true')")
+    dryRunRes = sql("clean files for table t1 options('force'='true', 'dryrun'='true')").collect()
+    cleanFilesRes = sql("clean files for table t1 options('force'='true')").collect()
+    assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
              .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 1)
