This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aafb6b  [CARBONDATA-4081] Fix multiple issues with clean files command
7aafb6b is described below

commit 7aafb6b769b175cb3ca9ed6fbd7e67b2b55b52b5
Author: Vikram Ahuja <[email protected]>
AuthorDate: Thu Dec 10 14:38:17 2020 +0530

    [CARBONDATA-4081] Fix multiple issues with clean files command
    
    Why is this PR needed?
    While getting stale segments, we list the files and take all of them; if there is any folder or file other than a .segment file, it leads to further issues while copying data to the trash folder.
    
    When an AbstractDFSCarbonFile is created with a path and Hadoop configuration instead of a fileStatus, and the file does not exist, the fileStatus is empty: listDirs returns an empty result and getAbsolutePath throws a "file does not exist" exception.
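    
    A minimal sketch of the listing pattern the fix moves to (mirroring the TrashUtil change in the diff below; the enclosing method and the tablePath variable are assumed for illustration):
    
        import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
        import org.apache.carbondata.core.datastore.impl.FileFactory;
        import org.apache.carbondata.core.util.path.CarbonTablePath;
    
        // Resolve the trash folder once and list through the same CarbonFile
        // handle: if the folder is missing, isFileExist() returns false and
        // neither listFiles() nor getAbsolutePath() is ever invoked, so the
        // "file does not exist" exception described above cannot be thrown.
        CarbonFile trashFolder =
            FileFactory.getCarbonFile(CarbonTablePath.getTrashFolderPath(tablePath));
        if (trashFolder.isFileExist()) {
          for (CarbonFile timestampFolder : trashFolder.listFiles()) {
            // Only timestamp subdirectories are candidates for expiry cleanup.
            if (timestampFolder.isDirectory()) {
              // ... retention check and deletion, as in deleteExpiredDataFromTrash
            }
          }
        }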
    
    Clean files is not allowed while a concurrent insert overwrite is in progress. Since concurrent loading to an MV is by default an insert overwrite operation, a clean files operation on that MV would fail and throw an exception.
    
    What changes were proposed in this PR?
    Added a filter to consider only the files ending with ".segment" (see the sketch after this list).
    Used listFiles instead of listDirs in the trash folder.
    Instead of throwing an exception for such MV tables, log an info message and continue blocking clean files for the MV.
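    
    A minimal sketch of the ".segment" filter described above (the stream pipeline is taken from the CleanFilesUtil change below; the segmentFilesLocation variable and enclosing method are assumed for illustration):
    
        import java.util.Arrays;
        import java.util.List;
        import java.util.stream.Collectors;
    
        import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
        import org.apache.carbondata.core.datastore.impl.FileFactory;
        import org.apache.carbondata.core.util.path.CarbonTablePath;
    
        // Keep only real segment files; stray entries such as "_.tmp" are
        // skipped, so they are never treated as stale segments or copied to
        // the trash folder.
        List<String> segmentFiles = Arrays.stream(
                FileFactory.getCarbonFile(segmentFilesLocation).listFiles())
            .map(CarbonFile::getName)
            .filter(name -> name.endsWith(CarbonTablePath.SEGMENT_EXT))
            .collect(Collectors.toList());
    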
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4051
---
 .../apache/carbondata/core/util/CleanFilesUtil.java   |  3 ++-
 .../org/apache/carbondata/core/util/TrashUtil.java    | 19 +++++++++++--------
 docs/clean-files.md                                   |  3 +++
 .../command/management/CarbonCleanFilesCommand.scala  | 11 ++++++++++-
 .../TestCleanFilesCommandPartitionTable.scala         | 11 +++++++++++
 5 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index 3a817a0..9889fe3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -154,7 +154,8 @@ public class CleanFilesUtil {
     String segmentFilesLocation =
         CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
     List<String> segmentFiles = Arrays.stream(FileFactory.getCarbonFile(segmentFilesLocation)
-        .listFiles()).map(CarbonFile::getName).collect(Collectors.toList());
+        .listFiles()).map(CarbonFile::getName).filter(segmentFileName -> segmentFileName
+        .endsWith(CarbonTablePath.SEGMENT_EXT)).collect(Collectors.toList());
     // there are no segments present in the Metadata folder. Can return here
     if (segmentFiles.size() == 0) {
       return;
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index c9db279..ae5476d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -111,7 +111,7 @@ public final class TrashUtil {
         LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " has been 
copied to" +
             " the trash folder successfully. Total files copied: " + 
dataFiles.length);
       } else {
-        LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " does not exist");
+        LOGGER.info("Segment: " + segmentPath.getName() + " does not exist");
       }
     } catch (IOException e) {
       LOGGER.error("Error while copying the segment: " + segmentPath.getName() + " to the trash" +
@@ -153,15 +153,17 @@ public final class TrashUtil {
    * per the user defined retention time
    */
   public static void deleteExpiredDataFromTrash(String tablePath) {
-    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
+        .getTrashFolderPath(tablePath));
     // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
     try {
-      if (FileFactory.isFileExist(trashPath)) {
-        List<CarbonFile> timestampFolderList = FileFactory.getFolderList(trashPath);
+      if (trashFolder.isFileExist()) {
+        CarbonFile[] timestampFolderList = trashFolder.listFiles();
         for (CarbonFile timestampFolder : timestampFolderList) {
           // If the timeStamp at which the timeStamp subdirectory has expired as per the user
           // defined value, delete the complete timeStamp subdirectory
-          if (isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+          if (timestampFolder.isDirectory() && isTrashRetentionTimeoutExceeded(Long
+              .parseLong(timestampFolder.getName()))) {
             FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
             LOGGER.info("Timestamp subfolder from the Trash folder deleted: " 
+ timestampFolder
                 .getAbsolutePath());
@@ -177,11 +179,12 @@ public final class TrashUtil {
    * The below method deletes all the files and folders in the trash folder of a carbon table.
    */
   public static void emptyTrash(String tablePath) {
-    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
+        .getTrashFolderPath(tablePath));
     // if the trash folder exists delete the contents of the trash folder
     try {
-      if (FileFactory.isFileExist(trashPath)) {
-        List<CarbonFile> carbonFileList = FileFactory.getFolderList(trashPath);
+      if (trashFolder.isFileExist()) {
+        CarbonFile[] carbonFileList = trashFolder.listFiles();
         for (CarbonFile carbonFile : carbonFileList) {
           FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
         }
diff --git a/docs/clean-files.md b/docs/clean-files.md
index baf81db..dbe611b 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -26,6 +26,9 @@ Clean files command is used to remove the Compacted, Marked For Delete ,In Progr
    ```
 The above clean files command will clean Marked For Delete and Compacted segments depending on ```max.query.execution.time``` (default 1 hr) and ```carbon.trash.retention.days``` (default 7 days). It will also delete the timestamp subdirectories from the trash folder after expiration day(default 7 day, can be configured)
 
+**NOTE**:
+  * Clean files operation is not supported on non-transactional tables.
+  * Clean files operation is not supported on tables with a concurrent insert overwrite operation in progress.
 
 ### TRASH FOLDER
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 3f956e2..c51efef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import org.apache.log4j.Logger
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events._
@@ -38,13 +40,20 @@ case class CarbonCleanFilesCommand(
     isInternalCleanCall: Boolean = false)
   extends DataCommand {
 
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     setAuditTable(carbonTable)
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      if ((carbonTable.isMV && !isInternalCleanCall) || !carbonTable.isMV) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      } else if (carbonTable.isMV && isInternalCleanCall) {
+        LOGGER.info(s"Clean files not allowed on table: {${carbonTable.getTableName}}")
+        return Seq.empty
+      }
     }
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index 06ac6f8..c494404 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -132,6 +132,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     loadData()
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
+    addRandomFiles(path)
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
       sqlContext.sparkSession), "1")
@@ -178,6 +179,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
 
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
+    addRandomFiles(path)
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
       sqlContext.sparkSession), "1")
@@ -332,4 +334,13 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"Reddit","adc"""")
   }
 
+  def addRandomFiles(carbonTablePath: String): Unit = {
+    val f1 = CarbonTablePath.getSegmentFilesLocation(carbonTablePath) +
+      CarbonCommonConstants.FILE_SEPARATOR + "_.tmp"
+    val f2 = CarbonTablePath.getSegmentFilesLocation(carbonTablePath) +
+      CarbonCommonConstants.FILE_SEPARATOR + "1_.tmp"
+    FileFactory.createNewFile(f1)
+    FileFactory.createNewFile(f2)
+  }
+
 }
