This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5ec3536 [CARBONDATA-4126] Concurrent compaction failed with load on table
5ec3536 is described below
commit 5ec35367684a9b8e7024d52442eae138d166440e
Author: Karan980 <[email protected]>
AuthorDate: Thu Feb 11 13:14:07 2021 +0530
[CARBONDATA-4126] Concurrent compaction failed with load on table
Why is this PR needed?
Compaction was failing when run in parallel with a load. During a load we acquire a SegmentLock for each segment being written; when compaction later tries to acquire the same lock for such a segment, the acquisition fails and the whole compaction fails.
What changes were proposed in this PR?
Instead of throwing an exception, compaction now skips the segments for which the SegmentLock cannot be acquired and compacts only the remaining segments.
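As an illustration, the resulting acquire-or-skip pattern looks roughly like the sketch below. It is a commented restatement of the diff further down, not a standalone program; all identifiers (LOGGER, loadsToMerge, carbonLoadModel, compactionModel, scanSegmentsAndSubmitJob, etc.) are the ones already used in CarbonTableCompactor.scala.

    // Try to lock every candidate segment; compact only the ones whose lock we hold.
    val segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
    val validSegments = new java.util.ArrayList[LoadMetadataDetails]
    loadsToMerge.asScala.foreach { segmentId =>
      val segmentLock = CarbonLockFactory
        .getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
          .getAbsoluteTableIdentifier,
          CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
      if (segmentLock.lockWithRetries()) {
        validSegments.add(segmentId)   // this segment will be compacted
        segmentLocks += segmentLock    // keep the lock so it can be released afterwards
      } else {
        // a concurrent load still holds this segment's lock: skip it instead of aborting
        LOGGER.warn(s"Failed to acquire lock on segment ${segmentId.getLoadName}, " +
          s"during compaction of table ${compactionModel.carbonTable.getQualifiedName}")
      }
    }
    // only the successfully locked segments are handed to the merge job
    scanSegmentsAndSubmitJob(validSegments, compactedSegments, compactedLoad)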
This closes #4093
---
.../apache/carbondata/spark/rdd/CarbonTableCompactor.scala | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a381089..5db344b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -92,19 +92,22 @@ class CarbonTableCompactor(
     val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
     val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+    val validSegments = new java.util.ArrayList[LoadMetadataDetails]
     loadsToMerge.asScala.foreach { segmentId =>
       val segmentLock = CarbonLockFactory
         .getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
           .getAbsoluteTableIdentifier,
           CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
-      if (!segmentLock.lockWithRetries()) {
-        throw new Exception(s"Failed to acquire lock on segment ${segmentId.getLoadName}," +
-          s" during compaction of table ${compactionModel.carbonTable.getQualifiedName}")
+      if (segmentLock.lockWithRetries()) {
+        validSegments.add(segmentId)
+        segmentLocks += segmentLock
+      } else {
+        LOGGER.warn(s"Failed to acquire lock on segment ${segmentId.getLoadName}, " +
+          s"during compaction of table ${compactionModel.carbonTable.getQualifiedName}")
       }
-      segmentLocks += segmentLock
     }
     try {
-      scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, compactedLoad)
+      scanSegmentsAndSubmitJob(validSegments, compactedSegments, compactedLoad)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)