This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bce6481  [CARBONDATA-4144] During compaction, the segment lock of SI table is not released in abnormal scenarios.
bce6481 is described below

commit bce6481ec79519cd1855b4b9c2c8bc83d7aafd37
Author: liuhe0702 <[email protected]>
AuthorDate: Wed Mar 10 15:45:41 2021 +0800

    [CARBONDATA-4144] During compaction, the segment lock of SI table is not released
    in abnormal scenarios.
    
    Why is this PR needed?
    When a compaction operation fails, the segment lock of the SI table is not
    released. When compaction is run again, the segment lock of the SI table
    cannot be acquired, so compaction does nothing; yet in the tablestatus file
    of the SI table the merged segment status is set to success, the segment
    file is xxx_null.segments, and the value of indexsize is 0.
    
    What changes were proposed in this PR?
    If an exception occurs, release the acquired segment locks.
    If acquiring the segment locks fails, do not update the segment status.
    (A sketch of this locking pattern follows the diff below.)
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4102
---
 .../scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala   | 5 +++++
 .../apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala  | 3 +++
 2 files changed, 8 insertions(+)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index d0bdbc0..3e61713 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -93,6 +93,11 @@ object Compactor {
             segmentToSegmentTimestampMap, null,
             forceAccessSegment, isCompactionCall = true,
             isLoadToFailedSISegments = false)
+        if (segmentLocks.isEmpty) {
+          LOGGER.error(s"Not able to acquire segment lock on the specific segment. " +
+            s"Load the compacted segment ${validSegments.head} into SI table failed");
+          return
+        }
         allSegmentsLock ++= segmentLocks
         CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
           indexCarbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 4596022..9ab18fd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -460,6 +460,9 @@ object SecondaryIndexCreator {
     } catch {
       case ex: Exception =>
         LOGGER.error("Load to SI table failed", ex)
+        if (isCompactionCall) {
+          segmentLocks.foreach(segmentLock => segmentLock.unlock())
+        }
         FileInternalUtil
           .updateTableStatus(validSegmentList,
             secondaryIndexModel.carbonLoadModel.getDatabaseName,

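Taken together, the two hunks apply a common acquire/guard/release discipline: take the segment locks up front, return without updating the table status if they cannot be taken, and release whatever was acquired when the load fails. Below is a minimal, self-contained sketch of that pattern, assuming carbondata-core on the classpath; acquireSegmentLocks, loadToSITable, and updateTableStatus are hypothetical stand-ins for illustration, not CarbonData APIs, while ICarbonLock and unlock() do come from the CarbonData code base.

// A sketch of the locking discipline this fix applies; the three
// private helpers below are hypothetical stubs, not CarbonData APIs.
import org.apache.carbondata.core.locks.ICarbonLock

object SegmentLockSketch {

  def compactIntoSI(validSegments: Seq[String]): Unit = {
    // Take the per-segment locks before doing any work.
    val segmentLocks: Seq[ICarbonLock] = acquireSegmentLocks(validSegments)
    if (segmentLocks.isEmpty) {
      // Locks were not acquired: return without touching the table
      // status, so a later compaction run starts from a clean state.
      return
    }
    try {
      loadToSITable(validSegments)
      updateTableStatus(validSegments, success = true)
    } catch {
      case ex: Exception =>
        // Release every lock that was acquired so the next
        // compaction attempt can take them again.
        segmentLocks.foreach(_.unlock())
        updateTableStatus(validSegments, success = false)
        throw ex
    }
  }

  // Hypothetical stubs, present only to make the sketch compile.
  private def acquireSegmentLocks(segments: Seq[String]): Seq[ICarbonLock] = Seq.empty
  private def loadToSITable(segments: Seq[String]): Unit = ()
  private def updateTableStatus(segments: Seq[String], success: Boolean): Unit = ()
}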