This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d9f69ae [CARBONDATA-4110] Support clean files dry run operation and show statistics after clean files operation
d9f69ae is described below
commit d9f69aef5affad6d687820c0642d4f3e1655f217
Author: Vikram Ahuja <[email protected]>
AuthorDate: Thu Jan 7 11:19:54 2021 +0530
[CARBONDATA-4110] Support clean files dry run operation and show statistics after clean files operation
Why is this PR needed?
Currently, in the clean files operation the user does not know how much space will be freed.
The idea is to add support for a dry run in clean files, which tells the user how much space
will be freed by the clean files operation without actually cleaning the data.
What changes were proposed in this PR?
This PR has the following changes:
1. Support dry run in clean files: it shows the user how much space will be freed by the
clean files operation and how much space is left (data which can only be released after the
expiration time) after the clean files operation.
2. Clean files output: total size released during the clean files operation.
3. An option to disable clean files statistics in case the user does not want them.
4. Clean files log: enhance the clean files log to print the name of every file that is being
deleted in the info log.
Example commands for the new options are shown below.
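A minimal usage sketch (the table name is a placeholder; the reported sizes depend on the
actual data):
 ```
 CLEAN FILES FOR TABLE my_table OPTIONS('dryrun'='true')
 CLEAN FILES FOR TABLE my_table OPTIONS('statistics'='true')
 ```
The dry run returns the size that would be freed plus the stale data remaining in the trash,
while the actual operation returns the size it freed.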
This closes #4072
---
.../carbondata/core/metadata/SegmentFileStore.java | 11 +-
.../carbondata/core/mutate/CarbonUpdateUtil.java | 2 +-
.../core/statusmanager/SegmentStatusManager.java | 22 ++-
.../carbondata/core/util/CleanFilesUtil.java | 10 ++
.../carbondata/core/util/DeleteLoadFolders.java | 2 +-
.../org/apache/carbondata/core/util/TrashUtil.java | 56 +++++++-
docs/clean-files.md | 39 +++++-
.../TestCreateIndexForCleanAndDeleteSegment.scala | 16 ++-
.../apache/carbondata/trash/DataTrashManager.scala | 154 +++++++++++++++++++--
.../management/CarbonCleanFilesCommand.scala | 55 ++++++--
.../scala/org/apache/spark/util/CleanFiles.scala | 1 +
.../cleanfiles/TestCleanFileCommand.scala | 76 ++++++++--
.../TestCleanFilesCommandPartitionTable.scala | 20 ++-
.../FlatFolderTableLoadingTestCase.scala | 8 +-
14 files changed, 428 insertions(+), 44 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 0a26b5e..869cc5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1125,17 +1125,23 @@ public class SegmentFileStore {
List<String> indexOrMergeFiles =
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
FileFactory.getConfiguration());
Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+ List<String> deletedFiles = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
FileFactory.deleteFile(entry.getKey());
+ deletedFiles.add(entry.getKey());
for (String file : entry.getValue()) {
String[] deltaFilePaths =
updateStatusManager.getDeleteDeltaFilePath(file,
segment.getSegmentNo());
for (String deltaFilePath : deltaFilePaths) {
FileFactory.deleteFile(deltaFilePath);
+ deletedFiles.add(deltaFilePath);
}
FileFactory.deleteFile(file);
+ deletedFiles.add(file);
}
}
+ LOGGER.info("Deleted the files: " + String.join(",", deletedFiles) + " on
clean" +
+ " files operation");
deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles,
tablePath);
}
@@ -1147,16 +1153,19 @@ public class SegmentFileStore {
*/
private static void deletePhysicalPartition(List<PartitionSpec>
partitionSpecs,
Map<String, List<String>> locationMap, List<String> indexOrMergeFiles,
String tablePath) {
+ LOGGER.info("Deleting files: ");
for (String indexOrMergeFile : indexOrMergeFiles) {
if (null != partitionSpecs) {
Path location = new Path(indexOrMergeFile);
boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
if (!exists) {
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+ LOGGER.info(location.toString());
}
} else {
Path location = new Path(indexOrMergeFile);
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+ LOGGER.info(location.toString());
}
}
for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
@@ -1199,7 +1208,7 @@ public class SegmentFileStore {
}
}
- private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
+ public static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
Path partitionPath) {
for (PartitionSpec spec : partitionSpecs) {
if (spec.getLocation().equals(partitionPath)) {
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index e78b630..41c4b2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -517,7 +517,7 @@ public class CarbonUpdateUtil {
long minutesElapsed = (difference / (1000 * 60));
- return minutesElapsed > maxTime;
+ return minutesElapsed >= maxTime;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 5b800a2..38d9e56 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -983,7 +983,27 @@ public class SegmentStatusManager {
}
}
- private static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) {
+ public static boolean isExpiredSegment(LoadMetadataDetails oneLoad, AbsoluteTableIdentifier
+ absoluteTableIdentifier) {
+ boolean isExpiredSegment = false;
+ if (oneLoad.getSegmentStatus() == SegmentStatus.COMPACTED || oneLoad.getSegmentStatus() ==
+ SegmentStatus.MARKED_FOR_DELETE) {
+ isExpiredSegment = true;
+ } else if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS || oneLoad
+ .getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+ // check if lock can be acquired
+ ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+ try {
+ isExpiredSegment = segmentLock.lockWithRetries();
+ } finally {
+ CarbonLockUtil.fileUnlock(segmentLock, LockUsage.LOCK);
+ }
+ }
+ return isExpiredSegment;
+ }
+
+ public static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) {
if (details != null && details.length > 0) {
for (LoadMetadataDetails oneRow : details) {
if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus()
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index 9889fe3..44d1d8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -74,13 +74,19 @@ public class CleanFilesUtil {
// delete the segment file as well
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
staleSegmentFile));
+ List<String> deletedFiles = new ArrayList<>();
+ deletedFiles.add(staleSegmentFile);
for (String duplicateStaleSegmentFile : redundantSegmentFile) {
if (DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
.equals(segmentNumber)) {
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable
.getTablePath(), duplicateStaleSegmentFile));
+ deletedFiles.add(duplicateStaleSegmentFile);
}
}
+ LOGGER.info("Deleted the Segment :" + segmentPath.getName() + "
after"
+ + " moving it to the trash folder");
+ LOGGER.info("Deleted stale segment files: " + String.join(",",
deletedFiles));
} catch (IOException | InterruptedException e) {
LOGGER.error("Unable to delete the segment: " + segmentPath + "
from after moving" +
" it to the trash folder. Please delete them manually : " +
e.getMessage(), e);
@@ -122,16 +128,20 @@ public class CleanFilesUtil {
try {
for (String file : filesToProcess) {
FileFactory.deleteFile(file);
+ LOGGER.info("Deleted file :" + file + " after moving it to the trash
folder");
}
// Delete the segment file too
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
staleSegmentFile));
+ LOGGER.info("Deleted stale segment file after moving it to the trash
folder :"
+ + staleSegmentFile);
// remove duplicate segment files if any
for (String duplicateStaleSegmentFile : redundantSegmentFile) {
if (DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
.equals(segmentNumber)) {
FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
duplicateStaleSegmentFile));
+ LOGGER.info("Deleted redundant segment file :" +
duplicateStaleSegmentFile);
}
}
} catch (IOException e) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index f8f2607..49db220 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -187,7 +187,7 @@ public final class DeleteLoadFolders {
return false;
}
- private static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
+ public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
boolean isForceDelete, boolean cleanStaleInProgress) {
/*
* if cleanStaleInProgress == false and isForceDelete == false, clean MFD
and Compacted
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index ae5476d..47d196b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.util;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -150,48 +151,91 @@ public final class TrashUtil {
/**
* The below method deletes timestamp subdirectories in the trash folder which have expired as
- * per the user defined retention time
+ * per the user defined retention time. It returns an array where the first element has the size
+ * freed from the trash folder and the second element has the remaining size in the trash folder
+ */
- public static void deleteExpiredDataFromTrash(String tablePath) {
+ public static long[] deleteExpiredDataFromTrash(String tablePath, Boolean isDryRun,
+ Boolean showStats) {
CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
.getTrashFolderPath(tablePath));
+ long sizeFreed = 0;
+ long trashFolderSize = 0;
// Deleting the timestamp based subdirectories in the trashfolder by the
given timestamp.
try {
if (trashFolder.isFileExist()) {
+ if (isDryRun || showStats) {
+ trashFolderSize = FileFactory.getDirectorySize(trashFolder.getAbsolutePath());
+ }
CarbonFile[] timestampFolderList = trashFolder.listFiles();
+ List<CarbonFile> filesToDelete = new ArrayList<>();
for (CarbonFile timestampFolder : timestampFolderList) {
// If the timeStamp at which the timeStamp subdirectory has expired as per the user
// defined value, delete the complete timeStamp subdirectory
if (timestampFolder.isDirectory() && isTrashRetentionTimeoutExceeded(Long
.parseLong(timestampFolder.getName()))) {
- FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
- LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + timestampFolder
+ // only calculate size in case of dry run or in case clean files is with show stats
+ if (isDryRun || showStats) {
+ sizeFreed += FileFactory.getDirectorySize(timestampFolder.getAbsolutePath());
+ }
+ filesToDelete.add(timestampFolder);
+ }
+ }
+ if (!isDryRun) {
+ for (CarbonFile carbonFile : filesToDelete) {
+ LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + carbonFile
.getAbsolutePath());
+ FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
}
}
}
} catch (IOException e) {
LOGGER.error("Error during deleting expired timestamp folder from the
trash folder", e);
}
+ return new long[] {sizeFreed, trashFolderSize - sizeFreed};
}
/**
* The below method deletes all the files and folders in the trash folder of a carbon table.
+ * Returns an array in which the first element contains the size freed in case of clean files
+ * operation or size that can be freed in case of dry run and the second element contains the
+ * remaining size.
*/
- public static void emptyTrash(String tablePath) {
+ public static long[] emptyTrash(String tablePath, Boolean isDryRun, Boolean showStats) {
CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
.getTrashFolderPath(tablePath));
// if the trash folder exists delete the contents of the trash folder
+ long sizeFreed = 0;
+ long[] sizeStatistics = new long[]{0, 0};
try {
if (trashFolder.isFileExist()) {
CarbonFile[] carbonFileList = trashFolder.listFiles();
+ List<CarbonFile> filesToDelete = new ArrayList<>();
for (CarbonFile carbonFile : carbonFileList) {
- FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+ //Only calculate size when it is dry run operation or when show statistics is
+ // true with actual operation
+ if (isDryRun || showStats) {
+ sizeFreed += FileFactory.getDirectorySize(carbonFile.getAbsolutePath());
+ }
+ filesToDelete.add(carbonFile);
+ }
+ sizeStatistics[0] = sizeFreed;
+ if (!isDryRun) {
+ for (CarbonFile carbonFile : filesToDelete) {
+ FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+ }
+ LOGGER.info("Trash Folder has been emptied for table: " + tablePath);
+ if (showStats) {
+ sizeStatistics[1] = FileFactory.getDirectorySize(trashFolder.getAbsolutePath());
+ }
+ } else {
+ sizeStatistics[1] = FileFactory.getDirectorySize(trashFolder.getAbsolutePath()) -
+ sizeFreed;
}
}
} catch (IOException e) {
LOGGER.error("Error while emptying the trash folder", e);
}
+ return sizeStatistics;
}
/**
diff --git a/docs/clean-files.md b/docs/clean-files.md
index dbe611b..e5bde49 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -64,4 +64,41 @@ The stale_inprogress option with force option will delete
Marked for delete, Com
```
CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true', 'force'='true')
- ```
\ No newline at end of file
+ ```
+### DRY RUN OPTION
+Clean files also supports a dry run option which will let the user know how much space will
+be freed during the actual clean files operation. The dry run operation will not delete any data
+but will just give size based statistics on the data which would be cleaned by clean files.
+The dry run operation returns two columns: the first shows how much space will be freed by that
+clean files operation and the second shows the remaining stale data (data which can be deleted
+but has not yet expired as per the ```max.query.execution.time``` and ```carbon.trash.retention.days``` values).
+By default the value of the ```dryrun``` option is ```false```.
+
+Dry Run Operation is supported with four types of commands:
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('dryrun'='true')
+ ```
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('force'='true', 'dryrun'='true')
+ ```
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true','dryrun'='true')
+ ```
+
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true', 'force'='true','dryrun'='true')
+ ```
+
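+An illustrative dry run result is shown below; the command and the two output columns come from
+this change, while the size values are only placeholders and depend on the actual table:
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('dryrun'='true')
+ +----------------+--------------------+
+ |Size To Be Freed|Trash Data Remaining|
+ +----------------+--------------------+
+ |1.55KB          |2.35KB              |
+ +----------------+--------------------+
+ ```
+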
+**NOTE**:
+ * Since the dry run operation calculates size and accesses file level APIs, it can
+ be a costly and time consuming operation in case of tables with a large number of segments.
+ * When dry run is true, the statistics option will not matter.
+
+### SHOW STATISTICS
+The clean files operation tells the user how much space was freed during that operation. By default,
+the clean files operation will show the size freed statistics. Since calculating and showing statistics
+can be a costly operation and reduce the performance of the clean files operation, the user can disable
+that option by using ```statistics = false``` in the clean files options.
+
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('statistics'='false')
+ ```
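+When statistics are enabled (the default), the command output contains a single Size Freed
+column, as illustrated below with a placeholder value:
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME
+ +----------+
+ |Size Freed|
+ +----------+
+ |1.55KB    |
+ +----------+
+ ```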
+
\ No newline at end of file
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
index 32183ab..ded9c87 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
@@ -19,6 +19,9 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
/**
* test cases for testing clean and delete segment functionality for index
tables
*/
@@ -60,7 +63,10 @@ class TestCreateIndexForCleanAndDeleteSegment extends
QueryTest with BeforeAndAf
"SEGMENT.STARTTIME BEFORE '2025-06-01 12:05:06'")
sql("create materialized view mv1 as select empname, deptname, " +
"avg(salary) from delete_segment_by_id group by empname, deptname")
- sql("clean files for table delete_segment_by_id")
+ var dryRun = sql("clean files for table delete_segment_by_id
OPTIONS('dryrun'='true')")
+ .collect()
+ var cleanFiles = sql("clean files for table
delete_segment_by_id").collect()
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
checkAnswer(sql("select count(*) from delete_segment_by_id"),
sql("select count(*) from index_no_dictionary"))
val postDeleteSegmentsByDate = sql("SHOW SEGMENTS FOR TABLE
delete_segment_by_id").count()
@@ -69,6 +75,14 @@ class TestCreateIndexForCleanAndDeleteSegment extends
QueryTest with BeforeAndAf
assert(result.get(0).get(2).toString.equalsIgnoreCase("ENABLED"))
assert(result.get(0).get(3).toString.equalsIgnoreCase("full"))
assert(result.get(0).get(4).toString.equalsIgnoreCase("on_commit"))
+ dryRun = sql("clean files for table delete_segment_by_id" +
+ " OPTIONS('dryrun'='true', 'force'='true')").collect()
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+ cleanFiles = sql("clean files for table delete_segment_by_id OPTIONS('force'='true')").collect()
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
sql("drop materialized view if exists mv1 ")
sql("drop table if exists delete_segment_by_id")
}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
index cadc43b..1d0bd3b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
@@ -19,16 +19,19 @@ package org.apache.carbondata.trash
import scala.collection.JavaConverters._
+import org.apache.commons.lang.StringUtils
+
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile,
CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock,
LockUsage}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil,
CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil,
CleanFilesUtil, DeleteLoadFolders, TrashUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
object DataTrashManager {
@@ -48,7 +51,8 @@ object DataTrashManager {
carbonTable: CarbonTable,
isForceDelete: Boolean,
cleanStaleInProgress: Boolean,
- partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+ showStatistics: Boolean,
+ partitionSpecs: Option[Seq[PartitionSpec]] = None) : Long = {
// if isForceDelete = true need to throw exception if
CARBON_CLEAN_FILES_FORCE_ALLOWED is false
if (isForceDelete &&
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
LOGGER.error("Clean Files with Force option deletes the physical data
and it cannot be" +
@@ -72,11 +76,26 @@ object DataTrashManager {
carbonDeleteSegmentLock = CarbonLockUtil.getLockObject(carbonTable
.getAbsoluteTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK,
deleteSegmentErrorMsg)
// step 1: check and clean trash folder
- checkAndCleanTrashFolder(carbonTable, isForceDelete)
+ // trashFolderSizeStats(0) contains the size that is freed or can be freed and
+ // trashFolderSizeStats(1) contains the size of remaining data in the trash folder
+ val trashFolderSizeStats = checkAndCleanTrashFolder(carbonTable, isForceDelete,
+ isDryRun = false, showStatistics)
// step 2: move stale segments which are not exists in metadata into .Trash
moveStaleSegmentsToTrash(carbonTable)
// step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
- checkAndCleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+ // Since calculating the size before and after clean files can be a costly operation
+ // we have exposed an option where the user can change this behaviour.
+ if (showStatistics) {
+ val sizeBeforeCleaning = getSizeSnapshot(carbonTable)
+ checkAndCleanExpiredSegments(carbonTable, isForceDelete,
+ cleanStaleInProgress, partitionSpecs)
+ val sizeAfterCleaning = getSizeSnapshot(carbonTable)
+ sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1
+ } else {
+ checkAndCleanExpiredSegments(carbonTable, isForceDelete,
+ cleanStaleInProgress, partitionSpecs)
+ 0
+ }
} finally {
if (carbonCleanFilesLock != null) {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock,
LockUsage.CLEAN_FILES_LOCK)
@@ -87,13 +106,54 @@ object DataTrashManager {
}
}
- private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+ /**
+ * Checks the size of the segment files as well as data files. This method is used before and
+ * after the clean files operation to check how much space is actually freed by the operation.
+ */
+ def getSizeSnapshot(carbonTable: CarbonTable): Long = {
+ val metadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ var size: Long = 0
+ val segmentFileLocation = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
+ if (FileFactory.isFileExist(segmentFileLocation)) {
+ size += FileFactory.getDirectorySize(segmentFileLocation)
+ }
+ metadataDetails.foreach(oneLoad =>
+ if (oneLoad.getVisibility.toBoolean) {
+ size += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, metadataDetails)
+ }
+ )
+ size
+ }
+
+ /**
+ * Method to handle the Clean files dry run operation
+ */
+ def cleanFilesDryRunOperation (
+ carbonTable: CarbonTable,
+ isForceDelete: Boolean,
+ cleanStaleInProgress: Boolean,
+ showStats: Boolean): (Long, Long) = {
+ // get size freed from the trash folder
+ val trashFolderSizeStats = checkAndCleanTrashFolder(carbonTable, isForceDelete,
+ isDryRun = true, showStats)
+ // get size that will be deleted (MFD, Compacted, In Progress segments)
+ val expiredSegmentsSizeStats = dryRunOnExpiredSegments(carbonTable, isForceDelete,
+ cleanStaleInProgress)
+ (trashFolderSizeStats._1 + expiredSegmentsSizeStats._1, trashFolderSizeStats._2 +
+ expiredSegmentsSizeStats._2)
+ }
+
+ private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean,
+ isDryRun: Boolean, showStats: Boolean): (Long, Long) = {
if (isForceDelete) {
// empty the trash folder
- TrashUtil.emptyTrash(carbonTable.getTablePath)
+ val sizeStatistics = TrashUtil.emptyTrash(carbonTable.getTablePath, isDryRun, showStats)
+ (sizeStatistics.head, sizeStatistics(1))
} else {
// clear trash based on timestamp
- TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+ val sizeStatistics = TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath,
+ isDryRun, showStats)
+ (sizeStatistics.head, sizeStatistics(1))
}
}
@@ -122,6 +182,84 @@ object DataTrashManager {
}
/**
+ * Does Clean files dry run operation on the expired segments. Returns the size freed
+ * during that clean files operation and also shows the remaining trash size, which can be
+ * cleaned after those segments are expired
+ */
+ private def dryRunOnExpiredSegments(
+ carbonTable: CarbonTable,
+ isForceDelete: Boolean,
+ cleanStaleInProgress: Boolean): (Long, Long) = {
+ var sizeFreed: Long = 0
+ var trashSizeRemaining: Long = 0
+ val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ if (SegmentStatusManager.isLoadDeletionRequired(loadMetadataDetails)) {
+ loadMetadataDetails.foreach { oneLoad =>
+ if (!oneLoad.getVisibility.equalsIgnoreCase("false")) {
+ val segmentFilePath = CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
+ oneLoad.getSegmentFile)
+ if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
+ // No need to consider physical data for external segments, only consider metadata.
+ if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+ sizeFreed += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, loadMetadataDetails)
+ }
+ if (FileFactory.isFileExist(segmentFilePath)) {
+ sizeFreed += FileFactory.getCarbonFile(segmentFilePath).getSize
+ }
+ } else {
+ if (SegmentStatusManager.isExpiredSegment(oneLoad, carbonTable
+ .getAbsoluteTableIdentifier)) {
+ trashSizeRemaining += calculateSegmentSizeForOneLoad(carbonTable, oneLoad,
+ loadMetadataDetails)
+ if (FileFactory.isFileExist(segmentFilePath)) {
+ trashSizeRemaining += FileFactory.getCarbonFile(segmentFilePath).getSize
+ }
+ }
+ }
+ }
+ }
+ }
+ (sizeFreed, trashSizeRemaining)
+ }
+
+ /**
+ * calculates the size of one segment (load)
+ */
+ def calculateSegmentSizeForOneLoad( carbonTable: CarbonTable, oneLoad: LoadMetadataDetails,
+ loadMetadataDetails: Array[LoadMetadataDetails]) : Long = {
+ var size : Long = 0
+ if (!StringUtils.isEmpty(oneLoad.getDataSize)) {
+ size += oneLoad.getDataSize.toLong
+ }
+ if (!StringUtils.isEmpty(oneLoad.getIndexSize)) {
+ size += oneLoad.getIndexSize.toLong
+ }
+ if (!oneLoad.getUpdateDeltaStartTimestamp.isEmpty && !oneLoad.getUpdateDeltaEndTimestamp
+ .isEmpty) {
+ size += calculateDeltaFileSize(carbonTable, oneLoad, loadMetadataDetails)
+ }
+ size
+ }
+
+ /**
+ * calculates the size of delta files for one segment
+ */
+ def calculateDeltaFileSize( carbonTable: CarbonTable, oneLoad: LoadMetadataDetails,
+ loadMetadataDetails: Array[LoadMetadataDetails]) : Long = {
+ var size: Long = 0
+ val segmentUpdateStatusManager = new SegmentUpdateStatusManager(carbonTable,
+ loadMetadataDetails)
+ segmentUpdateStatusManager.getBlockNameFromSegment(oneLoad.getLoadName).asScala.foreach {
+ block =>
+ segmentUpdateStatusManager.getDeleteDeltaFilesList(Segment
+ .toSegment(oneLoad.getLoadName), block).asScala.foreach{ deltaFile =>
+ size += FileFactory.getCarbonFile(deltaFile).getSize
+ }
+ }
+ size
+ }
+
+ /**
* clean the stale compact segment immediately after compaction failure
*/
def cleanStaleCompactionSegment(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index c51efef..6461604 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -19,14 +19,16 @@ package org.apache.spark.sql.execution.command.management
import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.types.StringType
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.ByteUtil
import org.apache.carbondata.events._
import org.apache.carbondata.trash.DataTrashManager
@@ -41,6 +43,27 @@ case class CarbonCleanFilesCommand(
extends DataCommand {
val LOGGER: Logger =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val isDryRun: Boolean = options.getOrElse("dryrun", "false").toBoolean
+ val showStats: Boolean = if (isInternalCleanCall) {
+ false
+ } else {
+ options.getOrElse("statistics", "true").toBoolean
+ }
+
+ override def output: Seq[AttributeReference] = {
+ if (isDryRun) {
+ // dry run operation
+ Seq(
+ AttributeReference("Size To Be Freed", StringType, nullable = false)(),
+ AttributeReference("Trash Data Remaining", StringType, nullable =
false)())
+ } else if (showStats) {
+ Seq(
+ AttributeReference("Size Freed", StringType, nullable = false)())
+ // actual clean files operation
+ } else {
+ Seq.empty
+ }
+ }
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
@@ -58,18 +81,32 @@ case class CarbonCleanFilesCommand(
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non
transactional table")
}
-
- val preEvent = CleanFilesPreEvent(carbonTable, sparkSession)
- val postEvent = CleanFilesPostEvent(carbonTable, sparkSession, options)
- withEvents(preEvent, postEvent) {
- DataTrashManager.cleanGarbageData(
+ var sizeCleaned : Long = 0
+ if (isDryRun) {
+ val result = DataTrashManager.cleanFilesDryRunOperation(
carbonTable,
options.getOrElse("force", "false").toBoolean,
options.getOrElse("stale_inprogress", "false").toBoolean,
- CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, carbonTable))
+ showStats)
+ Seq(Row(ByteUtil.convertByteToReadable(result._1), ByteUtil
+ .convertByteToReadable(result._2)))
+ } else {
+ val preEvent = CleanFilesPreEvent(carbonTable, sparkSession)
+ val postEvent = CleanFilesPostEvent(carbonTable, sparkSession, options)
+ withEvents(preEvent, postEvent) {
+ sizeCleaned = DataTrashManager.cleanGarbageData(
+ carbonTable,
+ options.getOrElse("force", "false").toBoolean,
+ options.getOrElse("stale_inprogress", "false").toBoolean,
+ showStats,
+ CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, carbonTable))
+ }
+ if (showStats) {
+ Seq(Row(ByteUtil.convertByteToReadable(sizeCleaned)))
+ } else {
+ Seq.empty
+ }
}
-
- Seq.empty
}
override protected def opName: String = "CLEAN FILES"
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 16898b2..40bd805 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -40,6 +40,7 @@ object CleanFiles {
carbonTable,
isForceDeletion,
cleanStaleInProgress,
+ false,
CarbonFilters.getPartitions(Seq.empty[Expression], spark, carbonTable))
}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
index 8812253..c5589fe 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -31,7 +31,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.statusmanager.{SegmentStatus,
SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonTestUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
@@ -53,7 +53,11 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
assert(segmentNumber1 == 4)
- sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('stale_inprogress'='true')").show
+ val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest " +
+ s"OPTIONS('stale_inprogress'='true','dryrun'='true')").collect()
+ val cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest" +
+ s" OPTIONS('stale_inprogress'='true')").collect()
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
assert(4 == segmentNumber2)
assert(!FileFactory.isFileExist(trashFolderPath))
@@ -76,7 +80,11 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
assert(segmentNumber1 == 4)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
- sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('stale_inprogress'='true','force'='true')").show
+ val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS" +
+ s"('stale_inprogress'='true','force'='true','dryrun'='true')").collect()
+ val cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS" +
+ s"('stale_inprogress'='true','force'='true')").collect()
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
CarbonProperties.getInstance()
.removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
@@ -100,10 +108,18 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"""Delete from table cleantest where segment.id in(4)""")
+
+ var dryRun = sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('dryrun'='true')").collect()
+ var cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+ sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('statistics'='false')").show()
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
+ dryRun = sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('dryrun'='true','force'='true')")
+ .collect()
val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
- sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+ cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('force'='true')").collect()
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
CarbonProperties.getInstance()
.removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
sql(s"""show segments for table cleantest""").show()
@@ -130,7 +146,9 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(
sqlContext.sparkSession), "4")
assert(!FileFactory.isFileExist(trashFolderPath))
- sql(s"CLEAN FILES FOR TABLE cleantest").show()
+ var dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+ var cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
checkAnswer(sql(s"""select count(*) from cleantest"""),
Seq(Row(4)))
count = 0
@@ -149,17 +167,19 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
checkAnswer(sql(s"""select count(*) from cleantest"""),
Seq(Row(5)))
- sql(s"CLEAN FILES FOR TABLE cleantest").show()
+ dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+ cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
count = 0
list = getFileCountInTrashFolder(trashFolderPath)
assert(list == 2)
- intercept[RuntimeException] {
- sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
- }
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
- sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+ dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true', 'dryrun'='true')")
+ .collect()
+ cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
CarbonProperties.getInstance()
.removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
@@ -466,6 +486,42 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
}
+ test("Test clean files after delete command") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ sql("drop table if exists cleantest")
+ sql(
+ """
+ | CREATE TABLE cleantest (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED AS carbondata
+ """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
cleantest OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ val table = CarbonEnv.getCarbonTable(None, "cleantest")
(sqlContext.sparkSession)
+ sql("delete from cleantest where deptno='10'")
+ sql(s"""Delete from table cleantest where segment.id in(0)""")
+ val segmentSize = FileFactory.getDirectorySize(CarbonTablePath.getSegmentPath(table
+ .getTablePath, "0")) + FileFactory.getDirectorySize(CarbonTablePath
+ .getSegmentFilesLocation(table.getTablePath))
+ var dryRun = sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('dryrun'='true')").collect()
+ var cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
+ dryRun = sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('dryrun'='true','force'='true')")
+ .collect()
+ cleanFiles = sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('force'='true')").collect()
+ assert(cleanFiles(0).get(0) == dryRun(0).get(0))
+ assert(ByteUtil.convertByteToReadable(segmentSize) == cleanFiles(0).get(0))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+ sql("drop table if exists cleantest")
+ }
+
+
def editTableStatusFile(carbonTablePath: String) : Unit = {
// original table status file
val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index c494404..7f2a0e1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -65,15 +65,25 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
loadData()
sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
loadData()
+ var dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('dryrun'='true')").collect()
+ var cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
val path = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext.sparkSession)
.getTablePath
val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
CarbonTablePath.TRASH_DIR
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"""Delete from table cleantest where segment.id in(4)""")
+ dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest options('dryrun'='true')").collect()
+ cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
+
val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
- sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+ dryRunRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true','dryrun'='true')")
.collect()
+ cleanFilesRes = sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
CarbonProperties.getInstance()
.removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
@@ -139,7 +149,9 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(
sqlContext.sparkSession), "2")
- sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+ val dryRunRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST OPTIONS('dryrun'='true')").collect()
+ val cleanFilesRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
checkAnswer(sql(s"""select count(*) from cleantest"""),
Seq(Row(2)))
@@ -186,7 +198,9 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(
sqlContext.sparkSession), "2")
- sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
+ val dryRunRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST OPTIONS('dryrun'='true')").collect()
+ val cleanFilesRes = sql(s"CLEAN FILES FOR TABLE CLEANTEST").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
val timeStamp = getTimestampFolderName(trashFolderPath)
// test recovery from partition table
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index cde2f88..a32f352 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -101,7 +101,9 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 5)
- sql("clean files for table t1 options('force'='true')")
+ var dryRunRes = sql("clean files for table t1 options('force'='true',
'dryrun'='true')").collect()
+ var cleanFilesRes = sql("clean files for table t1
options('force'='true')").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 4)
@@ -110,7 +112,9 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 5)
- sql("clean files for table t1 options('force'='true')")
+ dryRunRes = sql("clean files for table t1 options('force'='true',
'dryrun'='true')").collect()
+ cleanFilesRes = sql("clean files for table t1
options('force'='true')").collect()
+ assert(cleanFilesRes(0).get(0) == dryRunRes(0).get(0))
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 1)