This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d88ea4b  [CARBONDATA-3978] Support .Trash folder
d88ea4b is described below

commit d88ea4b53477a09753ac3a94bee6daf67c2bd21f
Author: Vikram Ahuja <[email protected]>
AuthorDate: Fri Nov 6 12:59:43 2020 +0530

    [CARBONDATA-3978] Support .Trash folder
    
    Why is this PR needed?
    To add support for a trash folder, where all the carbondata and index files 
of all the stale segments (segments whose entry is not in the tablestatus file 
but which exist in the Fact and Metadata folders) will go after clean files is 
called.
    
    What changes were proposed in this PR?
    Added a trash folder (recycle bin) where all the data files of stale 
segments (segments whose entry is not in the tablestatus file but which exist 
in the Fact and Metadata folders) go after the clean files operation. Timestamp 
based subdirectories are added in the trash folder and those subdirectories 
will contain the segment folders which are moved to the trash. By default the 
expiration time of a timestamp subdirectory is 7 days; the subsequent clean 
files command after the expiration will remove the expired timestamp 
subdirectories from the trash folder.
    Clean files with force option directly empties the trash folder. By 
default, clean files with force option is disabled. It can be enabled by 
setting the carbon property "carbon.clean.file.force.allowed" to true.
    The MFD and Compacted segments will also take into account the Carbon 
Trash Retention Property before they are deleted, instead of just the max 
query timeout.
    
    Does this PR introduce any user interface change?
    Yes. (Added clean-files.md)
    
    Is any new testcase added?
    Yes
    
    This closes #4005
---
 .../core/constants/CarbonCommonConstants.java      |  28 ++
 .../carbondata/core/metadata/SegmentFileStore.java |   4 +-
 .../carbondata/core/util/CarbonProperties.java     |  42 +++
 .../carbondata/core/util/CleanFilesUtil.java       | 199 ++++++++++++
 .../carbondata/core/util/DeleteLoadFolders.java    |   8 +-
 .../org/apache/carbondata/core/util/TrashUtil.java | 224 +++++++++++++
 .../carbondata/core/util/path/CarbonTablePath.java |  13 +-
 docs/clean-files.md                                |  46 +++
 docs/configuration-parameters.md                   |   2 +
 docs/dml-of-carbondata.md                          |   1 +
 .../management/CarbonCleanFilesCommand.scala       |  27 +-
 .../command/mutation/CarbonTruncateCommand.scala   |   1 +
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   7 +-
 .../cleanfiles/TestCleanFileCommand.scala          | 349 ++++++++++++++++++++
 .../TestCleanFilesCommandPartitionTable.scala      | 353 +++++++++++++++++++++
 15 files changed, 1291 insertions(+), 13 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 14ad059..2def2f3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1415,6 +1415,34 @@ public final class CarbonCommonConstants {
   public static final String BITSET_PIPE_LINE_DEFAULT = "true";
 
   /**
+   * this is the user defined time(in days), timestamp subfolders in trash 
directory will take
+   * this value as retention time. They are deleted after this time.
+   */
+  @CarbonProperty
+  public static final String CARBON_TRASH_RETENTION_DAYS = 
"carbon.trash.retention.days";
+
+  /**
+   * Default retention time of a subdirectory in trash folder is 7 days.
+   */
+  public static final int CARBON_TRASH_RETENTION_DAYS_DEFAULT = 7;
+
+  /**
+   * Maximum allowed retention time of a subdirectory in trash folder is 365 
days.
+   */
+  public static final int CARBON_TRASH_RETENTION_DAYS_MAXIMUM = 365;
+
+  /**
+   * User defined property to check if clean files operation with force option 
is allowed.
+   */
+  @CarbonProperty
+  public static final String CARBON_CLEAN_FILES_FORCE_ALLOWED = 
"carbon.clean.file.force.allowed";
+
+  /**
+   * By default clean files operation with force option is not allowed.
+   */
+  public static final String CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT = 
"false";
+
+  /**
    * minimum required registered resource for starting block distribution
    */
   @CarbonProperty
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 8fc1da8..803fc40 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -772,7 +772,7 @@ public class SegmentFileStore {
    * @param ignoreStatus
    * @throws IOException
    */
-  private List<String> readIndexFiles(SegmentStatus status, boolean 
ignoreStatus,
+  public List<String> readIndexFiles(SegmentStatus status, boolean 
ignoreStatus,
       Configuration configuration) throws IOException {
     if (indexFilesMap != null) {
       return new ArrayList<>();
@@ -1194,7 +1194,7 @@ public class SegmentFileStore {
    * till year partition folder if there are no other folder or files present 
under each folder till
    * year partition
    */
-  private static void deleteEmptyPartitionFolders(CarbonFile path) {
+  public static void deleteEmptyPartitionFolders(CarbonFile path) {
     if (path != null && path.listFiles().length == 0) {
       FileFactory.deleteAllCarbonFilesOfDir(path);
       Path parentsLocation = new Path(path.getAbsolutePath()).getParent();
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index b5f92d9..e2b4e08 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -207,6 +207,9 @@ public final class CarbonProperties {
       case CarbonCommonConstants.CARBON_INDEX_SCHEMA_STORAGE:
         validateDMSchemaStorageProvider();
         break;
+      case CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS:
+        validateTrashFolderRetentionTime();
+        break;
       // TODO : Validation for carbon.lock.type should be handled for 
addProperty flow
       default:
         // none
@@ -275,6 +278,7 @@ public final class CarbonProperties {
     validateDetailQueryBatchSize();
     validateIndexServerSerializationThreshold();
     validateAndGetLocalDictionarySizeThresholdInMB();
+    validateTrashFolderRetentionTime();
   }
 
   /**
@@ -2086,6 +2090,44 @@ public final class CarbonProperties {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method sets the time(in days) for which timestamp folder 
retention in trash
+   * folder will take place
+   */
+  private void validateTrashFolderRetentionTime() {
+    String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
+        .CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
+        .CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+    try {
+      int configuredValue = Integer.parseInt(propertyValue);
+      if (configuredValue < 0 || configuredValue > CarbonCommonConstants
+          .CARBON_TRASH_RETENTION_DAYS_MAXIMUM) {
+        LOGGER.warn("Value of " + 
CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS + " is" +
+            " invalid, taking default value instead");
+        
carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, 
Integer
+            
.toString(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+      } else {
+        
carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS,
+            propertyValue);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("Invalid value configured for " + CarbonCommonConstants
+          .CARBON_TRASH_RETENTION_DAYS + ", considering the default value");
+      
carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, 
Integer
+          
.toString(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+    }
+  }
+
+  /**
+   * Check if the user has allowed the use of clean files command with force 
option.
+   */
+  public boolean isCleanFilesForceAllowed() {
+    String configuredValue =
+        getProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+        CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT);
+    return Boolean.parseBoolean(configuredValue);
+  }
+
   public static boolean isFilterReorderingEnabled() {
     return Boolean.parseBoolean(
         getInstance().getProperty(CarbonCommonConstants.CARBON_REORDER_FILTER,
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
new file mode 100644
index 0000000..6311d3a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
+
+import org.apache.log4j.Logger;
+
+/**
+ *This util provide clean stale data methods for clean files command
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the 
source folder after
+   * copying the data to the trash and also remove the .segment files of the 
stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+      throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegmentFiles = new ArrayList<>();
+    List<String> redundantSegmentFile = new ArrayList<>();
+    getStaleSegmentFiles(carbonTable, staleSegmentFiles, redundantSegmentFile);
+    for (String staleSegmentFile : staleSegmentFiles) {
+      String segmentNumber = 
DataFileUtil.getSegmentNoFromSegmentFile(staleSegmentFile);
+      SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+          staleSegmentFile);
+      Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getSegmentFile()
+          .getLocationMap();
+      if (locationMap != null) {
+        if (locationMap.entrySet().iterator().next().getValue().isRelative()) {
+          CarbonFile segmentPath = 
FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(
+              carbonTable.getTablePath(), segmentNumber));
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentPath, 
TrashUtil.getCompleteTrashFolderPath(
+              carbonTable.getTablePath(), timeStampForTrashFolder, 
segmentNumber));
+          // Deleting the stale Segment folders and the segment file.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentPath);
+            // delete the segment file as well
+            
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+                staleSegmentFile));
+            for (String duplicateStaleSegmentFile : redundantSegmentFile) {
+              if 
(DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
+                  .equals(segmentNumber)) {
+                
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable
+                    .getTablePath(), duplicateStaleSegmentFile));
+              }
+            }
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentPath + " 
from after moving" +
+                " it to the trash folder. Please delete them manually : " + 
e.getMessage(), e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete 
the source folders
+   * after copying the data to the trash and also remove the .segment files of 
the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable 
carbonTable)
+      throws IOException {
+    long timeStampForTrashFolder = CarbonUpdateUtil.readCurrentTime();
+    List<String> staleSegmentFiles = new ArrayList<>();
+    List<String> redundantSegmentFile = new ArrayList<>();
+    getStaleSegmentFiles(carbonTable, staleSegmentFiles, redundantSegmentFile);
+    for (String staleSegmentFile : staleSegmentFiles) {
+      String segmentNumber = 
DataFileUtil.getSegmentNoFromSegmentFile(staleSegmentFile);
+      // for each segment we get the indexfile first, then we get the 
carbondata file. Move both
+      // of those to trash folder
+      SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+          staleSegmentFile);
+      List<String> filesToProcess = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+          FileFactory.getConfiguration());
+
+      // get carbondata files from here
+      Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+      for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+        filesToProcess.addAll(entry.getValue());
+      }
+      // After all the files have been added to list, move them to the trash 
folder
+      TrashUtil.copyFilesToTrash(filesToProcess, 
TrashUtil.getCompleteTrashFolderPath(
+          carbonTable.getTablePath(), timeStampForTrashFolder, segmentNumber), 
segmentNumber);
+      // After every file of that segment has been copied, need to delete 
those files.
+      try {
+        for (String file : filesToProcess) {
+          FileFactory.deleteFile(file);
+        }
+        // Delete the segment file too
+        
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+            staleSegmentFile));
+        // remove duplicate segment files if any
+        for (String duplicateStaleSegmentFile : redundantSegmentFile) {
+          if 
(DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
+              .equals(segmentNumber)) {
+            
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+                duplicateStaleSegmentFile));
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error while deleting the source data files. Please 
delete the files of" +
+            " segment: " + segmentNumber + " manually.", e);
+      }
+    }
+    // Delete the empty partition folders
+    
deleteEmptyPartitionFoldersRecursively(FileFactory.getCarbonFile(carbonTable.getTablePath()));
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment 
files in the
+   * metadata directory with the segments in the table status file. Any 
segment which has entry
+   * in the metadata folder and is not present in the table status file is 
considered as a
+   * stale segment. Only comparing from tablestatus file, not checking 
tablestatus.history file
+   */
+  private static void getStaleSegmentFiles(CarbonTable carbonTable, 
List<String> staleSegmentFiles,
+      List<String> redundantSegmentFile) {
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    List<String> segmentFiles = 
Arrays.stream(FileFactory.getCarbonFile(segmentFilesLocation)
+        .listFiles()).map(CarbonFile::getName).collect(Collectors.toList());
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFiles.size() == 0) {
+      return;
+    }
+    LoadMetadataDetails[] details = 
SegmentStatusManager.readLoadMetadata(carbonTable
+        .getMetadataPath());
+    Set<String> loadNameSet = 
Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
+        .collect(Collectors.toSet());
+    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> 
!loadNameSet.contains(
+        
DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    if (staleSegments.size() == 0) {
+      return;
+    }
+    // sort the stale segment List
+    Collections.sort(staleSegments);
+    // in case of multiple segment files for a segment, add the segment with 
the largest
+    // timestamp to staleSegmentFiles list and add the others to 
redundantsegmentfile list.
+    for (int i = 0; i < staleSegments.size() - 1; i++) {
+      if 
(!DataFileUtil.getSegmentNoFromSegmentFile(staleSegments.get(i)).equals(
+          DataFileUtil.getSegmentNoFromSegmentFile(staleSegments.get(i + 1)))) 
{
+        staleSegmentFiles.add(staleSegments.get(i));
+      } else {
+        redundantSegmentFile.add(staleSegments.get(i));
+      }
+    }
+    // adding the last occurrence always
+    staleSegmentFiles.add(staleSegments.get(staleSegments.size() - 1));
+  }
+
+  /**
+   * This method will delete all the empty partition folders starting from the 
table path
+   */
+  private static void deleteEmptyPartitionFoldersRecursively(CarbonFile 
tablePath) {
+    CarbonFile[] listOfFiles = tablePath.listFiles();
+    if (listOfFiles.length == 0) {
+      tablePath.delete();
+    } else {
+      for (CarbonFile file: listOfFiles) {
+        if (file.isDirectory() && file.getName().contains("=")) {
+          deleteEmptyPartitionFoldersRecursively(file);
+        }
+      }
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java 
b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index c95412c..d509959 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -183,9 +183,8 @@ public final class DeleteLoadFolders {
         return true;
       }
       long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
-
-      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
+      return TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime) && 
CarbonUpdateUtil
+          .isMaxQueryTimeoutExceeded(deletionTime);
     }
 
     return false;
@@ -202,7 +201,8 @@ public final class DeleteLoadFolders {
       }
       long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
 
-      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+      return TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime) && 
CarbonUpdateUtil
+          .isMaxQueryTimeoutExceeded(deletionTime);
 
     }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
new file mode 100644
index 0000000..abbb3c2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Maintains the trash folder in carbondata. This class has methods to copy 
data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param sourcePath      the path from which to copy the file
+   * @param destinationPath the path where the file will be copied
+   */
+  private static void copyToTrashFolder(String sourcePath, String 
destinationPath)
+    throws IOException {
+    DataOutputStream dataOutputStream = null;
+    DataInputStream dataInputStream = null;
+    try {
+      dataOutputStream = FileFactory.getDataOutputStream(destinationPath);
+      dataInputStream = FileFactory.getDataInputStream(sourcePath);
+      IOUtils.copyBytes(dataInputStream, dataOutputStream, 
CarbonCommonConstants.BYTEBUFFER_SIZE);
+    } catch (IOException exception) {
+      LOGGER.error("Unable to copy " + sourcePath + " to the trash folder", 
exception);
+      throw exception;
+    } finally {
+      CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+    }
+  }
+
+  /**
+   * The below method copies the complete a file to the trash folder.
+   *
+   * @param filePathToCopy           the files which are to be moved to the 
trash folder
+   * @param trashFolderWithTimestamp timestamp, partition folder(if any) and 
segment number
+   */
+  public static void copyFileToTrashFolder(String filePathToCopy,
+      String trashFolderWithTimestamp) throws IOException {
+    CarbonFile carbonFileToCopy = FileFactory.getCarbonFile(filePathToCopy);
+    String destinationPath = trashFolderWithTimestamp + CarbonCommonConstants
+        .FILE_SEPARATOR + carbonFileToCopy.getName();
+    try {
+      if (!FileFactory.isFileExist(destinationPath)) {
+        copyToTrashFolder(filePathToCopy, destinationPath);
+      }
+    } catch (IOException e) {
+      // in case there is any issue while copying the file to the trash 
folder, we need to delete
+      // the complete segment folder from the trash folder. The 
trashFolderWithTimestamp contains
+      // the segment folder too. Delete the folder as it is.
+      FileFactory.deleteFile(trashFolderWithTimestamp);
+      LOGGER.error("Error while checking trash folder: " + destinationPath + " 
or copying" +
+          " file: " + filePathToCopy + " to the trash folder at path", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. 
Here, the data files
+   * in segment are listed and copied one by one to the trash folder.
+   *
+   * @param segmentPath              the folder which are to be moved to the 
trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp 
and segment number
+   */
+  public static void copySegmentToTrash(CarbonFile segmentPath,
+      String trashFolderWithTimestamp) throws IOException {
+    try {
+      if (segmentPath.isFileExist()) {
+        if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+          FileFactory.mkdirs(trashFolderWithTimestamp);
+        }
+        CarbonFile[] dataFiles = segmentPath.listFiles();
+        for (CarbonFile carbonFile : dataFiles) {
+          copyFileToTrashFolder(carbonFile.getAbsolutePath(), 
trashFolderWithTimestamp);
+        }
+        LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " has been 
copied to" +
+            " the trash folder successfully. Total files copied: " + 
dataFiles.length);
+      } else {
+        LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " does not 
exist");
+      }
+    } catch (IOException e) {
+      LOGGER.error("Error while copying the segment: " + segmentPath.getName() 
+ " to the trash" +
+          " Folder: " + trashFolderWithTimestamp, e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method copies multiple files belonging to 1 segment to the 
trash folder.
+   *
+   * @param filesToCopy              absolute paths of the files to copy to 
the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp 
and segment number
+   * @param segmentNumber            segment number of the files which are 
being copied to trash
+   */
+  public static void copyFilesToTrash(List<String> filesToCopy,
+      String trashFolderWithTimestamp, String segmentNumber) throws 
IOException {
+    try {
+      if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+        FileFactory.mkdirs(trashFolderWithTimestamp);
+      }
+      for (String fileToCopy : filesToCopy) {
+        // check if file exists before copying
+        if (FileFactory.isFileExist(fileToCopy)) {
+          copyFileToTrashFolder(fileToCopy, trashFolderWithTimestamp);
+        }
+      }
+      LOGGER.info("Segment: " + segmentNumber + " has been copied to" +
+          " the trash folder successfully");
+    } catch (IOException e) {
+      LOGGER.error("Error while copying files of segment: " + segmentNumber + 
" to the trash" +
+          " folder", e);
+      throw e;
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder 
which have expired as
+   * per the user defined retention time
+   */
+  public static void deleteExpiredDataFromTrash(String tablePath) {
+    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the 
given timestamp.
+    try {
+      if (FileFactory.isFileExist(trashPath)) {
+        List<CarbonFile> timestampFolderList = 
FileFactory.getFolderList(trashPath);
+        for (CarbonFile timestampFolder : timestampFolderList) {
+          // If the timeStamp at which the timeStamp subdirectory has expired 
as per the user
+          // defined value, delete the complete timeStamp subdirectory
+          if 
(isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+            FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
+            LOGGER.info("Timestamp subfolder from the Trash folder deleted: " 
+ timestampFolder
+                .getAbsolutePath());
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Error during deleting expired timestamp folder from the 
trash folder", e);
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folder of 
a carbon table.
+   */
+  public static void emptyTrash(String tablePath) {
+    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    // if the trash folder exists delete the contents of the trash folder
+    try {
+      if (FileFactory.isFileExist(trashPath)) {
+        List<CarbonFile> carbonFileList = FileFactory.getFolderList(trashPath);
+        for (CarbonFile carbonFile : carbonFileList) {
+          FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Error while emptying the trash folder", e);
+    }
+  }
+
+  /**
+   * This will tell whether the trash retention time has expired or not
+   *
+   * @param fileTimestamp
+   * @return
+   */
+  public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
+    // record current time.
+    long currentTime = CarbonUpdateUtil.readCurrentTime();
+    long retentionMilliSeconds = 
(long)Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, 
Integer.toString(
+          CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT))) * 
TimeUnit.DAYS
+        .toMillis(1);
+    long difference = currentTime - fileTimestamp;
+    return difference > retentionMilliSeconds;
+  }
+
+  /**
+   * This will give the complete path of the trash folder with the timestamp 
and the segment number
+   *
+   * @param tablePath          absolute table path
+   * @param timeStampSubFolder the timestamp for the clean files operation
+   * @param segmentNumber      the segment number for which files are moved to 
the trash folder
+   */
+  public static String getCompleteTrashFolderPath(String tablePath, long 
timeStampSubFolder,
+      String segmentNumber) {
+    return CarbonTablePath.getTrashFolderPath(tablePath) + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
+      .SEGMENT_PREFIX + segmentNumber;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 1b43acc..40bf3f8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -45,8 +45,10 @@ public class CarbonTablePath {
   private static final String PARTITION_PREFIX = "Part";
   private static final String DATA_PART_PREFIX = "part-";
   public static final String BATCH_PREFIX = "_batchno";
+  public static final String TRASH_DIR = ".Trash";
   private static final String LOCK_DIR = "LockFiles";
 
+  public static final String SEGMENTS_METADATA_DIR = "segments";
   public static final String TABLE_STATUS_FILE = "tablestatus";
   public static final String TABLE_STATUS_HISTORY_FILE = "tablestatus.history";
   public static final String CARBON_DATA_EXT = ".carbondata";
@@ -728,15 +730,16 @@ public class CarbonTablePath {
    * Get the segment file locations of table
    */
   public static String getSegmentFilesLocation(String tablePath) {
-    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
"segments";
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR +
+        SEGMENTS_METADATA_DIR;
   }
 
   /**
    * Get the segment file path of table
    */
   public static String getSegmentFilePath(String tablePath, String 
segmentFileName) {
-    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
"segments"
-        + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName;
+    return getSegmentFilesLocation(tablePath) + 
CarbonCommonConstants.FILE_SEPARATOR +
+        segmentFileName;
   }
 
   /**
@@ -792,4 +795,8 @@ public class CarbonTablePath {
       return dataFilePath;
     }
   }
+
+  public static String getTrashFolderPath(String carbonTablePath) {
+    return carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + TRASH_DIR;
+  }
 }
diff --git a/docs/clean-files.md b/docs/clean-files.md
new file mode 100644
index 0000000..14ffb60
--- /dev/null
+++ b/docs/clean-files.md
@@ -0,0 +1,46 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more 
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership. 
+    The ASF licenses this file to you under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with 
+    the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software 
+    distributed under the License is distributed on an "AS IS" BASIS, 
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and 
+    limitations under the License.
+-->
+
+
+## CLEAN FILES
+
+Clean files command is used to remove the Compacted, Marked For Delete, and 
In Progress segments which are stale, as well as partial segments (segments 
which are missing from the table status file but whose data is present),
+ from the store.
+
+ Clean Files Command
+   ```
+   CLEAN FILES FOR TABLE TABLE_NAME
+   ```
+
+
+### TRASH FOLDER
+
+  Carbondata supports a Trash Folder which is used as a redundant folder where 
all stale carbondata segments (segments whose entry is not in the tablestatus 
file) are moved during the clean files operation.
+  This trash folder is maintained inside the table path and is a hidden 
folder (.Trash). The segments that are moved to the trash folder are maintained 
under a timestamp 
+  subfolder (each clean files operation is represented by a timestamp). This 
helps the user to list segments in the trash folder by timestamp. By default, 
all the timestamp sub-directories have an expiration
+  time of 7 days (since the timestamp at which they were created), and it can 
be configured by the user using the following carbon property. The supported 
values are between 0 and 365 (both included).
+   ```
+   carbon.trash.retention.days = "Number of days"
+   ``` 
+  Once the timestamp subdirectory is expired as per the configured expiration 
day value, that subdirectory is deleted from the trash folder in the subsequent 
clean files command.
+
+### FORCE DELETE TRASH
+The force option with clean files command deletes all the files and folders 
from the trash folder.
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('force'='true')
+  ```
Since the Clean Files operation with force option will delete data that can 
never be recovered, the force option is disabled by default. Clean files with 
force option is only allowed when the carbon property 
```carbon.clean.file.force.allowed``` is set to true. The default value of this 
property is false.
\ No newline at end of file
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index b79fd2b..0baafeb 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -49,6 +49,8 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.fs.custom.file.provider | None | To support FileTypeInterface for 
configuring custom CarbonFile implementation to work with custom FileSystem. |
 | carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures 
which day of the week to be considered as first day of the week. Because first 
day of the week will be different in different parts of the world. |
 | carbon.enable.tablestatus.backup | false | In cloud object store scenario, 
overwriting table status file is not an atomic operation since it uses rename 
API. Thus, it is possible that table status is corrupted if process crashed 
when overwriting the table status file. To protect from file corruption, user 
can enable this property. |
+| carbon.trash.retention.days | 7 | This parameter specifies the number of 
days after which the timestamp based subdirectories are expired in the trash 
folder. Allowed Min value = 0, Allowed Max Value = 365 days|
+| carbon.clean.file.force.allowed | false | This parameter specifies whether 
the clean files operation with force option is allowed or not.|
 
 ## Data Loading Configuration
 
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 278ccf8..405d7c8 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -27,6 +27,7 @@ CarbonData DML statements are documented here,which includes:
 * [UPDATE AND DELETE](#update-and-delete)
 * [COMPACTION](#compaction)
 * [SEGMENT MANAGEMENT](./segment-management-on-carbondata.md)
+* [CLEAN FILES](clean-files.md)
 
 
 ## LOAD DATA
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 6c516d6..fe8f98b 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -29,10 +29,10 @@ import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
-import org.apache.carbondata.core.index.IndexStoreManager
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CleanFilesUtil, 
TrashUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.view.MVManagerInSpark
@@ -48,13 +48,18 @@ import org.apache.carbondata.view.MVManagerInSpark
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
     tableName: Option[String],
+    options: Option[List[(String, String)]],
     forceTableClean: Boolean = false,
     isInternalCleanCall: Boolean = false,
     truncateTable: Boolean = false)
   extends AtomicRunnableCommand {
 
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
   var carbonTable: CarbonTable = _
   var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
+  val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
+  // forceClean will empty trash
+  val forceClean = optionsMap.getOrElse("force", "false").toBoolean
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName.get)(sparkSession)
@@ -71,6 +76,7 @@ case class CarbonCleanFilesCommand(
           CarbonCleanFilesCommand(
             Some(relationIdentifier.getDatabaseName),
             Some(relationIdentifier.getTableName),
+            options,
             isInternalCleanCall = true)
       }.toList
       commands.foreach(_.processMetadata(sparkSession))
@@ -91,6 +97,25 @@ case class CarbonCleanFilesCommand(
     OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, 
operationContext)
     if (tableName.isDefined) {
       Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
+      if (forceClean) {
+        // empty the trash folder
+        if (CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+          TrashUtil.emptyTrash(carbonTable.getTablePath)
+        } else {
+          LOGGER.error("Clean Files with Force option deletes the physical 
data and it cannot be" +
+              " recovered. It is disabled by default, to enable clean files 
with force option," +
+              " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED 
+ " to true")
+            throw new RuntimeException("Clean files with force operation not 
permitted by default")
+        }
+      } else {
+        // clear trash based on timestamp
+        TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+      }
+      if (carbonTable.isHivePartitionTable) {
+        CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+      } else {
+        CleanFilesUtil.cleanStaleSegments(carbonTable)
+      }
       if (forceTableClean) {
         deleteAllData(sparkSession, databaseNameOp, tableName.get)
       } else {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
index 1963fbe..ab481f8 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
@@ -48,6 +48,7 @@ case class CarbonTruncateCommand(child: TruncateTableCommand) 
extends DataComman
     CarbonCleanFilesCommand(
       databaseNameOp = Option(dbName),
       tableName = Option(tableName),
+      None,
       truncateTable = true
     ).run(sparkSession)
   }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 2ee3c86..dc385e6 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -514,11 +514,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   protected lazy val cleanFiles: Parser[LogicalPlan] =
-    CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
-      case databaseName ~ tableName =>
+    CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
+      (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
+      case databaseName ~ tableName ~ optionList =>
         CarbonCleanFilesCommand(
           CarbonParserUtil.convertDbNameToLowerCase(databaseName),
-          Option(tableName.toLowerCase()))
+          Option(tableName.toLowerCase()), optionList)
     }
 
   protected lazy val explainPlan: Parser[LogicalPlan] =
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
new file mode 100644
index 0000000..3618274
--- /dev/null
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete and 
Compacted segments") {
+    // do not send MFD folders to trash
+    createTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(4)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 5)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with stale segments") {
+    createTable()
+    loadData()
+    sql(s"""alter table cleantest compact 'minor'""")
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""")
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+    // recovering data from trash folder
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ "0.1"
+    val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ '4'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    intercept[RuntimeException] {
+      sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    }
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, 
"true")
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("clean up maintable table and test trash folder with SI with stale 
segments") {
+    createTable()
+    loadData()
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(add) as 'carbondata' """)
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(4)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext
+      .sparkSession).getTablePath
+    deleteTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = 
CarbonTablePath.getTrashFolderPath(mainTablePath)
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 8)
+
+    // recovering data from trash folder
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+    val segment0Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '2'
+    val segment3Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '3'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    intercept[RuntimeException] {
+      sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    }
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, 
"true")
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test trash folder with 2 segments with same segment number") {
+    createTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4  segments are made as stale segments, they should be moved to the 
trash folder
+    deleteTableStatusFile(path)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 2)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    intercept[RuntimeException] {
+      sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    }
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, 
"true")
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test carbon.trash.retenion.property") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
+  }
+
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    // original table status file
+    val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
+    val f2 = new File(CarbonTablePath.getMetadataPath(carbonTablePath) + 
CarbonCommonConstants
+        .FILE_SEPARATOR + CarbonCommonConstants.FILE_SEPARATOR + "tmp")
+    val w = new PrintWriter(f2)
+    val bufferedSource = Source.fromFile(f1)
+      bufferedSource.getLines
+      .map { x =>
+        x.replaceAll("Success", "In Progress")
+      }
+      // scalastyle:off println
+      .foreach(x => w.println(x))
+    // scalastyle:on println
+    bufferedSource.close()
+    w.close()
+    f2.renameTo(f1)
+  }
+
+  def getFileCountInTrashFolder(dirPath: String) : Int = {
+    val fileName = new File(dirPath)
+    val files = fileName.listFiles()
+    if (files != null) {
+      files.foreach(file => {
+        if (file.isFile) {
+          count = count + 1
+        }
+        if (file.isDirectory()) {
+          getFileCountInTrashFolder(file.getAbsolutePath())
+        }
+      })
+    }
+    count
+  }
+
+  def getTimestampFolderName(trashPath: String) : String = {
+    val timeStampList = FileFactory.getFolderList(trashPath)
+    timeStampList.get(0).getName
+  }
+
+  def deleteTableStatusFile(carbonTablePath: String) : Unit = {
+    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")  // Original File
+    f1.delete()
+  }
+
+  def createTable() : Unit = {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int, add String)
+        | STORED AS carbondata
+      """.stripMargin)
+  }
+
+  def loadData() : Unit = {
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
+  }
+
+}
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
new file mode 100644
index 0000000..8954b12
--- /dev/null
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFilesCommandPartitionTable extends QueryTest with 
BeforeAndAfterAll {
+
+  var count = 0
+
  test("clean up table and test trash folder with IN PROGRESS segments") {
    // In-progress segments must NOT be sent to the trash folder: clean files
    // drops them from the table status without preserving their data.
    createParitionTable()
    loadData()
    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
      .getTablePath
    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
    // rewrite tablestatus so every "Success" entry reads "In Progress"
    editTableStatusFile(path)
    assert(!FileFactory.isFileExist(trashFolderPath))
    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
    assert(segmentNumber1 == 4)
    sql(s"CLEAN FILES FOR TABLE cleantest").show
    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
    // all 4 in-progress segments were removed from the table status
    assert(0 == segmentNumber2)
    // the trash folder was never even created
    assert(!FileFactory.isFileExist(trashFolderPath))
    val list = getFileCountInTrashFolder(trashFolderPath)
    // no carbondata file is added to the trash
    assert(list == 0)
    sql("""DROP TABLE IF EXISTS CLEANTEST""")
  }
+
  test("clean up table and test trash folder with Marked For Delete and Compacted segments") {
    // MFD and Compacted segments are deleted directly by clean files
    // (subject to retention time); they are not moved to the trash folder.
    createParitionTable()
    loadData()
    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
    loadData()
    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
      .getTablePath
    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
    assert(!FileFactory.isFileExist(trashFolderPath))
    // mark segment 4 for delete
    sql(s"""Delete from table cleantest where segment.id in(4)""")
    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
    sql(s"CLEAN FILES FOR TABLE cleantest").show
    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
    // 5 entries disappear: the 4 compacted source segments + the 1 MFD segment
    assert(segmentNumber1 == segmentNumber2 + 5)
    assert(!FileFactory.isFileExist(trashFolderPath))
    count = 0
    var list = getFileCountInTrashFolder(trashFolderPath)
    // no carbondata file is added to the trash
    assert(list == 0)
    sql("""DROP TABLE IF EXISTS CLEANTEST""")
  }
+
  test("test trash folder with 2 segments with same segment number") {
    createParitionTable()
    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")

    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
      .getTablePath
    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
    assert(!FileFactory.isFileExist(trashFolderPath))
    // make Segment_0 stale by removing the table status file
    deleteTableStatusFile(path)

    assert(!FileFactory.isFileExist(trashFolderPath))
    sql(s"CLEAN FILES FOR TABLE cleantest").show()
    count = 0
    var list = getFileCountInTrashFolder(trashFolderPath)
    // 2 files moved to the trash (presumably data + index for Segment_0 — inferred)
    assert(list == 2)

    // a fresh load creates a second, unrelated Segment_0; make it stale as well
    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
    deleteTableStatusFile(path)

    sql(s"CLEAN FILES FOR TABLE cleantest").show()
    count = 0
    list = getFileCountInTrashFolder(trashFolderPath)
    // both same-numbered stale segments coexist in the trash
    assert(list == 4)

    // force clean is rejected unless CARBON_CLEAN_FILES_FORCE_ALLOWED is set
    intercept[RuntimeException] {
      sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
    }
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
    CarbonProperties.getInstance()
      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
    count = 0
    list = getFileCountInTrashFolder(trashFolderPath)
    // force clean emptied the trash folder
    assert(list == 0)
    sql("""DROP TABLE IF EXISTS CLEANTEST""")
  }
+
  test("clean up table and test trash folder with stale segments") {
    sql("""DROP TABLE IF EXISTS C1""")
    createParitionTable()
    loadData()
    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
      .getTablePath
    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
    // All 4 segments are made stale; clean files should move them to the trash folder
    deleteTableStatusFile(path)

    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
    // stale segments are gone from the table after clean files
    checkAnswer(sql(s"""select count(*) from cleantest"""),
      Seq(Row(0)))

    val timeStamp = getTimestampFolderName(trashFolderPath)
    // recover each trashed segment by mounting it as an external carbon table
    // and inserting its rows back into cleantest
    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_0"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_1"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    val segment2Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_2"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    val segment3Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_3"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    // all 4 rows (one per recovered segment) are back
    checkAnswer(sql(s"""select count(*) from cleantest"""),
      Seq(Row(4)))

    sql("""DROP TABLE IF EXISTS C1""")
    sql("""DROP TABLE IF EXISTS CLEANTEST""")
  }
+
+
  test("clean up table and test trash folder with stale segments part 2") {
    sql("""DROP TABLE IF EXISTS CLEANTEST""")
    sql("""DROP TABLE IF EXISTS C1""")

    // multi-column (name, age) partitioned table, 2 rows per load
    sql("create table cleantest(" +
      "value int) partitioned by (name string, age int) stored as carbondata")
    sql("insert into cleantest values (30, 'amy', 12), (40, 'bob', 13)")
    sql("insert into cleantest values (30, 'amy', 20), (10, 'bob', 13)")
    sql("insert into cleantest values (30, 'cat', 12), (40, 'dog', 13)")


    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
      .getTablePath
    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
    // All 3 segments are made stale; they should be moved to the trash folder
    deleteTableStatusFile(path)

    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()

    val timeStamp = getTimestampFolderName(trashFolderPath)
    // recover each trashed segment by mounting it as an external carbon table
    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_0"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_1"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    val segment2Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_2"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    // all 6 rows restored, and partition pruning still works on recovered data
    checkAnswer(sql(s"""select count(*) from cleantest"""),
      Seq(Row(6)))
    checkAnswer(sql(s"""select count(*) from cleantest where age=13"""),
      Seq(Row(3)))

    sql("""DROP TABLE IF EXISTS C1""")
    sql("""DROP TABLE IF EXISTS CLEANTEST""")
  }
+
  test("clean up maintable table and test trash folder with SI with stale segments") {
    // Stale main-table segments go to the main table's trash; the secondary
    // index table is left untouched by the main table's clean files.
    createParitionTable()
    loadData()
    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(name) as 'carbondata' """)
    checkAnswer(sql(s"""select count(*) from cleantest"""),
      Seq(Row(4)))
    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
      Seq(Row(4)))

    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext
      .sparkSession).getTablePath
    // make all 4 main-table segments stale
    deleteTableStatusFile(mainTablePath)
    val mainTableTrashFolderPath = mainTablePath + CarbonCommonConstants.FILE_SEPARATOR +
      CarbonTablePath.TRASH_DIR

    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))

    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
    // main table emptied, SI rows still present
    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))

    assert(FileFactory.isFileExist(mainTableTrashFolderPath))

    count = 0
    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
    // 8 files trashed (presumably 4 segments x data+index — inferred from the 4 loads)
    assert(listMainTable == 8)

    // recovering data from trash folder
    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)

    val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
    val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
    val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
    val segment3Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '3'

    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
    sql("INSERT INTO cleantest select * from c1").show()
    sql("drop table c1")

    checkAnswer(sql(s"""select count(*) from cleantest"""),
      Seq(Row(4)))
    // force clean is rejected unless CARBON_CLEAN_FILES_FORCE_ALLOWED is set
    intercept[RuntimeException] {
      sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
    }
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
    CarbonProperties.getInstance()
      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
    // no files in trash anymore
    count = 0
    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
    assert(listMainTable == 0)
    sql("show segments for table cleantest").show()
    sql("show segments for table si_cleantest").show()
    sql("""DROP TABLE IF EXISTS CLEANTEST""")
  }
+
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    // Original Table status file
+    val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
+    // duplicate
+    val f2 = new File(CarbonTablePath.getMetadataPath(carbonTablePath) + 
CarbonCommonConstants
+      .FILE_SEPARATOR + CarbonCommonConstants.FILE_SEPARATOR + "tmp")
+    val w = new PrintWriter(f2)
+    val bufferedSource = Source.fromFile(f1)
+      bufferedSource.getLines
+      .map { x =>
+        x.replaceAll("Success", "In Progress")
+      }
+      // scalastyle:off println
+      .foreach(x => w.println(x))
+    // scalastyle:on println
+    bufferedSource.close
+    w.close()
+    f2.renameTo(f1)
+  }
+
+  def getFileCountInTrashFolder(dirPath: String): Int = {
+    val fileName = new File(dirPath)
+    val files = fileName.listFiles()
+    if (files != null) {
+      files.foreach(file => {
+        if (file.isFile) {
+          count = count + 1
+        }
+        if (file.isDirectory()) {
+          getFileCountInTrashFolder(file.getAbsolutePath())
+        }
+      })
+    }
+    count
+  }
+
+  def getTimestampFolderName(trashPath: String) : String = {
+    val timeStampList = FileFactory.getFolderList(trashPath)
+    timeStampList.get(0).getName
+  }
+
+  def deleteTableStatusFile(carbonTablePath: String) : Unit = {
+    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")  // Original File
+    f1.delete()
+  }
+
+  def createParitionTable() : Unit = {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql(
+      """
+        | CREATE TABLE CLEANTEST (id Int, id1 INT, name STRING ) PARTITIONED 
BY (add String)
+        | STORED AS carbondata
+      """.stripMargin)
+  }
+
+  def loadData() : Unit = {
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"bob","abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"jack","abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"johnny","adc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"Reddit","adc"""")
+  }
+}

Reply via email to