This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 7e58bbb [CARBONDATA-4067]: Removing force option in clean files
command and changing behaviour when MFD, Compacted and stale Inprogress
segments can be deleted
7e58bbb is described below
commit 7e58bbb5b28f497b6597dc58573cff902ee519b9
Author: Vikram Ahuja <[email protected]>
AuthorDate: Fri Nov 6 12:59:43 2020 +0530
[CARBONDATA-4067]: Removing force option in clean files command and
changing behaviour when MFD, Compacted and stale Inprogress segments can be
deleted
Why is this PR needed?
Change the behaviour of the clean files operation.
Old behaviour: The clean files command runs with force option = true by default and
ignores the query timeout, so MFD, Compacted and In Progress segments are removed
immediately when clean files is called. As a result, the user can delete important
data by mistake.
What changes were proposed in this PR?
Instead of a single force option that deletes all MFD, Compacted and Insert In
Progress segments, the behaviour is split into 2 parameters, forceClean and
cleanStaleInProgress. The forceClean parameter cleans data immediately, and
cleanStaleInProgress decides whether stale In Progress segments can be deleted.
The behaviour is described below.
New Behaviour: clean files is no longer forced by default and depends on 2
variables (forceClean and cleanStaleInProgress).
Default clean files behaviour (clean files on table t1): cleaning MFD and
Compacted segments depends on the query timeout (1 hr) and
trashRetentionTimeout (7 days, default). For example:
If trashRetentionTimeout is 7 days and query timeout is 1 hr --> delete
after 7 days
If trashRetentionTimeout is 0 days and query timeout is 1 hr --> delete
after 1 hr
It also empties the trash folder based on the trash retention time.
clean files on table t1 options('force'='true'): cleans MFD and Compacted
segments immediately (no timeout check) and empties the trash folder immediately.
clean files on table t1 options('stale_inprogress'='true'): cleans stale
In Progress, MFD and Compacted segments depending on trashRetentionTimeout, after
7 days by default, and empties the trash folder based on the trash retention time.
clean files on table t1 options('stale_inprogress'='true', 'force'='true'):
cleans MFD, Compacted and stale In Progress segments immediately (no timeout check)
and empties the trash folder immediately.
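For reference, a minimal sketch of the corresponding SQL invocations, following the
updated docs/clean-files.md (option and property names are taken from that document
and the diff below):

    -- default: honours the query timeout and trash retention time
    CLEAN FILES FOR TABLE t1
    -- immediate cleanup of MFD/Compacted segments and the trash folder
    CLEAN FILES FOR TABLE t1 OPTIONS('force'='true')
    -- also considers stale In Progress segments, after the trash retention time
    CLEAN FILES FOR TABLE t1 OPTIONS('stale_inprogress'='true')
    -- immediate cleanup of MFD, Compacted and stale In Progress segments
    CLEAN FILES FOR TABLE t1 OPTIONS('stale_inprogress'='true', 'force'='true')

Note that the force option is only honoured when the carbon property
carbon.clean.file.force.allowed is set to true (it defaults to false).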
Does this PR introduce any user interface change?
Yes. The documentation is updated.
Is any new testcase added?
No (existing test cases were changed)
This closes #4035
---
.../core/statusmanager/SegmentStatusManager.java | 58 +++---------
.../carbondata/core/util/CleanFilesUtil.java | 3 +
.../carbondata/core/util/DeleteLoadFolders.java | 97 ++++++++++++-------
docs/clean-files.md | 21 ++++-
.../TestIndexModelWithAggQueries.scala | 8 +-
.../secondaryindex/TestIndexModelWithIUD.scala | 8 +-
.../testsuite/secondaryindex/TestIndexRepair.scala | 30 +++---
.../secondaryindex/TestSIWithSecondaryIndex.scala | 6 ++
.../org/apache/carbondata/api/CarbonStore.scala | 19 +++-
.../carbondata/events/CleanFilesEvents.scala | 3 +-
.../apache/carbondata/spark/util/CommonUtil.scala | 3 +-
.../org/apache/carbondata/view/MVRefresher.scala | 2 +-
.../management/CarbonCleanFilesCommand.scala | 24 +++--
.../management/CarbonInsertIntoWithDf.scala | 2 +-
.../command/management/CarbonLoadDataCommand.scala | 2 +-
.../events/CleanFilesPostEventListener.scala | 9 +-
.../secondaryindex/rdd/SecondaryIndexCreator.scala | 2 +-
.../org/apache/spark/sql/test/util/QueryTest.scala | 10 ++
.../scala/org/apache/spark/util/CleanFiles.scala | 15 ++-
.../bloom/BloomCoarseGrainIndexFunctionSuite.scala | 6 +-
.../testsuite/binary/TestBinaryDataType.scala | 6 +-
.../dataload/TestLoadDataWithCompression.scala | 6 +-
.../allqueries/InsertIntoCarbonTableTestCase.scala | 6 +-
.../cleanfiles/TestCleanFileCommand.scala | 103 ++++++++++++---------
.../TestCleanFilesCommandPartitionTable.scala | 80 +++++++---------
.../CompactionSupportGlobalSortFunctionTest.scala | 16 ++--
.../TableLevelCompactionOptionTest.scala | 12 ++-
.../FlatFolderTableLoadingTestCase.scala | 13 ++-
.../testsuite/iud/DeleteCarbonTableTestCase.scala | 8 +-
.../testsuite/segment/ShowSegmentTestCase.scala | 22 ++++-
.../TestSegmentReadingForMultiThreading.scala | 6 ++
.../StandardPartitionGlobalSortTestCase.scala | 12 ++-
.../StandardPartitionTableCleanTestCase.scala | 7 +-
.../StandardPartitionTableCompactionTestCase.scala | 7 +-
.../view/rewrite/TestPartitionWithMV.scala | 7 +-
.../org/apache/spark/util/CarbonCommandSuite.scala | 9 +-
36 files changed, 398 insertions(+), 250 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 6fc0754..c062a02 100755
---
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -990,46 +990,6 @@ public class SegmentStatusManager {
return newListMetadata;
}
- private static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
- List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- String dataLoadLocation =
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-
- DataOutputStream dataOutputStream;
- Gson gsonObjectToWrite = new Gson();
- BufferedWriter brWriter = null;
-
- AtomicFileOperations writeOperation =
- AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
-
- try {
-
- dataOutputStream =
writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
- brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- Charset.forName(DEFAULT_CHARSET)));
-
- // make the table status file smaller by removing fields that are
default value
-
listOfLoadFolderDetails.forEach(LoadMetadataDetails::removeUnnecessaryField);
-
- String metadataInstance =
gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
- brWriter.write(metadataInstance);
- } catch (IOException ie) {
- LOG.error("Error message: " + ie.getLocalizedMessage());
- writeOperation.setFailed();
- throw ie;
- } finally {
- try {
- if (null != brWriter) {
- brWriter.flush();
- }
- } catch (Exception e) {
- LOG.error("error in flushing ");
-
- }
- CarbonUtil.closeStreams(brWriter);
- writeOperation.close();
- }
- }
-
private static class ReturnTuple {
LoadMetadataDetails[] details;
boolean isUpdateRequired;
@@ -1040,16 +1000,18 @@ public class SegmentStatusManager {
}
private static ReturnTuple isUpdateRequired(boolean isForceDeletion,
CarbonTable carbonTable,
- AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[]
details) {
+ AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[]
details,
+ boolean cleanStaleInProgress) {
// Delete marked loads
boolean isUpdateRequired = DeleteLoadFolders
.deleteLoadFoldersFromFileSystem(absoluteTableIdentifier,
isForceDeletion, details,
- carbonTable.getMetadataPath());
+ carbonTable.getMetadataPath(), cleanStaleInProgress);
return new ReturnTuple(details, isUpdateRequired);
}
public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable,
boolean isForceDeletion,
- List<PartitionSpec> partitionSpecs) throws IOException {
+ List<PartitionSpec> partitionSpecs, boolean cleanStaleInprogress,
+ boolean isCleanFilesOperation) throws IOException {
LoadMetadataDetails[] metadataDetails =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
// delete the expired segment lock files
@@ -1059,7 +1021,8 @@ public class SegmentStatusManager {
boolean updateCompletionStatus = false;
LoadMetadataDetails[] newAddedLoadHistoryList = null;
ReturnTuple tuple =
- isUpdateRequired(isForceDeletion, carbonTable, identifier,
metadataDetails);
+ isUpdateRequired(isForceDeletion, carbonTable, identifier,
metadataDetails,
+ cleanStaleInprogress);
if (tuple.isUpdateRequired) {
ICarbonLock carbonTableStatusLock =
CarbonLockFactory.getCarbonLockObj(identifier,
LockUsage.TABLE_STATUS_LOCK);
@@ -1079,7 +1042,8 @@ public class SegmentStatusManager {
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
ReturnTuple tuple2 =
- isUpdateRequired(isForceDeletion, carbonTable, identifier,
details);
+ isUpdateRequired(isForceDeletion, carbonTable,
+ identifier, details, cleanStaleInprogress);
if (!tuple2.isUpdateRequired) {
return;
}
@@ -1095,7 +1059,7 @@ public class SegmentStatusManager {
// if execute command 'clean files' or the number of invisible
segment info
// exceeds the value of 'carbon.invisible.segments.preserve.count',
// it need to append the invisible segment list to
'tablestatus.history' file.
- if (isForceDeletion || (invisibleSegmentCnt >
invisibleSegmentPreserveCnt)) {
+ if (isCleanFilesOperation || invisibleSegmentCnt >
invisibleSegmentPreserveCnt) {
TableStatusReturnTuple tableStatusReturn =
separateVisibleAndInvisibleSegments(
tuple2.details, latestMetadata, invisibleSegmentCnt,
maxSegmentId);
LoadMetadataDetails[] oldLoadHistoryList =
readLoadHistoryMetadata(
@@ -1136,7 +1100,7 @@ public class SegmentStatusManager {
if (updateCompletionStatus) {
DeleteLoadFolders
.physicalFactAndMeasureMetadataDeletion(carbonTable,
newAddedLoadHistoryList,
- isForceDeletion, partitionSpecs);
+ isForceDeletion, partitionSpecs, cleanStaleInprogress);
}
}
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index 6311d3a..bdfab6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -158,6 +158,9 @@ public class CleanFilesUtil {
}
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(carbonTable
.getMetadataPath());
+ if (details == null || details.length == 0) {
+ return;
+ }
Set<String> loadNameSet =
Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
.collect(Collectors.toSet());
List<String> staleSegments = segmentFiles.stream().filter(segmentFile ->
!loadNameSet.contains(
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index d509959..e1b4663 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -69,20 +69,23 @@ public final class DeleteLoadFolders {
public static void physicalFactAndMeasureMetadataDeletion(CarbonTable
carbonTable,
LoadMetadataDetails[] newAddedLoadHistoryList,
boolean isForceDelete,
- List<PartitionSpec> specs) {
+ List<PartitionSpec> specs,
+ boolean cleanStaleInProgress) {
LoadMetadataDetails[] currentDetails =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
physicalFactAndMeasureMetadataDeletion(carbonTable,
currentDetails,
isForceDelete,
specs,
- currentDetails);
+ currentDetails,
+ cleanStaleInProgress);
if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0)
{
physicalFactAndMeasureMetadataDeletion(carbonTable,
newAddedLoadHistoryList,
isForceDelete,
specs,
- currentDetails);
+ currentDetails,
+ cleanStaleInProgress);
}
}
@@ -90,13 +93,13 @@ public final class DeleteLoadFolders {
* Delete the invalid data physically from table.
* @param carbonTable table
* @param loadDetails Load details which need clean up
- * @param isForceDelete is Force delete requested by user
+ * @param isForceDelete Force delete Compacted and MFD segments. it will
empty the trash folder
* @param specs Partition specs
* @param currLoadDetails Current table status load details which are
required for update manager.
*/
private static void physicalFactAndMeasureMetadataDeletion(CarbonTable
carbonTable,
LoadMetadataDetails[] loadDetails, boolean isForceDelete,
List<PartitionSpec> specs,
- LoadMetadataDetails[] currLoadDetails) {
+ LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress) {
List<TableIndex> indexes = new ArrayList<>();
try {
for (TableIndex index :
IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
@@ -113,7 +116,7 @@ public final class DeleteLoadFolders {
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
for (final LoadMetadataDetails oneLoad : loadDetails) {
- if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+ if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete,
cleanStaleInProgress)) {
try {
if (oneLoad.getSegmentFile() != null) {
SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
@@ -173,40 +176,66 @@ public final class DeleteLoadFolders {
}
private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
- boolean isForceDelete) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
- SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
- SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
- SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS ==
oneLoad.getSegmentStatus())
- && oneLoad.getVisibility().equalsIgnoreCase("true")) {
- if (isForceDelete) {
- return true;
- }
- long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
- return TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime) &&
CarbonUpdateUtil
- .isMaxQueryTimeoutExceeded(deletionTime);
+ boolean isForceDelete, boolean cleanStaleInProgress) {
+ if (oneLoad.getVisibility().equalsIgnoreCase("true")) {
+ return canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress);
}
-
return false;
}
private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails
oneLoad,
- boolean isForceDelete) {
+ boolean isForceDelete, boolean cleanStaleInProgress) {
// Check if the segment is added externally and path is set then do not
delete it
- if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus()
- || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) &&
(oneLoad.getPath() == null
- || oneLoad.getPath().equalsIgnoreCase("NA"))) {
- if (isForceDelete) {
+ if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA"))
{
+ return canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress);
+ }
+ return false;
+ }
+
+ private static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
+ boolean isForceDelete, boolean cleanStaleInProgress) {
+ /*
+ * if cleanStaleInProgress == false and isForceDelete == false, clean MFD
and Compacted
+ * segments will depend on query timeout(1 hr) and
trashRetentionTimeout(7 days, default).
+ * For example:
+ * If trashRetentionTimeout is 7 days and query timeout is 1 hr--> Delete
after 7 days
+ * If trashRetentionTimeout is 0 days and query timeout is 1 hr--> Delete
after 1 hr
+ *
+ * if cleanStaleInProgress == false and isForceDelete == true, clean MFD
and Compacted
+ * segments immediately(Do not check for any timeout)
+ *
+ * if cleanStaleInProgress == true and isForceDelete == false, clean
Stale Inprogress, MFD and
+ * compacted segments after 7 days(taking carbon.trash.retention.time
value)
+ *
+ * if cleanStaleInProgress == true and isForceDelete == true, clean MFD,
Compacted and
+ * stale inprogress segments immediately.(Do not check for any timeout)
+ */
+ if (isForceDelete) {
+ // immediately delete compacted and MFD
+ if (SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
SegmentStatus
+ .MARKED_FOR_DELETE == oneLoad.getSegmentStatus()) {
return true;
}
- long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
-
+ // immediately delete inprogress segments if cleanstaleinprogress is true
+ return cleanStaleInProgress && (SegmentStatus.INSERT_IN_PROGRESS ==
oneLoad
+ .getSegmentStatus() || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS ==
oneLoad
+ .getSegmentStatus());
+ }
+ long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
+ // in case there is no deletion or modification timestamp, take the load
start time as
+ // deleteTime
+ if (deletionTime == 0) {
+ deletionTime = oneLoad.getLoadStartTime();
+ }
+ if (SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || SegmentStatus
+ .MARKED_FOR_DELETE == oneLoad.getSegmentStatus()) {
+ // delete MFD, compacted segments after checking trash timeout and query
timeout
return TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime) &&
CarbonUpdateUtil
- .isMaxQueryTimeoutExceeded(deletionTime);
-
+ .isMaxQueryTimeoutExceeded(deletionTime);
}
-
- return false;
+ return (SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
SegmentStatus
+ .INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus()) &&
cleanStaleInProgress &&
+ TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime);
}
private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String
segmentId,
@@ -221,12 +250,12 @@ public final class DeleteLoadFolders {
}
public static boolean deleteLoadFoldersFromFileSystem(
- AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
- LoadMetadataDetails[] details, String metadataPath) {
+ AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
LoadMetadataDetails[]
+ details, String metadataPath, boolean cleanStaleInProgress) {
boolean isDeleted = false;
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneLoad : details) {
- if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+ if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete,
cleanStaleInProgress)) {
ICarbonLock segmentLock =
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) +
LockUsage.LOCK);
try {
@@ -237,7 +266,7 @@ public final class DeleteLoadFolders {
LoadMetadataDetails currentDetails =
getCurrentLoadStatusOfSegment(oneLoad.getLoadName(),
metadataPath);
if (currentDetails != null &&
checkIfLoadCanBeDeleted(currentDetails,
- isForceDelete)) {
+ isForceDelete, cleanStaleInProgress)) {
oneLoad.setVisibility("false");
isDeleted = true;
LOGGER.info("Info: Deleted the load " +
oneLoad.getLoadName());
diff --git a/docs/clean-files.md b/docs/clean-files.md
index 14ffb60..db4885b 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -24,6 +24,7 @@ Clean files command is used to remove the Compacted, Marked
For Delete ,In Progr
```
CLEAN FILES FOR TABLE TABLE_NAME
```
+The above clean files command will clean Marked For Delete and Compacted
segments depending on ```max.query.execution.time``` (default 1 hr) and ```
carbon.trash.retention.days``` (default 7 days). It will also delete the
timestamp subdirectories from the trash folder after expiration day(default 7
day, can be configured)
### TRASH FOLDER
@@ -37,10 +38,24 @@ Clean files command is used to remove the Compacted, Marked
For Delete ,In Progr
```
Once the timestamp subdirectory is expired as per the configured expiration
day value, that subdirectory is deleted from the trash folder in the subsequent
clean files command.
-### FORCE DELETE TRASH
-The force option with clean files command deletes all the files and folders
from the trash folder.
+### FORCE OPTION
+The force option with clean files command deletes all the files and folders
from the trash folder and delete the Marked for Delete and Compacted segments
immediately. Since Clean Files operation with force option will delete data
that can never be recovered, the force option by default is disabled. Clean
files with force option is only allowed when the carbon property
```carbon.clean.file.force.allowed``` is set to true. The default value of this
property is false.
+
+
```
CLEAN FILES FOR TABLE TABLE_NAME options('force'='true')
```
-Since Clean Files operation with force option will delete data that can never
be recovered, the force option by default is disabled. Clean files with force
option is only allowed when the carbon property
```carbon.clean.file.force.allowed``` is set to true. The default value of this
property is false.
\ No newline at end of file
+
+### STALE_INPROGRESS OPTION
+The stale_inprogress option deletes the stale Insert In Progress segments
after the expiration of the property ```carbon.trash.retention.days```
+
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true')
+ ```
+
+The stale_inprogress option with force option will delete Marked for delete,
Compacted and stale Insert In progress immediately. It will also empty the
trash folder immediately.
+
+ ```
+ CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true',
'force'='true')
+ ```
\ No newline at end of file
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala
index 5255ebd..45a2af3 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala
@@ -21,7 +21,9 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import
org.apache.carbondata.common.exceptions.sql.MalformedIndexCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
/**
* test cases with secondary index and agg queries
@@ -161,7 +163,11 @@ class TestIndexModelWithAggQueries extends QueryTest with
BeforeAndAfterAll {
case Some(row) => assert(row.get(1).toString.contains("Marked for
Delete"))
case None => assert(false)
}
- sql("clean files for table clean")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ sql("clean files for table clean options('force'='true')")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
val mainTable = CarbonEnv.getCarbonTable(Some("default"),
"clean")(sqlContext.sparkSession)
val indexTable = CarbonEnv.getCarbonTable(Some("default"), "clean_index")(
sqlContext.sparkSession)
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
index 6f1b709..a18036d 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
@@ -23,7 +23,9 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
/**
@@ -59,7 +61,11 @@ class TestIndexModelWithIUD extends QueryTest with
BeforeAndAfterAll {
.equals(SegmentStatus.MARKED_FOR_DELETE.getMessage))
// execute clean files
- sql("clean files for table dest")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ sql("clean files for table dest options('force'='true')")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
sql("show segments for table index_dest2").collect()
val exception_index_dest1 = intercept[IndexOutOfBoundsException] {
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
index 5d6f816..6a0d268 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -19,6 +19,8 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
/**
@@ -30,6 +32,8 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("drop index if exists indextable1 on maintable")
sql("drop index if exists indextable2 on maintable")
sql("drop table if exists maintable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
}
test("reindex command after deleting segments from SI table") {
@@ -40,7 +44,7 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
- sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
sql(s"""ALTER TABLE default.indextable1 SET
|SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
val df1 = sql("select * from maintable where c =
'string2'").queryExecution.sparkPlan
@@ -68,7 +72,7 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE
test.INDEXTABLE1").count()
sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
- sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE1 options('force'='true')")
sql(s"""ALTER TABLE test.indextable1 SET
|SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
val df1 = sql("select * from test.maintable where c =
'string2'").queryExecution.sparkPlan
@@ -95,7 +99,7 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
- sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(preDeleteSegments!=postDeleteSegments)
sql(s"""ALTER TABLE default.indextable1 SET
@@ -126,7 +130,7 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(1,2,3)")
- sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(preDeleteSegments!=postDeleteSegments)
sql(s"""ALTER TABLE default.indextable1 SET
@@ -150,9 +154,9 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
- sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
- sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE2 options('force'='true')")
val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE2").count()
assert(preDeleteSegments!=postDeleteSegmentsIndexOne)
@@ -174,9 +178,9 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
- sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(1)")
- sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE2 options('force'='true')")
val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE2").count()
assert(preDeleteSegments != postDeleteSegmentsIndexOne)
@@ -204,9 +208,9 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
val preDeleteSegmentsTableOne = sql("SHOW SEGMENTS FOR TABLE
test.MAINTABLE1").count()
sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
- sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE1 options('force'='true')")
sql("DELETE FROM TABLE test.INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
- sql("CLEAN FILES FOR TABLE test.INDEXTABLE2")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE2 options('force'='true')")
val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
test.INDEXTABLE1").count()
val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE
test.INDEXTABLE2").count()
@@ -219,9 +223,9 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
val preDeleteSegmentsTableTwo = sql("SHOW SEGMENTS FOR TABLE
test.MAINTABLE2").count()
sql("DELETE FROM TABLE test.INDEXTABLE3 WHERE SEGMENT.ID IN(1)")
- sql("CLEAN FILES FOR TABLE test.INDEXTABLE3")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE3 options('force'='true')")
sql("DELETE FROM TABLE test.INDEXTABLE4 WHERE SEGMENT.ID IN(0,1)")
- sql("CLEAN FILES FOR TABLE test.INDEXTABLE4")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE4 options('force'='true')")
val postDeleteSegmentsIndexThree = sql("SHOW SEGMENTS FOR TABLE
test.INDEXTABLE3").count()
val postDeleteSegmentsIndexFour = sql("SHOW SEGMENTS FOR TABLE
test.INDEXTABLE4").count()
@@ -252,6 +256,8 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("drop index if exists indextable1 on maintable")
sql("drop index if exists indextable2 on maintable")
sql("drop table if exists maintable")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
}
}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index b8f0e76..286dea5 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -283,6 +283,12 @@ class TestSIWithSecondaryIndex extends QueryTest with
BeforeAndAfterAll {
indexTable.getMetadataPath + CarbonCommonConstants.FILE_SEPARATOR +
CarbonTablePath.TABLE_STATUS_FILE,
loadMetadataDetailsList)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ sql(s"CLEAN FILES FOR TABLE ud_index1
OPTIONS('stale_inprogress'='true','force'='true')")
+ .show()
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
sql(s"""ALTER TABLE default.ud_index1 SET
|SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 480e357..160cdd1 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.api
-import java.io.{DataInputStream, File, FileNotFoundException,
InputStreamReader}
+import java.io.{DataInputStream, FileNotFoundException, InputStreamReader}
import java.time.{Duration, Instant}
import java.util
import java.util.{Collections, Comparator}
@@ -41,6 +41,7 @@ import
org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{FileFormat,
LoadMetadataDetails, SegmentStatus, SegmentStatusManager, StageInput}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.segment.StreamSegment
@@ -304,9 +305,19 @@ object CarbonStore {
tablePath: String,
carbonTable: CarbonTable,
forceTableClean: Boolean,
+ isForceDelete: Boolean,
+ cleanStaleInprogress: Boolean,
currentTablePartitions: Option[Seq[PartitionSpec]] = None,
truncateTable: Boolean = false): Unit = {
- var carbonCleanFilesLock: ICarbonLock = null
+ // CleanFiles API is also exposed to the user, if they call this API with
isForceDelete = true,
+ // need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+ if (isForceDelete &&
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+ LOGGER.error("Clean Files with Force option deletes the physical data
and it cannot be" +
+ " recovered. It is disabled by default, to enable clean files with
force option," +
+ " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + "
to true")
+ throw new RuntimeException("Clean files with force operation not
permitted by default")
+ }
+ var carbonCleanFilesLock: ICarbonLock = null
val absoluteTableIdentifier = if (forceTableClean) {
AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
} else {
@@ -328,8 +339,8 @@ object CarbonStore {
if (truncateTable) {
SegmentStatusManager.truncateTable(carbonTable)
}
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(
- carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+ isForceDelete, currentTablePartitions.map(_.asJava).orNull,
cleanStaleInprogress, true)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
currentTablePartitions match {
case Some(partitions) =>
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
index b7e9c20..509e166 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
@@ -34,5 +34,6 @@ case class CleanFilesPreEvent(carbonTable: CarbonTable,
sparkSession: SparkSessi
* @param carbonTable
* @param sparkSession
*/
-case class CleanFilesPostEvent(carbonTable: CarbonTable, sparkSession:
SparkSession)
+case class CleanFilesPostEvent(carbonTable: CarbonTable, cleanStaleInProgress:
Boolean,
+ ifForceDelete: Boolean, sparkSession: SparkSession)
extends Event with CleanFilesEventInfo
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index af23001..b355c07 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -595,7 +595,8 @@ object CommonUtil {
try {
val carbonTable = CarbonMetadata.getInstance
.getCarbonTable(tableUniqueName)
-
SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+
SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, false, null,
+ true, true)
} catch {
case _: Exception =>
LOGGER.warn(s"Error while cleaning table " +
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index f202a17..70b766b 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -66,7 +66,7 @@ object MVRefresher {
val viewIdentifier = viewSchema.getIdentifier
val viewTableIdentifier = viewTable.getAbsoluteTableIdentifier
// Clean up the old invalid segment data before creating a new entry for
new load.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(viewTable, false, null)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(viewTable, false, null,
false, false)
val segmentStatusManager: SegmentStatusManager = new
SegmentStatusManager(viewTableIdentifier)
// Acquire table status lock to handle concurrent data loading
val lock: ICarbonLock = segmentStatusManager.getTableStatusLock
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index fe8f98b..c0ca47a 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -58,8 +58,12 @@ case class CarbonCleanFilesCommand(
var carbonTable: CarbonTable = _
var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
- // forceClean will empty trash
+ // forceClean will clean the MFD and Compacted segments immediately and also
empty the trash
+ // folder
val forceClean = optionsMap.getOrElse("force", "false").toBoolean
+ // stale_inprogress will clean the In Progress segments based on retention
time and it will
+ // clean immediately when force is true
+ val staleInprogress = optionsMap.getOrElse("stale_inprogress",
"false").toBoolean
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
carbonTable = CarbonEnv.getCarbonTable(databaseNameOp,
tableName.get)(sparkSession)
@@ -117,9 +121,9 @@ case class CarbonCleanFilesCommand(
CleanFilesUtil.cleanStaleSegments(carbonTable)
}
if (forceTableClean) {
- deleteAllData(sparkSession, databaseNameOp, tableName.get)
+ deleteAllData(sparkSession, databaseNameOp, tableName.get, forceClean,
staleInprogress)
} else {
- cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+ cleanGarbageData(sparkSession, databaseNameOp, tableName.get,
forceClean, staleInprogress)
}
} else {
cleanGarbageDataInAllTables(sparkSession)
@@ -128,13 +132,14 @@ case class CarbonCleanFilesCommand(
cleanFileCommands.foreach(_.processData(sparkSession))
}
val cleanFilesPostEvent: CleanFilesPostEvent =
- CleanFilesPostEvent(carbonTable, sparkSession)
+ CleanFilesPostEvent(carbonTable, staleInprogress, forceClean,
sparkSession)
OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent,
operationContext)
Seq.empty
}
private def deleteAllData(sparkSession: SparkSession,
- databaseNameOp: Option[String], tableName: String): Unit = {
+ databaseNameOp: Option[String], tableName: String, isForceDelete:
Boolean,
+ cleanStaleInprogress: Boolean): Unit = {
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR +
tableName
@@ -143,11 +148,14 @@ case class CarbonCleanFilesCommand(
tableName = tableName,
tablePath = tablePath,
carbonTable = null, // in case of delete all data carbonTable is not
required.
- forceTableClean = forceTableClean)
+ forceTableClean = forceTableClean,
+ isForceDelete = isForceDelete,
+ cleanStaleInprogress = cleanStaleInprogress)
}
private def cleanGarbageData(sparkSession: SparkSession,
- databaseNameOp: Option[String], tableName: String): Unit = {
+ databaseNameOp: Option[String], tableName: String, isForceDelete:
Boolean,
+ cleanStaleInprogress: Boolean): Unit = {
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non
transactional table")
}
@@ -161,6 +169,8 @@ case class CarbonCleanFilesCommand(
tablePath = carbonTable.getTablePath,
carbonTable = carbonTable,
forceTableClean = forceTableClean,
+ isForceDelete = isForceDelete,
+ cleanStaleInprogress = cleanStaleInprogress,
currentTablePartitions = partitions,
truncateTable = truncateTable)
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
index 75d319e..b114191 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
@@ -108,7 +108,7 @@ case class CarbonInsertIntoWithDf(databaseNameOp:
Option[String],
operationContext = operationContext)
// Clean up the old invalid segment data before creating a new entry for
new load.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false,
currPartitions)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false,
currPartitions, false, false)
// add the start entry for the new load in the table status file
if ((updateModel.isEmpty || updateModel.isDefined)
&& !table.isHivePartitionTable) {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index d5c3c84..65641b3 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -125,7 +125,7 @@ case class CarbonLoadDataCommand(databaseNameOp:
Option[String],
updateModel = None,
operationContext = operationContext)
// Clean up the old invalid segment data before creating a new entry for
new load.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false,
currPartitions)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false,
currPartitions, false, false)
// add the start entry for the new load in the table status file
if (!table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index 97f7836..869bd24 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -24,16 +24,14 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock,
LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus,
SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{CleanFilesPostEvent, Event,
OperationContext, OperationEventListener}
@@ -53,13 +51,16 @@ class CleanFilesPostEventListener extends
OperationEventListener with Logging {
val carbonTable = cleanFilesPostEvent.carbonTable
val indexTables = CarbonIndexUtil
.getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
+ val isForceDelete = cleanFilesPostEvent.ifForceDelete
+ val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
indexTables.foreach { indexTable =>
val partitions: Option[Seq[PartitionSpec]] =
CarbonFilters.getPartitions(
Seq.empty[Expression],
cleanFilesPostEvent.sparkSession,
indexTable)
SegmentStatusManager.deleteLoadsAndUpdateMetadata(
- indexTable, true, partitions.map(_.asJava).orNull)
+ indexTable, isForceDelete, partitions.map(_.asJava).orNull,
inProgressSegmentsClean,
+ true)
CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 82c92d2..cdbc711 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -485,7 +485,7 @@ object SecondaryIndexCreator {
try {
if (!isCompactionCall) {
SegmentStatusManager
- .deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null)
+ .deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null,
false, false)
}
} catch {
case e: Exception =>
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index f3426c1..adcfae1 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -29,7 +29,10 @@ import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.{CarbonCommonConstants,
CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties,
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -170,6 +173,13 @@ class QueryTest extends PlanTest {
LOGGER.error(sessionParams.asScala.map(x => x._1 + "=" + x._2).mkString(",
"))
}
+ def removeSegmentEntryFromTableStatusFile(carbonTable: CarbonTable,
segmentNo: String) : Unit = {
+ val details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ .filter(as => as.getLoadName != segmentNo)
+
SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+ carbonTable.getTablePath), details)
+ }
+
def printTable(table: String, database: String = "default"): Unit = {
sql("SELECT current_database()").show(100, false)
sql(s"describe formatted ${ database }.${ table }").show(100, false)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
index a9cb652..f07a30e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -39,7 +39,8 @@ object CleanFiles {
* drop table from hive metastore so should be very
careful to use it.
*/
def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
- forceTableClean: Boolean = false): Unit = {
+ forceTableClean: Boolean = false, isForceDeletion: Boolean = false,
+ cleanStaleInProgress: Boolean = false ): Unit = {
TableAPIUtil.validateTableExists(spark, dbName, tableName)
val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(spark)
val carbonTable = if (!forceTableClean) {
@@ -52,7 +53,9 @@ object CleanFiles {
tableName = tableName,
tablePath = tablePath,
carbonTable = carbonTable,
- forceTableClean = forceTableClean)
+ forceTableClean = forceTableClean,
+ isForceDeletion,
+ cleanStaleInProgress)
}
def main(args: Array[String]): Unit = {
@@ -65,10 +68,14 @@ object CleanFiles {
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) =
TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
var forceTableClean = false
- if (args.length > 2) {
+ var isForceDeletion = false
+ var cleanInprogress = false
+ if (args.length > 4) {
forceTableClean = args(2).toBoolean
+ isForceDeletion = args(3).toBoolean
+ cleanInprogress = args(4).toBoolean
}
val spark = TableAPIUtil.spark(storePath, s"CleanFiles:
$dbName.$tableName")
- cleanFiles(spark, dbName, tableName, forceTableClean)
+ cleanFiles(spark, dbName, tableName, forceTableClean, isForceDeletion,
cleanInprogress)
}
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
index d9d8e84..8f8d2be 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
@@ -855,7 +855,11 @@ class BloomCoarseGrainIndexFunctionSuite
}
// delete and clean the first segment, the corresponding index files
should be cleaned too
sql(s"DELETE FROM TABLE $bloomSampleTable WHERE SEGMENT.ID IN (0)")
- sql(s"CLEAN FILES FOR TABLE $bloomSampleTable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ sql(s"CLEAN FILES FOR TABLE $bloomSampleTable options('force'='true')")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
var indexPath =
CarbonTablePath.getIndexesStorePath(carbonTable.getTablePath, "0", indexName)
assert(!FileUtils.getFile(indexPath).exists(),
"index file of this segment has been deleted, should not exist")
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 2f37d2d..5d798d9 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -1001,7 +1001,11 @@ class TestBinaryDataType extends QueryTest with
BeforeAndAfterAll {
assert(SegmentSequenceIds.length == 8)
// clean files
- segments = sql("CLEAN FILES FOR TABLE carbontable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ segments = sql("CLEAN FILES FOR TABLE carbontable
options('force'='true')")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0)
}
assert(SegmentSequenceIds.contains("0.2"))
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
index 68e263a..5a90482 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -380,7 +380,11 @@ class TestLoadDataWithCompression extends QueryTest with
BeforeAndAfterEach with
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 8)
sql(s"ALTER TABLE $tableName COMPACT 'major'")
- sql(s"CLEAN FILES FOR TABLE $tableName")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ sql(s"CLEAN FILES FOR TABLE $tableName options('force'='true')")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
// after compaction and clean, there should be on segment
checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 1)
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 1d385cb..0d167fe 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -452,7 +452,11 @@ class InsertIntoCarbonTableTestCase extends QueryTest with
BeforeAndAfterAll {
sql("insert into show_insert select 'abc',1")
sql("insert overwrite table show_insert select * from show_insert")
assert(sql("show segments for table show_insert").collect().length == 4)
- sql("clean files for table show_insert")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ sql("clean files for table show_insert options('force'='true')")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
assert(sql("show segments for table show_insert").collect().length == 1)
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
index 3618274..5615142 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -34,6 +34,29 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
var count = 0
+ test("clean up table and test trash folder with IN PROGRESS segments with
trash time = 0") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0")
+ // do not send the segment folders to trash
+ createTable()
+ loadData()
+ val path = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext.sparkSession)
+ .getTablePath
+ val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+ editTableStatusFile(path)
+ assert(!FileFactory.isFileExist(trashFolderPath))
+
+ val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+ assert(segmentNumber1 == 4)
+ sql(s"CLEAN FILES FOR TABLE cleantest
OPTIONS('stale_inprogress'='true')").show
+ val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+ assert(0 == segmentNumber2)
+ assert(!FileFactory.isFileExist(trashFolderPath))
+ // no carbondata file is added to the trash
+ assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+ sql("""DROP TABLE IF EXISTS CLEANTEST""")
+ }
+
test("clean up table and test trash folder with IN PROGRESS segments") {
// do not send the segment folders to trash
createTable()
@@ -46,13 +69,19 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
assert(segmentNumber1 == 4)
- sql(s"CLEAN FILES FOR TABLE cleantest").show
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+ sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true','force'='true')").show
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
assert(0 == segmentNumber2)
assert(!FileFactory.isFileExist(trashFolderPath))
// no carbondata file is added to the trash
assert(getFileCountInTrashFolder(trashFolderPath) == 0)
sql("""DROP TABLE IF EXISTS CLEANTEST""")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
}
test("clean up table and test trash folder with Marked For Delete and
Compacted segments") {
@@ -67,7 +96,12 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"""Delete from table cleantest where segment.id in(4)""")
val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
- sql(s"CLEAN FILES FOR TABLE cleantest").show
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+ sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+ sql(s"""show segments for table cleantest""").show()
val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
assert(segmentNumber1 == segmentNumber2 + 5)
assert(!FileFactory.isFileExist(trashFolderPath))
@@ -81,7 +115,6 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
createTable()
loadData()
sql(s"""alter table cleantest compact 'minor'""")
- sql(s"CLEAN FILES FOR TABLE cleantest").show
sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""")
checkAnswer(sql(s"""select count(*) from cleantest"""),
Seq(Row(5)))
@@ -89,26 +122,20 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
.getTablePath
val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
assert(!FileFactory.isFileExist(trashFolderPath))
- // All 4 segments are made as stale segments and should be moved to trash
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "4")
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"CLEAN FILES FOR TABLE cleantest").show()
checkAnswer(sql(s"""select count(*) from cleantest"""),
- Seq(Row(0)))
+ Seq(Row(4)))
count = 0
var list = getFileCountInTrashFolder(trashFolderPath)
- assert(list == 4)
+ assert(list == 2)
val timeStamp = getTimestampFolderName(trashFolderPath)
// recovering data from trash folder
- val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
- CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + "0.1"
val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '4'
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
-
sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'")
sql("INSERT INTO cleantest select * from c1").show()
sql("drop table c1")
@@ -120,7 +147,7 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
sql(s"CLEAN FILES FOR TABLE cleantest").show()
count = 0
list = getFileCountInTrashFolder(trashFolderPath)
- assert(list == 4)
+ assert(list == 2)
intercept[RuntimeException] {
sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
@@ -151,34 +178,31 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
val mainTablePath = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext
.sparkSession).getTablePath
- deleteTableStatusFile(mainTablePath)
+ // deleteTableStatusFile(mainTablePath)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "1")
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "2")
val mainTableTrashFolderPath =
CarbonTablePath.getTrashFolderPath(mainTablePath)
assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
- checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+ checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(2)))
checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
assert(FileFactory.isFileExist(mainTableTrashFolderPath))
count = 0
var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
- assert(listMainTable == 8)
+ assert(listMainTable == 4)
// recovering data from trash folder
val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
- val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
- timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
- val segment3Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
- timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '3'
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
sql("INSERT INTO cleantest select * from c1").show()
@@ -188,10 +212,6 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
sql("INSERT INTO cleantest select * from c1").show()
sql("drop table c1")
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
-
checkAnswer(sql(s"""select count(*) from cleantest"""),
Seq(Row(4)))
intercept[RuntimeException] {
@@ -212,14 +232,15 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
test("test trash folder with 2 segments with same segment number") {
createTable()
- sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+ loadData()
val path = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext.sparkSession)
.getTablePath
val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
assert(!FileFactory.isFileExist(trashFolderPath))
// All 4 segments are made as stale segments, they should be moved to the trash folder
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "3")
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"CLEAN FILES FOR TABLE cleantest").show()
@@ -228,7 +249,8 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
assert(list == 2)
sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "3")
sql(s"CLEAN FILES FOR TABLE cleantest").show()
count = 0
@@ -262,17 +284,17 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
.getTablePath
val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
assert(!FileFactory.isFileExist(trashFolderPath))
- // All 4 segments are made as stale segments and should be moved to trash
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "1")
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "2")
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"CLEAN FILES FOR TABLE cleantest").show()
checkAnswer(sql(s"""select count(*) from cleantest"""),
- Seq(Row(0)))
+ Seq(Row(2)))
count = 0
var list = getFileCountInTrashFolder(trashFolderPath)
- assert(list == 8)
- val timeStamp = getTimestampFolderName(trashFolderPath)
-
+ assert(list == 4)
sql(s"CLEAN FILES FOR TABLE cleantest").show()
count = 0
list = getFileCountInTrashFolder(trashFolderPath)
@@ -324,12 +346,6 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
timeStampList.get(0).getName
}
- def deleteTableStatusFile(carbonTablePath: String) : Unit = {
- val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" +
- CarbonCommonConstants.FILE_SEPARATOR + "tablestatus") // Original File
- f1.delete()
- }
-
def createTable() : Unit = {
sql("""DROP TABLE IF EXISTS CLEANTEST""")
sql(
@@ -345,5 +361,4 @@ class TestCleanFileCommand extends QueryTest with
BeforeAndAfterAll {
sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
}
-
}
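
The recovery steps these tests exercise after a segment has been moved to trash can be read as the following sketch. Here tablePath and timeStamp are placeholders for the values the test derives (the table root and getTimestampFolderName(trashFolderPath)), and the import paths are assumed to match those used elsewhere in the module:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.path.CarbonTablePath

    val trashFolderPath = CarbonTablePath.getTrashFolderPath(tablePath)
    // trashed copy of segment 4 under the timestamped sub-folder
    val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '4'
    // mount the trashed segment as an external carbon table and copy the rows back
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'")
    sql("INSERT INTO cleantest SELECT * FROM c1")
    sql("DROP TABLE c1")
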
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index 8954b12..06ac6f8 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -45,7 +45,11 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
assert(!FileFactory.isFileExist(trashFolderPath))
val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
assert(segmentNumber1 == 4)
- sql(s"CLEAN FILES FOR TABLE cleantest").show
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+ sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true','force'='true')").show
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
assert(0 == segmentNumber2)
assert(!FileFactory.isFileExist(trashFolderPath))
@@ -67,7 +71,11 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"""Delete from table cleantest where segment.id in(4)""")
val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
- sql(s"CLEAN FILES FOR TABLE cleantest").show
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+ sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
assert(segmentNumber1 == segmentNumber2 + 5)
assert(!FileFactory.isFileExist(trashFolderPath))
@@ -80,13 +88,13 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
test("test trash folder with 2 segments with same segment number") {
createParitionTable()
- sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
-
+ loadData()
val path = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext.sparkSession)
.getTablePath
val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
CarbonTablePath.TRASH_DIR
assert(!FileFactory.isFileExist(trashFolderPath))
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "3")
assert(!FileFactory.isFileExist(trashFolderPath))
sql(s"CLEAN FILES FOR TABLE cleantest").show()
@@ -95,7 +103,8 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
assert(list == 2)
sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "3")
sql(s"CLEAN FILES FOR TABLE cleantest").show()
count = 0
@@ -124,20 +133,17 @@ class TestCleanFilesCommandPartitionTable extends
QueryTest with BeforeAndAfterA
val path = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext.sparkSession)
.getTablePath
val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
- // All 4 segments are made as stale segments, they should be moved to the
trash folder
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "1")
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "2")
sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
checkAnswer(sql(s"""select count(*) from cleantest"""),
- Seq(Row(0)))
+ Seq(Row(2)))
val timeStamp = getTimestampFolderName(trashFolderPath)
// test recovery from partition table
- val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
- "/Segment_0"
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
"/Segment_1"
@@ -151,12 +157,6 @@ class TestCleanFilesCommandPartitionTable extends
QueryTest with BeforeAndAfterA
sql("INSERT INTO cleantest select * from c1").show()
sql("drop table c1")
- val segment3Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
- "/Segment_3"
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
-
checkAnswer(sql(s"""select count(*) from cleantest"""),
Seq(Row(4)))
@@ -179,19 +179,15 @@ class TestCleanFilesCommandPartitionTable extends
QueryTest with BeforeAndAfterA
val path = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext.sparkSession)
.getTablePath
val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
- // All 4 segments are made as stale segments, they should be moved to the
trash folder
- // createStaleSegments(path)
- deleteTableStatusFile(path)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "1")
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "2")
sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
val timeStamp = getTimestampFolderName(trashFolderPath)
// test recovery from partition table
- val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
- "/Segment_0"
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
"/Segment_1"
@@ -225,37 +221,32 @@ class TestCleanFilesCommandPartitionTable extends
QueryTest with BeforeAndAfterA
val mainTablePath = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext
.sparkSession).getTablePath
- deleteTableStatusFile(mainTablePath)
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "1")
+ removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+ sqlContext.sparkSession), "2")
val mainTableTrashFolderPath = mainTablePath +
CarbonCommonConstants.FILE_SEPARATOR +
CarbonTablePath.TRASH_DIR
assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
- checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+ checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(2)))
checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
assert(FileFactory.isFileExist(mainTableTrashFolderPath))
count = 0
var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
- assert(listMainTable == 8)
+ assert(listMainTable == 4)
// recovering data from trash folder
val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
- val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
- timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
- val segment3Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
- timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '3'
-
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
sql("INSERT INTO cleantest select * from c1").show()
@@ -265,10 +256,6 @@ class TestCleanFilesCommandPartitionTable extends
QueryTest with BeforeAndAfterA
sql("INSERT INTO cleantest select * from c1").show()
sql("drop table c1")
- sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
- sql("INSERT INTO cleantest select * from c1").show()
- sql("drop table c1")
-
checkAnswer(sql(s"""select count(*) from cleantest"""),
Seq(Row(4)))
intercept[RuntimeException] {
@@ -329,12 +316,6 @@ class TestCleanFilesCommandPartitionTable extends
QueryTest with BeforeAndAfterA
timeStampList.get(0).getName
}
- def deleteTableStatusFile(carbonTablePath: String) : Unit = {
- val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" +
- CarbonCommonConstants.FILE_SEPARATOR + "tablestatus") // Original File
- f1.delete()
- }
-
def createParitionTable() : Unit = {
sql("""DROP TABLE IF EXISTS CLEANTEST""")
sql(
@@ -350,4 +331,5 @@ class TestCleanFilesCommandPartitionTable extends QueryTest
with BeforeAndAfterA
sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"johnny","adc"""")
sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"Reddit","adc"""")
}
+
}
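
For the partition table suite above, recovery follows the same shape, but the trashed copy is addressed with a Segment_<id> folder name instead of the LOAD_FOLDER prefix. A minimal sketch, with trashFolderPath and timeStamp taken from the surrounding test:

    import org.apache.carbondata.core.constants.CarbonCommonConstants

    // trashed copy of segment 1 of the partition table (names as used in the test above)
    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
      "/Segment_1"
    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
    sql("INSERT INTO cleantest SELECT * FROM c1")
    sql("DROP TABLE c1")
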
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 3af3aec..2ae15dc 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -53,9 +53,13 @@ class CompactionSupportGlobalSortFunctionTest
| CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age
INT)
| STORED AS carbondata
""".stripMargin)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
}
override def afterEach {
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
sql("DROP TABLE IF EXISTS compaction_globalsort")
sql("DROP TABLE IF EXISTS carbon_localsort")
resetConf
@@ -255,7 +259,7 @@ class CompactionSupportGlobalSortFunctionTest
sql("alter table compaction_globalsort set
tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
- sql("clean files for table compaction_globalsort")
+ sql("clean files for table compaction_globalsort options('force'='true')")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"),
false, "Compacted")
@@ -290,7 +294,7 @@ class CompactionSupportGlobalSortFunctionTest
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true,
"city,name")
sql("alter table compaction_globalsort set
tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
- sql("clean files for table compaction_globalsort")
+ sql("clean files for table compaction_globalsort options('force'='true')")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"),
false, "Compacted")
@@ -324,7 +328,7 @@ class CompactionSupportGlobalSortFunctionTest
sql("alter table compaction_globalsort set
tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
- sql("clean files for table compaction_globalsort")
+ sql("clean files for table compaction_globalsort options('force'='true')")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"),
false, "Compacted")
@@ -361,7 +365,7 @@ class CompactionSupportGlobalSortFunctionTest
sql("alter table compaction_globalsort set
tblproperties('global_sort_partitions'='2')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
- sql("clean files for table compaction_globalsort")
+ sql("clean files for table compaction_globalsort options('force'='true')")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"),
false, "Compacted")
@@ -394,7 +398,7 @@ class CompactionSupportGlobalSortFunctionTest
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true,
"city,name")
sql("alter table compaction_globalsort set
tblproperties('global_sort_partitions'='2')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
- sql("clean files for table compaction_globalsort")
+ sql("clean files for table compaction_globalsort options('force'='true')")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"),
false, "Compacted")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
@@ -498,7 +502,7 @@ class CompactionSupportGlobalSortFunctionTest
sql("alter table compaction_globalsort set
tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
- sql("clean files for table compaction_globalsort")
+ sql("clean files for table compaction_globalsort options('force'='true')")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0)
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
index 35a8bca..748fdd2 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
@@ -32,10 +32,14 @@ class TableLevelCompactionOptionTest extends QueryTest
val sampleFilePath: String = resourcesPath + "/sample.csv"
override def beforeEach {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
cleanTable()
}
override def afterEach {
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
resetConf()
cleanTable()
}
@@ -116,7 +120,7 @@ class TableLevelCompactionOptionTest extends QueryTest
}
sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
- sql("CLEAN FILES FOR TABLE carbon_table")
+ sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
val segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0)
}
@@ -181,7 +185,7 @@ class TableLevelCompactionOptionTest extends QueryTest
for (i <- 0 until 8) {
sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
}
- sql("CLEAN FILES FOR TABLE carbon_table")
+ sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0)
}
assert(segmentSequenceIds.size == 1)
@@ -251,7 +255,7 @@ class TableLevelCompactionOptionTest extends QueryTest
for (i <- 0 until 6) {
sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
}
- sql("CLEAN FILES FOR TABLE carbon_table")
+ sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0)
}
assert(segmentSequenceIds.contains("0.1"))
@@ -271,7 +275,7 @@ class TableLevelCompactionOptionTest extends QueryTest
for (i <- 0 until 2) {
sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
}
- sql("CLEAN FILES FOR TABLE carbon_table")
+ sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
assert(segmentSequenceIds.contains("0.2"))
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index 1361dac..c6306c0 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -29,7 +29,8 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
// scalastyle:off lineLength
override def beforeAll {
dropTable
-
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
sql(
@@ -102,7 +103,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 5)
- sql("clean files for table t1")
+ sql("clean files for table t1 options('force'='true')")
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 4)
@@ -111,7 +112,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 5)
- sql("clean files for table t1")
+ sql("clean files for table t1 options('force'='true')")
assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
.listFiles()
.count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT))
== 1)
@@ -137,7 +138,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length
== 4)
sql("Alter table flatfolder_delete compact 'minor'")
assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length
== 4)
- sql("clean files for table flatfolder_delete")
+ sql("clean files for table flatfolder_delete options('force'='true')")
assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length
== 1)
assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length
== 0)
sql("drop table if exists flatfolder_delete")
@@ -166,7 +167,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
sql("Alter table flatfolder_delete compact 'minor'")
assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
.filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length
== 8)
- sql("clean files for table flatfolder_delete")
+ sql("clean files for table flatfolder_delete options('force'='true')")
assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
.filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
@@ -176,6 +177,8 @@ class FlatFolderTableLoadingTestCase extends QueryTest with
BeforeAndAfterAll {
override def afterAll: Unit = {
CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+ CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 14aa542..b6d530c 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -43,6 +43,8 @@ class DeleteCarbonTableTestCase extends QueryTest with
BeforeAndAfterAll {
|AS carbondata""".stripMargin)
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table
iud_db.source2""")
sql("use iud_db")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
}
test("delete data from carbon table with alias [where clause ]") {
@@ -201,9 +203,9 @@ class DeleteCarbonTableTestCase extends QueryTest with
BeforeAndAfterAll {
sql("insert into select_after_clean select 3,'uhj'")
sql("insert into select_after_clean select 4,'frg'")
sql("alter table select_after_clean compact 'minor'")
- sql("clean files for table select_after_clean")
+ sql("clean files for table select_after_clean options('force'='true')")
sql("delete from select_after_clean where name='def'")
- sql("clean files for table select_after_clean")
+ sql("clean files for table select_after_clean options('force'='true')")
assertResult(false)(new File(
CarbonTablePath.getSegmentPath(s"$storeLocation/iud_db.db/select_after_clean",
"0")).exists())
checkAnswer(sql("""select * from select_after_clean"""),
@@ -469,5 +471,7 @@ class DeleteCarbonTableTestCase extends QueryTest with
BeforeAndAfterAll {
override def afterAll {
sql("use default")
sql("drop database if exists iud_db cascade")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
}
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
index 3a280ce..d15d9c6 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
@@ -22,14 +22,26 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
/**
* Test Class for SHOW SEGMENTS command
*/
class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
+ override def beforeAll {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ }
+
+ override def afterAll {
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+ }
+
test("test show segment by query, success case") {
sql("drop table if exists source")
sql(
@@ -173,7 +185,7 @@ class ShowSegmentTestCase extends QueryTest with
BeforeAndAfterAll {
var historyDetail =
SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
assert(detail.length == 10)
assert(historyDetail.length == 0)
- sql(s"clean files for table ${tableName}")
+ sql(s"clean files for table ${tableName} options('force'='true')")
assert(sql(s"show segments on ${tableName}").collect().length == 2)
assert(sql(s"show segments on ${tableName} limit 1").collect().length == 1)
detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
@@ -193,13 +205,19 @@ class ShowSegmentTestCase extends QueryTest with
BeforeAndAfterAll {
assert(sql(s"show segments on ${ tableName } as select * from ${ tableName
}_segments")
.collect()
.length == 10)
+ sql(s"show segments on ${ tableName } as select * from ${ tableName
}_segments")
+ .show()
assert(sql(s"show history segments on ${ tableName } as select * from ${
tableName }_segments")
.collect()
.length == 10)
- sql(s"clean files for table ${tableName}")
+ sql(s"show history segments on ${tableName} as select * from
${tableName}_segments").show()
+ sql(s"clean files for table ${tableName} options('force'='true')")
assert(sql(s"show segments on ${ tableName } as select * from ${ tableName
}_segments")
.collect()
.length == 2)
+ sql(s"show segments on ${ tableName } as select * from ${ tableName
}_segments")
+ .show()
+ sql(s"show history segments on ${tableName} as select * from
${tableName}_segments").show()
sql(s"show history segments on ${tableName} as select * from
${tableName}_segments").collect()
var segmentsHistoryList = sql(s"show history segments on ${ tableName } " +
s"as select * from ${ tableName }_segments")
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
index b5cbfb5..1aeea03 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.{CarbonUtils, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
/**
* Testcase for set segment in multhread env
@@ -52,6 +54,8 @@ class TestSegmentReadingForMultiThreading extends QueryTest
with BeforeAndAfterA
sql(
s"LOAD DATA LOCAL INPATH '$resourcesPath/data1.csv' INTO TABLE
carbon_table_MulTI_THread " +
"OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
}
test("test multithreading for segment reading") {
@@ -91,5 +95,7 @@ class TestSegmentReadingForMultiThreading extends QueryTest
with BeforeAndAfterA
override def afterAll: Unit = {
sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread")
CarbonUtils.threadUnset("carbon.input.segments.default.carbon_table_MulTI_THread")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
}
}
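
Several suites in this patch toggle CARBON_CLEAN_FILES_FORCE_ALLOWED around the whole suite rather than around individual statements. A minimal sketch of that setup/teardown shape (the suite name here is only illustrative):

    import org.scalatest.BeforeAndAfterAll
    import org.apache.spark.sql.test.util.QueryTest
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    class ForceCleanFilesSuite extends QueryTest with BeforeAndAfterAll {
      override def beforeAll {
        // allow 'force'='true' for every test in this suite
        CarbonProperties.getInstance()
          .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
      }

      override def afterAll {
        // restore the default so other suites are unaffected
        CarbonProperties.getInstance()
          .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
      }
    }
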
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 4d3c0f3..f771440 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -47,6 +47,8 @@ class StandardPartitionGlobalSortTestCase extends QueryTest
with BeforeAndAfterA
""".stripMargin)
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
}
test("test global sort column as partition column") {
@@ -942,7 +944,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest
with BeforeAndAfterA
assert(sql("select * from comp_dt2").collect().length == 4)
sql("Alter table comp_dt2 compact 'minor'")
assert(sql("select * from comp_dt2").collect().length == 4)
- sql("clean files for table comp_dt2")
+ sql("clean files for table comp_dt2 options('force'='true')")
assert(sql("select * from comp_dt2").collect().length == 4)
sql("insert into comp_dt2 select 5,'E','2003-01-01',3")
sql("insert into comp_dt2 select 6,'F','2003-01-01',3")
@@ -950,7 +952,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest
with BeforeAndAfterA
sql("insert into comp_dt2 select 8,'H','2004-01-01',''")
assert(sql("select * from comp_dt2").collect().length == 8)
sql("Alter table comp_dt2 compact 'minor'")
- sql("clean files for table comp_dt2")
+ sql("clean files for table comp_dt2 options('force'='true')")
assert(sql("select * from comp_dt2").collect().length == 8)
assert(sql("select * from comp_dt2").collect().length == 8)
sql("insert into comp_dt2 select 9,'H','2001-01-01',1")
@@ -961,7 +963,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest
with BeforeAndAfterA
sql("Alter table comp_dt2 compact 'minor'")
assert(sql("show segments for table comp_dt2").collect().length == 8)
assert(sql("select * from comp_dt2").collect().length == 12)
- sql("clean files for table comp_dt2")
+ sql("clean files for table comp_dt2 options('force'='true')")
assert(sql("select * from comp_dt2").collect().length == 12)
sql("insert into comp_dt2 select 13,'L','2004-01-01', 6")
assert(sql("select * from comp_dt2").collect().length == 13)
@@ -969,7 +971,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest
with BeforeAndAfterA
assert(sql("select * from comp_dt2").collect().length == 13)
assert(sql("show segments for table comp_dt2").collect().length == 3)
assert(sql("select * from comp_dt2").collect().length == 13)
- sql("clean files for table comp_dt2")
+ sql("clean files for table comp_dt2 options('force'='true')")
assert(sql("show segments for table comp_dt2").collect().length == 1)
assert(sql("select * from comp_dt2").collect().length == 13)
}
@@ -1071,6 +1073,8 @@ class StandardPartitionGlobalSortTestCase extends
QueryTest with BeforeAndAfterA
override def afterAll: Unit = {
CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+ CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index b9bb98b..cd87bc8 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -33,7 +33,8 @@ class StandardPartitionTableCleanTestCase extends QueryTest
with BeforeAndAfterA
override def beforeAll {
dropTable
-
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
@@ -195,13 +196,15 @@ class StandardPartitionTableCleanTestCase extends
QueryTest with BeforeAndAfterA
sql(s"delete from table partitionalldeleteseg where segment.id in
(1)").collect()
checkExistence(sql(s"show segments for table partitionalldeleteseg"),
true, "Marked for Delete")
checkAnswer(sql(s"Select count(*) from partitionalldeleteseg"),
Seq(Row(30)))
- sql(s"CLEAN FILES FOR TABLE partitionalldeleteseg").collect()
+ sql(s"CLEAN FILES FOR TABLE partitionalldeleteseg
options('force'='true')").collect()
assert(sql(s"show segments for table partitionalldeleteseg").count == 3)
}
override def afterAll: Unit = {
CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+ CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index c59bef9..9d294a5 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -30,7 +30,8 @@ class StandardPartitionTableCompactionTestCase extends
QueryTest with BeforeAndA
defaultConfig()
dropTable
-
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
sql(
@@ -166,7 +167,7 @@ class StandardPartitionTableCompactionTestCase extends
QueryTest with BeforeAndA
for (i <- 0 until 4) {
sql(s"""insert into staticpartitioncompaction
PARTITION(deptname='software') select
empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation
from originTable""")
}
- sql("CLEAN FILES FOR TABLE staticpartitioncompaction").collect()
+ sql("CLEAN FILES FOR TABLE staticpartitioncompaction
options('force'='true')").collect()
val segments = sql("SHOW SEGMENTS FOR TABLE staticpartitioncompaction")
val segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0)
}
assert(segmentSequenceIds.size==1)
@@ -210,6 +211,8 @@ class StandardPartitionTableCompactionTestCase extends
QueryTest with BeforeAndA
override def afterAll: Unit = {
CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+ CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
dropTable
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
index 37c93b9..4c41b3a 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
@@ -35,6 +36,8 @@ class TestPartitionWithMV extends QueryTest with
BeforeAndAfterAll with BeforeAn
override def beforeAll(): Unit = {
defaultConfig()
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
sql("drop database if exists partition_mv cascade")
sql("create database partition_mv")
sql("use partition_mv")
@@ -54,6 +57,8 @@ class TestPartitionWithMV extends QueryTest with
BeforeAndAfterAll with BeforeAn
override def afterAll(): Unit = {
sql("drop database if exists partition_mv cascade")
sql("use default")
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
}
override def beforeEach(): Unit = {
@@ -259,7 +264,7 @@ class TestPartitionWithMV extends QueryTest with
BeforeAndAfterAll with BeforeAn
sql("insert into droppartition values('k',2,2014,1,1)")
sql("insert into droppartition values('k',2,2015,2,3)")
sql("alter table droppartition drop partition(year=2015,month=2,day=3)")
- sql("clean files for table droppartition")
+ sql("clean files for table droppartition options('force'='true')")
val table = CarbonEnv.getCarbonTable(Option("partition_mv"),
"droppartition")(sqlContext.sparkSession)
val mvTable = CarbonEnv.getCarbonTable(Option("partition_mv"),
"droppartition")(sqlContext.sparkSession)
val mvtablePath = mvTable.getTablePath
diff --git
a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index 8012630..4ee1542 100644
---
a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++
b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -115,9 +115,14 @@ class CarbonCommandSuite extends QueryTest with
BeforeAndAfterAll {
test("clean files") {
val table = "carbon_table3"
+ dropTable(table)
createAndLoadTestTable(table, "csv_table")
DeleteSegmentById.main(Array(s"${location}", table, "0"))
- CleanFiles.main(Array(s"${location}", table))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ CleanFiles.main(Array(s"${location}", table, "false", "true", "true"))
+ CarbonProperties.getInstance()
+ .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default",
table)
val tablePath = carbonTable.getAbsoluteTableIdentifier.getTablePath
val f = new File(s"$tablePath/Fact/Part0")
@@ -132,7 +137,7 @@ class CarbonCommandSuite extends QueryTest with
BeforeAndAfterAll {
val table = "carbon_table4"
dropTable(table)
createAndLoadTestTable(table, "csv_table")
- CleanFiles.main(Array(s"${location}", table, "true"))
+ CleanFiles.main(Array(s"${location}", table, "true", "false", "false"))
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default",
table)
val tablePath = carbonTable.getTablePath
val f = new File(tablePath)