This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d535a1e  [CARBONDATA-4154] Fix various concurrent issues with clean 
files
d535a1e is described below

commit d535a1e22f1800afab9a148f42a9124efc6df192
Author: Vikram Ahuja <[email protected]>
AuthorDate: Tue Mar 16 16:57:10 2021 +0530

    [CARBONDATA-4154] Fix various concurrent issues with clean files
    
    Why is this PR needed?
    There are 2 issues in clean files operation when run concurrently with 
multiple load operations:
    
    Dry run can show negative space freed for clean files with concurrent load.
    Accidental deletion of an Insert In Progress (ongoing load) segment during 
clean files operation.
    What changes were proposed in this PR?
    To solve the dry run negative result, the old load metadata details are saved 
before the clean files operation and compared with the load metadata details 
read after the clean files operation; any new entry that has been added in 
between is ignored. Basically, an intersection of the new and old load metadata 
details is used to show the correct space freed.
    In case of the load failure issue, there can be scenarios where a load is 
going on (Insert In Progress state, with the segment lock occupied) during the 
clean files operation. By the time the final table status lock is removed, the 
load may have completed and released the segment lock, but in the clean files 
operation's final list of load metadata details to be deleted, that load can 
still be in Insert In Progress state with the segment lock released by the 
load. The clean files opera [...]
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4109
---
 .../core/statusmanager/SegmentStatusManager.java   | 24 +++---
 .../carbondata/core/util/DeleteLoadFolders.java    | 89 ++++++++++++----------
 .../apache/carbondata/trash/DataTrashManager.scala | 39 ++++++++--
 3 files changed, 92 insertions(+), 60 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 38d9e56..5735839 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -27,11 +27,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -1039,10 +1035,10 @@ public class SegmentStatusManager {
 
   private static class ReturnTuple {
     LoadMetadataDetails[] details;
-    boolean isUpdateRequired;
-    ReturnTuple(LoadMetadataDetails[] details, boolean isUpdateRequired) {
+    Set<String> loadsToDelete;
+    ReturnTuple(LoadMetadataDetails[] details, Set<String> loadsToDelete) {
       this.details = details;
-      this.isUpdateRequired = isUpdateRequired;
+      this.loadsToDelete = loadsToDelete;
     }
   }
 
@@ -1050,10 +1046,10 @@ public class SegmentStatusManager {
       AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] 
details,
       boolean cleanStaleInProgress) {
     // Delete marked loads
-    boolean isUpdateRequired = DeleteLoadFolders
+    Set<String> loadsToDelete = DeleteLoadFolders
         .deleteLoadFoldersFromFileSystem(absoluteTableIdentifier, 
isForceDeletion, details,
             carbonTable.getMetadataPath(), cleanStaleInProgress);
-    return new ReturnTuple(details, isUpdateRequired);
+    return new ReturnTuple(details, loadsToDelete);
   }
 
   public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, 
boolean isForceDeletion,
@@ -1066,11 +1062,12 @@ public class SegmentStatusManager {
     if (isLoadDeletionRequired(metadataDetails)) {
       AbsoluteTableIdentifier identifier = 
carbonTable.getAbsoluteTableIdentifier();
       boolean updateCompletionStatus = false;
+      Set<String> loadsToDelete = new HashSet<>();
       LoadMetadataDetails[] newAddedLoadHistoryList = null;
       ReturnTuple tuple =
           isUpdateRequired(isForceDeletion, carbonTable, identifier, 
metadataDetails,
               cleanStaleInprogress);
-      if (tuple.isUpdateRequired) {
+      if (!tuple.loadsToDelete.isEmpty()) {
         ICarbonLock carbonTableStatusLock =
             CarbonLockFactory.getCarbonLockObj(identifier, 
LockUsage.TABLE_STATUS_LOCK);
         boolean locked = false;
@@ -1091,7 +1088,7 @@ public class SegmentStatusManager {
             ReturnTuple tuple2 =
                 isUpdateRequired(isForceDeletion, carbonTable,
                     identifier, details, cleanStaleInprogress);
-            if (!tuple2.isUpdateRequired) {
+            if (tuple2.loadsToDelete.isEmpty()) {
               return;
             }
             // read latest table status again.
@@ -1130,6 +1127,7 @@ public class SegmentStatusManager {
                   latestStatus.toArray(new LoadMetadataDetails[0]));
             }
             updateCompletionStatus = true;
+            loadsToDelete = tuple2.loadsToDelete;
           } else {
             String dbName = 
identifier.getCarbonTableIdentifier().getDatabaseName();
             String tableName = 
identifier.getCarbonTableIdentifier().getTableName();
@@ -1147,7 +1145,7 @@ public class SegmentStatusManager {
           if (updateCompletionStatus) {
             DeleteLoadFolders
                 .physicalFactAndMeasureMetadataDeletion(carbonTable, 
newAddedLoadHistoryList,
-                  isForceDeletion, partitionSpecs, cleanStaleInprogress);
+                  isForceDeletion, partitionSpecs, cleanStaleInprogress, 
loadsToDelete);
           }
         }
       }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java 
b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index 49db220..746fbb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.core.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -69,7 +71,8 @@ public final class DeleteLoadFolders {
       LoadMetadataDetails[] newAddedLoadHistoryList,
       boolean isForceDelete,
       List<PartitionSpec> specs,
-      boolean cleanStaleInProgress) {
+      boolean cleanStaleInProgress,
+      Set<String> loadsToDelete) {
     LoadMetadataDetails[] currentDetails =
         SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
     physicalFactAndMeasureMetadataDeletion(carbonTable,
@@ -77,14 +80,16 @@ public final class DeleteLoadFolders {
         isForceDelete,
         specs,
         currentDetails,
-        cleanStaleInProgress);
+        cleanStaleInProgress,
+        loadsToDelete);
     if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) 
{
       physicalFactAndMeasureMetadataDeletion(carbonTable,
           newAddedLoadHistoryList,
           isForceDelete,
           specs,
           currentDetails,
-          cleanStaleInProgress);
+          cleanStaleInProgress,
+          loadsToDelete);
     }
   }
 
@@ -98,7 +103,8 @@ public final class DeleteLoadFolders {
    */
   private static void physicalFactAndMeasureMetadataDeletion(CarbonTable 
carbonTable,
       LoadMetadataDetails[] loadDetails, boolean isForceDelete, 
List<PartitionSpec> specs,
-      LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress) {
+      LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress,
+      Set<String> loadsToDelete) {
     List<TableIndex> indexes = new ArrayList<>();
     try {
       for (TableIndex index : 
IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
@@ -115,7 +121,7 @@ public final class DeleteLoadFolders {
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
     for (final LoadMetadataDetails oneLoad : loadDetails) {
-      if (canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
+      if (loadsToDelete.contains(oneLoad.getLoadName())) {
         try {
           if (oneLoad.getSegmentFile() != null) {
             String tablePath = 
carbonTable.getAbsoluteTableIdentifier().getTablePath();
@@ -180,15 +186,18 @@ public final class DeleteLoadFolders {
   }
 
   private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
-      boolean isForceDelete, boolean cleanStaleInProgress) {
+      boolean isForceDelete, boolean cleanStaleInProgress, 
AbsoluteTableIdentifier
+      absoluteTableIdentifier) {
     if (oneLoad.getVisibility().equalsIgnoreCase("true")) {
-      return canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress);
+      return canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress,
+          absoluteTableIdentifier);
     }
     return false;
   }
 
-  public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
-      boolean isForceDelete, boolean cleanStaleInProgress) {
+  public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad, boolean
+      isForceDelete, boolean cleanStaleInProgress, AbsoluteTableIdentifier
+      absoluteTableIdentifier) {
     /*
      * if cleanStaleInProgress == false and  isForceDelete == false, clean MFD 
and Compacted
      *  segments will depend on query timeout(1 hr) and 
trashRetentionTimeout(7 days, default).
@@ -213,7 +222,8 @@ public final class DeleteLoadFolders {
         return canDelete;
       case INSERT_IN_PROGRESS:
       case INSERT_OVERWRITE_IN_PROGRESS:
-        return canDelete && cleanStaleInProgress;
+        return canDelete && cleanStaleInProgress && 
canSegmentLockBeAcquired(oneLoad,
+            absoluteTableIdentifier);
       default:
         return false;
     }
@@ -230,45 +240,46 @@ public final class DeleteLoadFolders {
     return null;
   }
 
-  public static boolean deleteLoadFoldersFromFileSystem(
+  public static Set<String> deleteLoadFoldersFromFileSystem(
       AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, 
LoadMetadataDetails[]
       details, String metadataPath, boolean cleanStaleInProgress) {
-    boolean isDeleted = false;
+    Set<String> loadsToDelete = new HashSet<>();
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
-        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete, 
cleanStaleInProgress)) {
-          ICarbonLock segmentLock = 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
-              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + 
LockUsage.LOCK);
-          try {
-            if (oneLoad.getSegmentStatus() == 
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-                || oneLoad.getSegmentStatus() == 
SegmentStatus.INSERT_IN_PROGRESS) {
-              if (segmentLock.lockWithRetries(1, 5)) {
-                LOGGER.info("Info: Acquired segment lock on segment:" + 
oneLoad.getLoadName());
-                LoadMetadataDetails currentDetails =
-                    getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), 
metadataPath);
-                if (currentDetails != null && 
checkIfLoadCanBeDeleted(currentDetails,
-                    isForceDelete, cleanStaleInProgress)) {
-                  oneLoad.setVisibility("false");
-                  isDeleted = true;
-                  LOGGER.info("Info: Deleted the load " + 
oneLoad.getLoadName());
-                }
-              } else {
-                LOGGER.info("Info: Load in progress for segment" + 
oneLoad.getLoadName());
-                return isDeleted;
-              }
-            } else {
+        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete, 
cleanStaleInProgress,
+            absoluteTableIdentifier)) {
+          if (oneLoad.getSegmentStatus() == 
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
+              || oneLoad.getSegmentStatus() == 
SegmentStatus.INSERT_IN_PROGRESS) {
+            LoadMetadataDetails currentDetails =
+                getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), 
metadataPath);
+            if (currentDetails != null && 
checkIfLoadCanBeDeleted(currentDetails,
+                isForceDelete, cleanStaleInProgress, absoluteTableIdentifier)) 
{
               oneLoad.setVisibility("false");
-              isDeleted = true;
-              LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+              loadsToDelete.add(oneLoad.getLoadName());
+              LOGGER.info("Deleted the load " + oneLoad.getLoadName());
             }
-          } finally {
-            segmentLock.unlock();
-            LOGGER.info("Info: Segment lock on segment:" + 
oneLoad.getLoadName() + " is released");
+          } else {
+            oneLoad.setVisibility("false");
+            loadsToDelete.add(oneLoad.getLoadName());
+            LOGGER.info("Deleted the load " + oneLoad.getLoadName());
           }
         }
       }
     }
-    return isDeleted;
+    return loadsToDelete;
   }
 
+  private static boolean canSegmentLockBeAcquired(LoadMetadataDetails oneLoad,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+        CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + 
LockUsage.LOCK);
+    if (segmentLock.lockWithRetries()) {
+      LOGGER.info("Segment Lock on segment: " + oneLoad.getLoadName() + "can 
be acquired.");
+      return segmentLock.unlock();
+    } else {
+      LOGGER.info("Segment Lock on segment: " + oneLoad.getLoadName() + "can 
not be" +
+          " acquired. Load going on for that load");
+    }
+    return false;
+  }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
index 1d0bd3b..a41a323 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
@@ -86,11 +86,13 @@ object DataTrashManager {
       // Since calculating the the size before and after clean files can be a 
costly operation
       // have exposed an option where user can change this behaviour.
       if (showStatistics) {
-        val sizeBeforeCleaning = getSizeSnapshot(carbonTable)
+        val metadataDetails = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        val sizeBeforeCleaning = getPreOpSizeSnapshot(carbonTable, 
metadataDetails)
         checkAndCleanExpiredSegments(carbonTable, isForceDelete,
           cleanStaleInProgress, partitionSpecs)
-        val sizeAfterCleaning = getSizeSnapshot(carbonTable)
-        sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1
+        val sizeAfterCleaning = getPostOpSizeSnapshot(carbonTable, 
metadataDetails
+            .map(a => a.getLoadName).toSet)
+        (sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1).abs
       } else {
         checkAndCleanExpiredSegments(carbonTable, isForceDelete,
           cleanStaleInProgress, partitionSpecs)
@@ -107,11 +109,11 @@ object DataTrashManager {
   }
 
   /**
-   * Checks the size of the segment files as well as datafiles, this method is 
used before and after
-   * clean files operation to check how much space is actually freed, during 
the operation.
+   * Checks the size of the segment files as well as datafiles and index 
files, this method
+   * is used before clean files operation.
    */
-  def getSizeSnapshot(carbonTable: CarbonTable): Long = {
-    val metadataDetails = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+  def getPreOpSizeSnapshot(carbonTable: CarbonTable, metadataDetails:
+      Array[LoadMetadataDetails]): Long = {
     var size: Long = 0
     val segmentFileLocation = 
CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
     if (FileFactory.isFileExist(segmentFileLocation)) {
@@ -126,6 +128,26 @@ object DataTrashManager {
   }
 
   /**
+   * Checks the size of the segment files as well as datafiles, this method is 
used after
+   * clean files operation.
+   */
+  def getPostOpSizeSnapshot(carbonTable: CarbonTable, metadataDetails: 
Set[String]): Long = {
+    val finalMetadataDetails = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    var size: Long = 0
+    val segmentFileLocation = 
CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
+    if (FileFactory.isFileExist(segmentFileLocation)) {
+      size += FileFactory.getDirectorySize(segmentFileLocation)
+    }
+    finalMetadataDetails.foreach(oneLoad =>
+      if (metadataDetails.contains(oneLoad.getLoadName) && 
oneLoad.getVisibility.toBoolean) {
+        size += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, 
finalMetadataDetails)
+      }
+    )
+    size
+  }
+
+
+  /**
    * Method to handle the Clean files dry run operation
    */
   def cleanFilesDryRunOperation (
@@ -198,7 +220,8 @@ object DataTrashManager {
         if (!oneLoad.getVisibility.equalsIgnoreCase("false")) {
           val segmentFilePath = 
CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
               oneLoad.getSegmentFile)
-          if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, 
cleanStaleInProgress)) {
+          if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, 
cleanStaleInProgress,
+              carbonTable.getAbsoluteTableIdentifier)) {
             // No need to consider physical data for external segments, only 
consider metadata.
             if (oneLoad.getPath() == null || 
oneLoad.getPath().equalsIgnoreCase("NA")) {
               sizeFreed += calculateSegmentSizeForOneLoad(carbonTable, 
oneLoad, loadMetadataDetails)

Reply via email to