This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 097e80c  [CARBONDATA-4028] Fix failed-to-unlock issue during update
097e80c is described below

commit 097e80c6a17fb26185ca2e9e74c8e5ba43ad9809
Author: haomarch <marchp...@126.com>
AuthorDate: Mon Oct 12 12:52:52 2020 +0800

    [CARBONDATA-4028] Fix failed-to-unlock issue during update
    
    Why is this PR needed?
    1. In the update flow, the dataset is unpersisted before the locks are
    released. If the unpersist is interrupted, the unlock never runs and the
    lock is left held.
    2. cleanStaleDeltaFiles runs while holding the lock, which degrades
    concurrency performance significantly.
    
    What changes were proposed in this PR?
    1. Release the locks before unpersisting the dataset (see the sketch
    after this list).
    2. Run cleanStaleDeltaFiles outside of the lock.
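    
    A minimal sketch of the reordered cleanup, condensed from the
    CarbonProjectForUpdateCommand diff below (simplified; the metadata
    lock handling is omitted):
    
        } finally {
          // Release the locks first, so an interrupted unpersist can no
          // longer leave them held.
          if (!updateLock.unlock()) {
            LOGGER.error(s"Unable to unlock updateLock for table $tableName")
          }
          if (!compactionLock.unlock()) {
            LOGGER.error(s"Unable to unlock compactionLock for table $tableName")
          }
          // Unpersist afterwards; a failure here is logged instead of
          // aborting the remaining cleanup.
          if (null != dataSet && isPersistEnabled) {
            try {
              dataSet.unpersist()
            } catch {
              case e: Exception => LOGGER.error("Exception while unpersisting dataset", e)
            }
          }
          // Stale delta file cleanup runs last, outside of any lock, since
          // it can be slow on tables with many segments.
          if (hasException) {
            CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
          }
        }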
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3978
---
 .../mutation/CarbonProjectForDeleteCommand.scala   | 15 ++++++--
 .../mutation/CarbonProjectForUpdateCommand.scala   | 42 +++++++++++++++++-----
 2 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 6c6d65a..d2ce2d4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -169,8 +169,19 @@ private[sql] case class CarbonProjectForDeleteCommand(
       if (lockStatus) {
         CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
       }
-      updateLock.unlock()
-      compactionLock.unlock()
+
+      if (updateLock.unlock()) {
+        LOGGER.info(s"updateLock unlocked successfully after delete operation 
$tableName")
+      } else {
+        LOGGER.error(s"Unable to unlock updateLock for table $tableName after 
delete operation");
+      }
+
+      if (compactionLock.unlock()) {
+        LOGGER.info(s"compactionLock unlocked successfully after delete 
operation $tableName")
+      } else {
+        LOGGER.error(s"Unable to unlock compactionLock for " +
+          s"table $tableName after delete operation");
+      }
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 50d1e21..032f410 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -118,6 +118,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
     //    var dataFrame: DataFrame = null
     var dataSet: DataFrame = null
     val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
+    var hasException = false
+    var fileTimestamp = ""
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
@@ -218,14 +220,14 @@ private[sql] case class CarbonProjectForUpdateCommand(
         LOGGER.error(
           "Update operation passed. Exception in Horizontal Compaction. Please 
check logs." + e)
         // In case of failure , clean all related delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
+        fileTimestamp = e.compactionTimeStamp.toString
+        hasException = true
       case e: Exception =>
         LOGGER.error("Exception in update operation", e)
         // ****** start clean up.
         // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
-
+        fileTimestamp = currentTime + ""
+        hasException = true
         // *****end clean up.
         if (null != e.getMessage) {
           sys.error("Update operation failed. " + e.getMessage)
@@ -235,14 +237,38 @@ private[sql] case class CarbonProjectForUpdateCommand(
         }
         sys.error("Update operation failed. please check logs.")
     } finally {
-      if (null != dataSet && isPersistEnabled) {
-        dataSet.unpersist()
+      if (updateLock.unlock()) {
+        LOGGER.info(s"updateLock unlocked successfully after update 
$tableName")
+      } else {
+        LOGGER.error(s"Unable to unlock updateLock for table $tableName after 
table update");
+      }
+
+      if (compactionLock.unlock()) {
+        LOGGER.info(s"compactionLock unlocked successfully after update 
$tableName")
+      } else {
+        LOGGER.error(s"Unable to unlock compactionLock for " +
+          s"table $tableName after update");
       }
-      updateLock.unlock()
-      compactionLock.unlock()
+
       if (lockStatus) {
         CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
       }
+
+      if (null != dataSet && isPersistEnabled) {
+        try {
+          dataSet.unpersist()
+        } catch {
+          case e: Exception =>
+            LOGGER.error(s"Exception in update $tableName" + e.getMessage, e)
+        }
+      }
+
+      // In case of failure, clean all related delete delta files.
+      if (hasException) {
+        // When the table has too many segments, cleaning can take a long time.
+        // So it runs at the end, outside of the locking scope.
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
+      }
     }
     Seq(Row(updatedRowCount))
   }
