akashrn5 commented on a change in pull request #4005:
URL: https://github.com/apache/carbondata/pull/4005#discussion_r531089568



##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2086,6 +2087,34 @@ public int getMaxSIRepairLimit(String dbName, String 
tableName) {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method returns the time(in milliseconds) for which timestamp 
folder retention in
+   * trash folder will take place.
+   */
+  public long getTrashFolderRetentionTime() {
+    String propertyValue = 
getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS);

Review comment:
       Instead of this, just call `getProperty` with a default value as well; then 
none of these null checks are needed.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for 
clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the 
source folder after
+   * copying the data to the trash and also remove the .segment files of the 
stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = 
FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + 
fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, 
CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + 
CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " 
from after moving" +
+                " it to the trash folder : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete 
the source folders
+   * after copying the data to the trash and also remove the .segment files of 
the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable 
carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the 
carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, 
CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + 
timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) 
{
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, 
CarbonTablePath.getTrashFolderPath(carbonTable
+                .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + 
timeStampForTrashFolder
+                + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.SEGMENT_PREFIX
+                + segmentNumber);
+            filesToDelete.add(FileFactory.getCarbonFile(file));
+          }
+        }
+        // Delete the segment file too
+        
filesToDelete.add(FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilePath(carbonTable
+            .getTablePath(), staleSegment)));
+        // After every file of that segment has been copied, need to delete 
those files.
+        LOGGER.info("Segment number: " + segmentNumber + "has been 
successfully copied to the" +
+            " trash folder");
+        try {
+          for (CarbonFile file : filesToDelete) {
+            FileFactory.deleteFile(file.getAbsolutePath());
+            // deleting empty  partition folders of partition table
+            
SegmentFileStore.deleteEmptyPartitionFolders(FileFactory.getCarbonFile(new 
Path(file
+                .getAbsolutePath()).getParent().toString()));
+          }
+          filesToDelete.clear();
+        } catch (IOException e) {
+          LOGGER.error("Error while deleting the source data files", e);
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will find all the stale segments by comparing the segment 
files in the
+   * metadata directory with the segments in the table status file. Any 
segment which has entry
+   * in the metadata folder and is not present in the table status file is 
considered as a
+   * stale segment. Only comparing from tablestatus file, not checking 
tablestatus.history file
+   */
+  private static List<String> getStaleSegments(CarbonTable carbonTable) {
+    // details contains segments in the tablestatus file, and all segments 
contains segments files.
+    // Segment number from those segment files is extracted and Stale segement 
file name is
+    // returned.
+    String metaDataLocation = carbonTable.getMetadataPath();
+    String segmentFilesLocation =
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
+    CarbonFile[] segmentFilesList = 
FileFactory.getCarbonFile(segmentFilesLocation).listFiles();
+    ArrayList<String> staleSegmentList = new 
ArrayList<>(segmentFilesList.length);
+    // there are no segments present in the Metadata folder. Can return here
+    if (segmentFilesList.length == 0) {
+      return staleSegmentList;
+    }
+    LoadMetadataDetails[] details = 
SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    Set<String> loadNameSet = new HashSet<>(details.length);
+    for (LoadMetadataDetails detail : details) {

Review comment:
       Can you use Java's functional style to collect the load names and convert 
them to a set, instead of traditional for loops?

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") 
{
+    // do not send MFD folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 1)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    createTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 4)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with stale segments") {
+    createTable()
+    loadData()
+    sql(s"""alter table cleantest compact 'minor'""")
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""")
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+    // recovering data from trash folder
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ "0.1"
+    val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ '4'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("clean up maintable table and test trash folder with SI with stale 
segments") {
+    createTable()
+    loadData()
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(add) as 'carbondata' """)
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(4)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext
+      .sparkSession).getTablePath
+    deleteTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = 
CarbonTablePath.getTrashFolderPath(mainTablePath)
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 8)
+
+    // recovering data from trash folder
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+    val segment0Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '2'
+    val segment3Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '3'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test trash folder with 2 segments with same segment number") {
+    createTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4  segments are made as stale segments, they should be moved to the 
trash folder
+    deleteTableStatusFile(path)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 2)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test carbon.trash.retenion.property") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
+  }
+
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +

Review comment:
       Please use the direct APIs only, instead of constructing the path on your own.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for 
clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the 
source folder after
+   * copying the data to the trash and also remove the .segment files of the 
stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = 
FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + 
fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, 
CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + 
CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " 
from after moving" +
+                " it to the trash folder : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete 
the source folders
+   * after copying the data to the trash and also remove the .segment files of 
the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable 
carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the 
carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, 
CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + 
timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));
+        }
+        // get carbondata files from here
+        Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+        for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) 
{
+          for (String file : entry.getValue()) {
+            // copy the carbondata file to trash
+            TrashUtil.copyFileToTrashFolder(file, 
CarbonTablePath.getTrashFolderPath(carbonTable

Review comment:
       Instead of copying the index and data files separately, collect them all and 
send them to be copied in one call; then, based on the boolean return value, you can 
add all the files to the delete list and delete them on success.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for 
clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the 
source folder after
+   * copying the data to the trash and also remove the .segment files of the 
stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = 
FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + 
fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, 
CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + 
CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " 
from after moving" +
+                " it to the trash folder : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete 
the source folders
+   * after copying the data to the trash and also remove the .segment files of 
the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable 
carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the 
carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,

Review comment:
       Here the `readIndexFiles` method also reads the file footer, which is not 
required for your use case. It adds extra time and IO that is unnecessary in 
this scenario, so please check for other APIs and just get the index file names 
you need without doing any extra operations.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for 
clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the 
source folder after
+   * copying the data to the trash and also remove the .segment files of the 
stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = 
FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + 
fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, 
CarbonTablePath.getTrashFolderPath(

Review comment:
       I think it's better if `copySegmentToTrash` returns true or false based on 
copy success or failure; then, based on that return value, you can decide 
whether to delete the source folders or not.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for 
clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the 
source folder after
+   * copying the data to the trash and also remove the .segment files of the 
stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = 
FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + 
fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, 
CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + 
CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " 
from after moving" +
+                " it to the trash folder : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete 
the source folders
+   * after copying the data to the trash and also remove the .segment files of 
the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable 
carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the 
carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {

Review comment:
       ```suggestion
           for (String indexFile: indexOrMergeFiles) {
   ```

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the clean files command in carbondata. This class has methods for 
clean files
+ * operation.
+ */
+public class CleanFilesUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CleanFilesUtil.class.getName());
+
+  /**
+   * This method will clean all the stale segments for a table, delete the 
source folder after
+   * copying the data to the trash and also remove the .segment files of the 
stale segments
+   */
+  public static void cleanStaleSegments(CarbonTable carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getSegmentFile()
+            .getLocationMap();
+        if (locationMap != null) {
+          CarbonFile segmentLocation = 
FileFactory.getCarbonFile(carbonTable.getTablePath() +
+              CarbonCommonConstants.FILE_SEPARATOR + 
fileStore.getSegmentFile().getLocationMap()
+              .entrySet().iterator().next().getKey());
+          // copy the complete segment to the trash folder
+          TrashUtil.copySegmentToTrash(segmentLocation, 
CarbonTablePath.getTrashFolderPath(
+              carbonTable.getTablePath()) + 
CarbonCommonConstants.FILE_SEPARATOR +
+              timeStampForTrashFolder + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
+              .SEGMENT_PREFIX + segmentNumber);
+          // Deleting the stale Segment folders.
+          try {
+            CarbonUtil.deleteFoldersAndFiles(segmentLocation);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Unable to delete the segment: " + segmentNumber + " 
from after moving" +
+                " it to the trash folder : " + e.getMessage(), e);
+          }
+          // delete the segment file as well
+          
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+              staleSegment));
+        }
+      }
+      staleSegments.clear();
+    }
+  }
+
+  /**
+   * This method will clean all the stale segments for partition table, delete 
the source folders
+   * after copying the data to the trash and also remove the .segment files of 
the stale segments
+   */
+  public static void cleanStaleSegmentsForPartitionTable(CarbonTable 
carbonTable)
+    throws IOException {
+    long timeStampForTrashFolder = System.currentTimeMillis();
+    List<String> staleSegments = getStaleSegments(carbonTable);
+    if (staleSegments.size() > 0) {
+      for (String staleSegment : staleSegments) {
+        String segmentNumber = 
staleSegment.split(CarbonCommonConstants.UNDERSCORE)[0];
+        // for each segment we get the indexfile first, then we get the 
carbondata file. Move both
+        // of those to trash folder
+        List<CarbonFile> filesToDelete = new ArrayList<>();
+        SegmentFileStore fileStore = new 
SegmentFileStore(carbonTable.getTablePath(),
+            staleSegment);
+        List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+            FileFactory.getConfiguration());
+        for (String file : indexOrMergeFiles) {
+          // copy the index or merge file to the trash folder
+          TrashUtil.copyFileToTrashFolder(file, 
CarbonTablePath.getTrashFolderPath(carbonTable
+              .getTablePath()) + CarbonCommonConstants.FILE_SEPARATOR + 
timeStampForTrashFolder +
+              CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.SEGMENT_PREFIX +
+              segmentNumber);
+          filesToDelete.add(FileFactory.getCarbonFile(file));

Review comment:
       Same as the above comment: return a boolean from the copy method and use it 
to decide whether to delete the files.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
##########
@@ -792,4 +795,9 @@ public static String getParentPath(String dataFilePath) {
       return dataFilePath;
     }
   }
+
+  public static String getTrashFolderPath(String carbonTablePath) {
+    return carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
+      .TRASH_DIR;

Review comment:
       Just use the constant directly, as it's in the same class; remove `CarbonTablePath`.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Mantains the trash folder in carbondata. This class has methods to copy 
data to the trash and
+ * remove data from the trash.
+ */
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(TrashUtil.class.getName());
+
+  /**
+   * Base method to copy the data to the trash folder.
+   *
+   * @param fromPath the path from which to copy the file
+   * @param toPath  the path where the file will be copied
+   * @return
+   */
+  private static void copyToTrashFolder(String fromPath, String toPath) throws 
IOException {

Review comment:
       Can you rename these to src and destination? It would be clearer.

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") 
{
+    // do not send MFD folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 1)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    createTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 4)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with stale segments") {
+    createTable()
+    loadData()
+    sql(s"""alter table cleantest compact 'minor'""")
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""")
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+    // recovering data from trash folder
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ "0.1"
+    val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ '4'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("clean up maintable table and test trash folder with SI with stale 
segments") {
+    createTable()
+    loadData()
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(add) as 'carbondata' """)
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(4)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext
+      .sparkSession).getTablePath
+    deleteTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = 
CarbonTablePath.getTrashFolderPath(mainTablePath)
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 8)
+
+    // recovering data from trash folder
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+    val segment0Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '2'
+    val segment3Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '3'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test trash folder with 2 segments with same segment number") {
+    createTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4  segments are made as stale segments, they should be moved to the 
trash folder
+    deleteTableStatusFile(path)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 2)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test carbon.trash.retenion.property") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
+  }
+
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")  // Original File
+    val f2 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tmp") // Temporary File
+    val w = new PrintWriter(f2)
+    Source.fromFile(f1).getLines
+      .map { x =>
+        x.replaceAll("Success", "In Progress")
+      }
+      // scalastyle:off println
+      .foreach(x => w.println(x))
+    // scalastyle:on println
+    w.close()
+    f2.renameTo(f1)
+  }
+
+
+  def getFileCountInTrashFolder(dirPath: String) : Int = {

Review comment:
       please reformat the code style of this method

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") 
{
+    // do not send MFD folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 1)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    createTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 4)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with stale segments") {
+    createTable()
+    loadData()
+    sql(s"""alter table cleantest compact 'minor'""")
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""")
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+    // recovering data from trash folder
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ "0.1"
+    val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ '4'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(5)))
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("clean up maintable table and test trash folder with SI with stale 
segments") {
+    createTable()
+    loadData()
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(add) as 'carbondata' """)
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(4)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext
+      .sparkSession).getTablePath
+    deleteTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = 
CarbonTablePath.getTrashFolderPath(mainTablePath)
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 8)
+
+    // recovering data from trash folder
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+    val segment0Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '2'
+    val segment3Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '3'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test trash folder with 2 segments with same segment number") {
+    createTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4  segments are made as stale segments, they should be moved to the 
trash folder
+    deleteTableStatusFile(path)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 2)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+  test("test carbon.trash.retenion.property") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4 segments are made as stale segments and should be moved to trash
+    deleteTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
+  }
+
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")  // Original File
+    val f2 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tmp") // Temporary File
+    val w = new PrintWriter(f2)
+    Source.fromFile(f1).getLines

Review comment:
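       Please close this `Source` once its lines have been consumed (for
       example in a `finally` block); the handle opened by `Source.fromFile`
       is currently never closed.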

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestCleanFilesCommandPartitionTable extends QueryTest with 
BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with IN PROGRESS segments") {
+    // do not send the segment folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.TRASH_DIR
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with Marked For Delete segments") 
{
+    // do not send MFD folders to trash
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 1)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    createParitionTable()
+    loadData()
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == segmentNumber2 + 4)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    val list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+
+  test("test trash folder with 2 segments with same segment number") {
+    createParitionTable()
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath.TRASH_DIR
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    deleteTableStatusFile(path)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 2)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up table and test trash folder with stale segments") {
+    sql("""DROP TABLE IF EXISTS C1""")
+    createParitionTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    // All 4  segments are made as stale segments, they should be moved to the 
trash folder
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+    // test recovery from partition table
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      "/Segment_0"
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      "/Segment_1"
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    val segment2Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      "/Segment_2"
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    val segment3Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      "/Segment_3"
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+
+    sql("""DROP TABLE IF EXISTS C1""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+  test("clean up table and test trash folder with stale segments part 2") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS C1""")
+
+    sql("create table cleantest(" +
+      "value int) partitioned by (name string, age int) stored as carbondata")
+    sql("insert into cleantest values (30, 'amy', 12), (40, 'bob', 13)")
+    sql("insert into cleantest values (30, 'amy', 20), (10, 'bob', 13)")
+    sql("insert into cleantest values (30, 'cat', 12), (40, 'dog', 13)")
+
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    // All 4  segments are made as stale segments, they should be moved to the 
trash folder
+    // createStaleSegments(path)
+    deleteTableStatusFile(path)
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+    // test recovery from partition table
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      "/Segment_0"
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      "/Segment_1"
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    val segment2Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      "/Segment_2"
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(6)))
+    checkAnswer(sql(s"""select count(*) from cleantest where age=13"""),
+      Seq(Row(3)))
+
+    sql("""DROP TABLE IF EXISTS C1""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  test("clean up maintable table and test trash folder with SI with stale 
segments") {
+    createParitionTable()
+    loadData()
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(name) as 'carbondata' """)
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(4)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext
+      .sparkSession).getTablePath
+    deleteTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = mainTablePath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonTablePath.TRASH_DIR
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 8)
+
+    // recovering data from trash folder
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+
+    val segment0Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '2'
+    val segment3Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '3'
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
+    sql("INSERT INTO cleantest select * from c1").show()
+    sql("drop table c1")
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(4)))
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+    sql("show segments for table cleantest").show()
+    sql("show segments for table si_cleantest").show()
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+  def editTableStatusFile(carbonTablePath: String) : Unit = {
+    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")  // Original File
+    val f2 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tmp") // Temporary File
+    val w = new PrintWriter(f2)
+    Source.fromFile(f1).getLines
+      .map { x =>
+        x.replaceAll("Success", "In Progress")
+      }
+      // scalastyle:off println
+      .foreach(x => w.println(x))
+    // scalastyle:on println
+    w.close()
+    f2.renameTo(f1)
+  }
+
+
+  def getFileCountInTrashFolder(dirPath: String) : Int = {

Review comment:
       Please reformat this method so that it follows the project's code style.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to