This is an automated email from the ASF dual-hosted git repository. qiangcai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 097e80c [CARBONDATA-4028] Fix failed to unlock issue during update 097e80c is described below commit 097e80c6a17fb26185ca2e9e74c8e5ba43ad9809 Author: haomarch <marchp...@126.com> AuthorDate: Mon Oct 12 12:52:52 2020 +0800 [CARBONDATA-4028] Fix failed to unlock issue during update Why is this PR needed? 1. In the update flow, we unpersist the dataset before unlocking. The unlock will fail if the dataset unpersist is interrupted. 2. cleanStaleDeltaFiles holds the lock, which degrades the concurrency performance a lot. What changes were proposed in this PR? 1. Unlock before unpersisting the dataset. 2. cleanStaleDeltaFiles no longer holds the lock. Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3978 --- .../mutation/CarbonProjectForDeleteCommand.scala | 15 ++++++-- .../mutation/CarbonProjectForUpdateCommand.scala | 42 +++++++++++++++++----- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index 6c6d65a..d2ce2d4 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -169,8 +169,19 @@ private[sql] case class CarbonProjectForDeleteCommand( if (lockStatus) { CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) } - updateLock.unlock() - compactionLock.unlock() + + if (updateLock.unlock()) { + LOGGER.info(s"updateLock unlocked successfully after delete operation $tableName") + } else { + LOGGER.error(s"Unable to unlock updateLock for table $tableName after delete operation"); + } + + if (compactionLock.unlock()) { + 
LOGGER.info(s"compactionLock unlocked successfully after delete operation $tableName") + } else { + LOGGER.error(s"Unable to unlock compactionLock for " + + s"table $tableName after delete operation"); + } } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 50d1e21..032f410 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -118,6 +118,8 @@ private[sql] case class CarbonProjectForUpdateCommand( // var dataFrame: DataFrame = null var dataSet: DataFrame = null val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset + var hasException = false + var fileTimestamp = "" try { lockStatus = metadataLock.lockWithRetries() if (lockStatus) { @@ -218,14 +220,14 @@ private[sql] case class CarbonProjectForUpdateCommand( LOGGER.error( "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e) // In case of failure , clean all related delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) - + fileTimestamp = e.compactionTimeStamp.toString + hasException = true case e: Exception => LOGGER.error("Exception in update operation", e) // ****** start clean up. // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "") - + fileTimestamp = currentTime + "" + hasException = true // *****end clean up. if (null != e.getMessage) { sys.error("Update operation failed. " + e.getMessage) @@ -235,14 +237,38 @@ private[sql] case class CarbonProjectForUpdateCommand( } sys.error("Update operation failed. 
please check logs.") } finally { - if (null != dataSet && isPersistEnabled) { - dataSet.unpersist() + if (updateLock.unlock()) { + LOGGER.info(s"updateLock unlocked successfully after update $tableName") + } else { + LOGGER.error(s"Unable to unlock updateLock for table $tableName after table update"); + } + + if (compactionLock.unlock()) { + LOGGER.info(s"compactionLock unlocked successfully after update $tableName") + } else { + LOGGER.error(s"Unable to unlock compactionLock for " + + s"table $tableName after update"); } - updateLock.unlock() - compactionLock.unlock() + if (lockStatus) { CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) } + + if (null != dataSet && isPersistEnabled) { + try { + dataSet.unpersist() + } catch { + case e: Exception => + LOGGER.error(s"Exception in update $tableName" + e.getMessage, e) + } + } + + // In case of failure, clean all related delete delta files. + if (hasException) { + // When the table has too many segemnts, it will take a long time. + // So moving it to the end and it is outside of locking. + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp) + } } Seq(Row(updatedRowCount)) }