[
https://issues.apache.org/jira/browse/HBASE-17434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15808912#comment-15808912
]
Manjunath Anand commented on HBASE-17434:
-----------------------------------------
If you are fine with the above discussion and can give me contributor
access, I can send the modified patch file. For now, I have included the patch
in this comment:
{code}
From 9c40770283175a293a168da92450b7d09aed363c Mon Sep 17 00:00:00 2001
From: Manjunath Anand <[email protected]>
Date: Sun, 8 Jan 2017 12:06:14 +0530
Subject: [PATCH] Use read write lock as new synchronization scheme for
compaction pipeline
---
.../hbase/regionserver/CompactingMemStore.java | 6 +--
.../hbase/regionserver/CompactionPipeline.java | 55 +++++++++++++++-------
2 files changed, 41 insertions(+), 20 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e1289f8..99c1685 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore {
@VisibleForTesting
@Override
protected List<Segment> getSegments() {
- List<Segment> pipelineList = pipeline.getSegments();
- List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
+ List<? extends Segment> pipelineList = pipeline.getSegments();
+ List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
list.add(this.active);
list.addAll(pipelineList);
list.add(this.snapshot);
@@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore {
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
- List<Segment> pipelineList = pipeline.getSegments();
+ List<? extends Segment> pipelineList = pipeline.getSegments();
long order = pipelineList.size();
// The list of elements in pipeline + the active element + the snapshot
segment
// TODO : This will change when the snapshot is made of more than one
element
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 9d5df77..132e8d6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,36 +50,49 @@ public class CompactionPipeline {
private final RegionServicesForStores region;
private LinkedList<ImmutableSegment> pipeline;
+ private volatile LinkedList<ImmutableSegment> readOnlyCopy;
private long version;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public CompactionPipeline(RegionServicesForStores region) {
this.region = region;
this.pipeline = new LinkedList<>();
+ this.readOnlyCopy = new LinkedList<>();
this.version = 0;
}
public boolean pushHead(MutableSegment segment) {
ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(segment);
- synchronized (pipeline){
- return addFirst(immutableSegment);
+ lock.writeLock().lock();
+ try {
+ boolean res = addFirst(immutableSegment);
+ readOnlyCopy = new LinkedList<>(pipeline);
+ return res;
+ } finally {
+ lock.writeLock().unlock();
}
}
public VersionedSegmentsList getVersionedList() {
- synchronized (pipeline){
- List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
- return new VersionedSegmentsList(segmentList, version);
+ lock.readLock().lock();
+ try {
+ return new VersionedSegmentsList(readOnlyCopy, version);
+ } finally {
+ lock.readLock().unlock();
}
}
public VersionedSegmentsList getVersionedTail() {
- synchronized (pipeline){
+ lock.readLock().lock();
+ try {
List<ImmutableSegment> segmentList = new ArrayList<>();
if(!pipeline.isEmpty()) {
segmentList.add(0, pipeline.getLast());
}
return new VersionedSegmentsList(segmentList, version);
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -99,7 +113,9 @@ public class CompactionPipeline {
return false;
}
List<ImmutableSegment> suffix;
- synchronized (pipeline){
+ // A write lock as pipeline is modified
+ lock.writeLock().lock();
+ try {
if(versionedList.getVersion() != version) {
return false;
}
@@ -115,6 +131,9 @@ public class CompactionPipeline {
+ ", and the number of cells in new segment is:" + count);
}
swapSuffix(suffix, segment, closeSuffix);
+ readOnlyCopy = new LinkedList<>(pipeline);
+ } finally {
+ lock.writeLock().unlock();
}
if (closeSuffix && region != null) {
// update the global memstore size counter
@@ -168,7 +187,8 @@ public class CompactionPipeline {
return false;
}
- synchronized (pipeline){
+ lock.readLock().lock();
+ try {
if(requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match");
return false;
@@ -186,6 +206,8 @@ public class CompactionPipeline {
}
}
+ } finally {
+ lock.readLock().unlock();
}
// do not update the global memstore size counter and do not increase the
version,
// because all the cells remain in place
@@ -193,33 +215,32 @@ public class CompactionPipeline {
}
public boolean isEmpty() {
- return pipeline.isEmpty();
+ return readOnlyCopy.isEmpty();
}
- public List<Segment> getSegments() {
- synchronized (pipeline){
- return new LinkedList<>(pipeline);
- }
+ public List<? extends Segment> getSegments() {
+ return readOnlyCopy;
}
public long size() {
- return pipeline.size();
+ return readOnlyCopy.size();
}
public long getMinSequenceId() {
long minSequenceId = Long.MAX_VALUE;
if (!isEmpty()) {
- minSequenceId = pipeline.getLast().getMinSequenceId();
+ minSequenceId = readOnlyCopy.getLast().getMinSequenceId();
}
return minSequenceId;
}
public MemstoreSize getTailSize() {
+ LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
- return new MemstoreSize(pipeline.peekLast().keySize(),
pipeline.peekLast().heapOverhead());
+ return new MemstoreSize(localCopy.peekLast().keySize(),
localCopy.peekLast().heapOverhead());
}
- private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment
segment,
+ private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment
segment,
boolean closeSegmentsInSuffix) {
version++;
// During index merge we won't be closing the segments undergoing the
merge. Segment#close()
--
1.9.1
{code}
> New Synchronization Scheme for Compaction Pipeline
> --------------------------------------------------
>
> Key: HBASE-17434
> URL: https://issues.apache.org/jira/browse/HBASE-17434
> Project: HBase
> Issue Type: Bug
> Reporter: Eshcar Hillel
> Assignee: Eshcar Hillel
> Attachments: HBASE-17434-V01.patch
>
>
> A new copy-on-write synchronization scheme is introduced for the compaction
> pipeline.
> The new scheme is better because it removes the lock from getSegments(), which
> is invoked in every get and scan operation, and it reduces the number of
> LinkedList objects that are created at runtime, which can reduce GC pressure
> (not by much, but still...).
> In addition, it fixes the method getTailSize() in the compaction pipeline.
> This method creates a MemstoreSize object which comprises the data size and
> the overhead size of the segment, and reading these two values needs to be
> atomic.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)