This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 7aafb6b [CARBONDATA-4081] Fix multiple issues with clean files command
7aafb6b is described below
commit 7aafb6b769b175cb3ca9ed6fbd7e67b2b55b52b5
Author: Vikram Ahuja <[email protected]>
AuthorDate: Thu Dec 10 14:38:17 2020 +0530
[CARBONDATA-4081] Fix multiple issues with clean files command
Why is this PR needed?
While collecting stale segments, we list the files in the segment files location and take
all of them; if any file or folder other than a .segment file is present, it leads to
further issues while copying data to the trash folder.
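For illustration, a minimal Scala sketch of the intended filtering (the real fix below is
in CleanFilesUtil.java; the import paths are assumed from the CarbonData source tree):
```scala
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath

// Keep only real segment metadata files, so stray entries such as
// "_.tmp" never reach the trash-copy logic.
def listSegmentFiles(tablePath: String): Seq[String] = {
  val segmentFilesLocation = CarbonTablePath.getSegmentFilesLocation(tablePath)
  FileFactory.getCarbonFile(segmentFilesLocation)
    .listFiles()
    .map((file: CarbonFile) => file.getName)
    .filter(_.endsWith(CarbonTablePath.SEGMENT_EXT))
    .toSeq
}
```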
When an AbstractDFSCarbonFile is created from a path and a Hadoop configuration instead of
a fileStatus, and the file does not exist, fileStatus is empty: listDirs returns an empty
result and getAbsolutePath throws a file-does-not-exist exception.
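A minimal Scala sketch of the safer pattern used by the fix, assuming the CarbonFile API
shown in the diff below (isExpired stands in for TrashUtil's private
isTrashRetentionTimeoutExceeded): build one CarbonFile handle, check existence on the
handle, and list children with listFiles(), so a missing trash folder never reaches
getAbsolutePath.
```scala
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath

def deleteExpiredTrash(tablePath: String, isExpired: Long => Boolean): Unit = {
  val trashFolder = FileFactory.getCarbonFile(CarbonTablePath.getTrashFolderPath(tablePath))
  if (trashFolder.isFileExist()) {
    // listing via the handle replaces FileFactory.getFolderList(trashPath)
    trashFolder.listFiles().foreach { timestampFolder =>
      // only timestamp-named subdirectories qualify; plain files are skipped
      if (timestampFolder.isDirectory && isExpired(timestampFolder.getName.toLong)) {
        FileFactory.deleteAllCarbonFilesOfDir(timestampFolder)
      }
    }
  }
}
```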
Clean files is not allowed while a concurrent insert overwrite is in progress. Since
concurrent loading to an MV is by default an insert overwrite operation, a clean files
operation on that MV would fail and throw an exception.
What changes were proposed in this PR?
Added a filter to consider only the files ending with ".segment".
Use listFiles instead of listDirs in the trash folder.
No need to throw an exception; log an info message for such MV tables and keep
blocking clean files on them (sketched below).
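For illustration, a hypothetical Scala refactoring of the new guard (CleanFilesGuard is
not part of the patch; its branch logic is equivalent to the
((isMV && !isInternalCleanCall) || !isMV) split in the diff, and the CarbonTable import
path is assumed):
```scala
import org.apache.log4j.Logger
import org.apache.spark.sql.Row

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager

object CleanFilesGuard {
  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  // Returns Some(Seq.empty) when clean files should be silently skipped
  // (an internal clean call on an MV), throws on any other concurrent
  // insert overwrite, and returns None when cleaning may proceed.
  def guard(carbonTable: CarbonTable, isInternalCleanCall: Boolean): Option[Seq[Row]] = {
    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      if (carbonTable.isMV && isInternalCleanCall) {
        LOGGER.info(s"Clean files not allowed on table: ${carbonTable.getTableName}")
        return Some(Seq.empty)
      }
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
    }
    None
  }
}
```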
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #4051
---
.../apache/carbondata/core/util/CleanFilesUtil.java | 3 ++-
.../org/apache/carbondata/core/util/TrashUtil.java | 19 +++++++++++--------
docs/clean-files.md | 3 +++
.../command/management/CarbonCleanFilesCommand.scala | 11 ++++++++++-
.../TestCleanFilesCommandPartitionTable.scala | 11 +++++++++++
5 files changed, 37 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index 3a817a0..9889fe3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -154,7 +154,8 @@ public class CleanFilesUtil {
     String segmentFilesLocation =
         CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
     List<String> segmentFiles = Arrays.stream(FileFactory.getCarbonFile(segmentFilesLocation)
-        .listFiles()).map(CarbonFile::getName).collect(Collectors.toList());
+        .listFiles()).map(CarbonFile::getName).filter(segmentFileName -> segmentFileName
+        .endsWith(CarbonTablePath.SEGMENT_EXT)).collect(Collectors.toList());
     // there are no segments present in the Metadata folder. Can return here
     if (segmentFiles.size() == 0) {
       return;
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index c9db279..ae5476d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -111,7 +111,7 @@ public final class TrashUtil {
LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " has been
copied to" +
" the trash folder successfully. Total files copied: " +
dataFiles.length);
} else {
- LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " does not
exist");
+ LOGGER.info("Segment: " + segmentPath.getName() + " does not exist");
}
} catch (IOException e) {
LOGGER.error("Error while copying the segment: " + segmentPath.getName()
+ " to the trash" +
@@ -153,15 +153,17 @@ public final class TrashUtil {
    * per the user defined retention time
    */
   public static void deleteExpiredDataFromTrash(String tablePath) {
-    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
+        .getTrashFolderPath(tablePath));
     // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
     try {
-      if (FileFactory.isFileExist(trashPath)) {
-        List<CarbonFile> timestampFolderList = FileFactory.getFolderList(trashPath);
+      if (trashFolder.isFileExist()) {
+        CarbonFile[] timestampFolderList = trashFolder.listFiles();
         for (CarbonFile timestampFolder : timestampFolderList) {
           // If the timeStamp at which the timeStamp subdirectory has expired as per the user
           // defined value, delete the complete timeStamp subdirectory
-          if (isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+          if (timestampFolder.isDirectory() && isTrashRetentionTimeoutExceeded(Long
+              .parseLong(timestampFolder.getName()))) {
             FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
             LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + timestampFolder
                 .getAbsolutePath());
@@ -177,11 +179,12 @@ public final class TrashUtil {
    * The below method deletes all the files and folders in the trash folder of a carbon table.
    */
   public static void emptyTrash(String tablePath) {
-    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
+        .getTrashFolderPath(tablePath));
     // if the trash folder exists delete the contents of the trash folder
     try {
-      if (FileFactory.isFileExist(trashPath)) {
-        List<CarbonFile> carbonFileList = FileFactory.getFolderList(trashPath);
+      if (trashFolder.isFileExist()) {
+        CarbonFile[] carbonFileList = trashFolder.listFiles();
         for (CarbonFile carbonFile : carbonFileList) {
           FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
         }
diff --git a/docs/clean-files.md b/docs/clean-files.md
index baf81db..dbe611b 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -26,6 +26,9 @@ Clean files command is used to remove the Compacted, Marked For Delete ,In Progr
```
 The above clean files command will clean Marked For Delete and Compacted segments depending on ```max.query.execution.time``` (default 1 hr) and ```carbon.trash.retention.days``` (default 7 days). It will also delete the timestamp subdirectories from the trash folder after expiration day(default 7 day, can be configured)
+**NOTE**:
+  * Clean files operation is not supported on non-transactional tables.
+  * Clean files operation is not supported on tables with a concurrent insert overwrite in progress.
### TRASH FOLDER
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 3f956e2..c51efef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,12 +17,14 @@
 package org.apache.spark.sql.execution.command.management
 
+import org.apache.log4j.Logger
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events._
@@ -38,13 +40,20 @@ case class CarbonCleanFilesCommand(
     isInternalCleanCall: Boolean = false)
   extends DataCommand {
 
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     setAuditTable(carbonTable)
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      if ((carbonTable.isMV && !isInternalCleanCall) || !carbonTable.isMV) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      } else if (carbonTable.isMV && isInternalCleanCall) {
+        LOGGER.info(s"Clean files not allowed on table: {${carbonTable.getTableName}}")
+        return Seq.empty
+      }
     }
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index 06ac6f8..c494404 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -132,6 +132,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     loadData()
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
+    addRandomFiles(path)
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
       sqlContext.sparkSession), "1")
@@ -178,6 +179,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
val path = CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(sqlContext.sparkSession)
.getTablePath
+ addRandomFiles(path)
val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"),
"cleantest")(
sqlContext.sparkSession), "1")
@@ -332,4 +334,13 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"Reddit","adc"""")
}
+ def addRandomFiles(carbonTablePath: String) : Unit = {
+ val f1 = CarbonTablePath.getSegmentFilesLocation(carbonTablePath) +
+ CarbonCommonConstants.FILE_SEPARATOR + "_.tmp"
+ val f2 = CarbonTablePath.getSegmentFilesLocation(carbonTablePath) +
+ CarbonCommonConstants.FILE_SEPARATOR + "1_.tmp"
+ FileFactory.createNewFile(f1)
+ FileFactory.createNewFile(f2)
+ }
+
}