This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ec3536  [CARBONDATA-4126] Concurrent compaction failed with load on 
table
5ec3536 is described below

commit 5ec35367684a9b8e7024d52442eae138d166440e
Author: Karan980 <[email protected]>
AuthorDate: Thu Feb 11 13:14:07 2021 +0530

    [CARBONDATA-4126] Concurrent compaction failed with load on table
    
    Why is this PR needed?
    Concurrent compaction was failing when run in parallel with load.
    During load we acquire SegmentLock for a particular segment, and
    when this same lock we try to acquire during compaction, we were
    not able to acquire this lock and compaction fails.
    
    What changes were proposed in this PR?
    Skipped compaction for segments for which we are not able to acquire
    the SegmentLock instead of throwing the exception.
    
    This closes #4093
---
 .../apache/carbondata/spark/rdd/CarbonTableCompactor.scala  | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a381089..5db344b 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -92,19 +92,22 @@ class CarbonTableCompactor(
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
       val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
       var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+      val validSegments = new java.util.ArrayList[LoadMetadataDetails]
       loadsToMerge.asScala.foreach { segmentId =>
         val segmentLock = CarbonLockFactory
           
.getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
             .getAbsoluteTableIdentifier,
             CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + 
LockUsage.LOCK)
-        if (!segmentLock.lockWithRetries()) {
-          throw new Exception(s"Failed to acquire lock on segment 
${segmentId.getLoadName}," +
-            s" during compaction of table 
${compactionModel.carbonTable.getQualifiedName}")
+        if (segmentLock.lockWithRetries()) {
+          validSegments.add(segmentId)
+          segmentLocks += segmentLock
+        } else {
+          LOGGER.warn(s"Failed to acquire lock on segment 
${segmentId.getLoadName}, " +
+                      s"during compaction of table 
${compactionModel.carbonTable.getQualifiedName}")
         }
-        segmentLocks += segmentLock
       }
       try {
-        scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, 
compactedLoad)
+        scanSegmentsAndSubmitJob(validSegments, compactedSegments, 
compactedLoad)
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)

Reply via email to