[ https://issues.apache.org/jira/browse/HBASE-17434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15808912#comment-15808912 ]

Manjunath Anand commented on HBASE-17434:
-----------------------------------------

If you are fine with the above discussion and can give me contributor access, 
I can send the modified patch file. For now, I have included it along with 
this comment:

{code}
From 9c40770283175a293a168da92450b7d09aed363c Mon Sep 17 00:00:00 2001
From: Manjunath Anand <[email protected]>
Date: Sun, 8 Jan 2017 12:06:14 +0530
Subject: [PATCH] Use read write lock as new synchronization scheme for
 compaction pipeline

---
 .../hbase/regionserver/CompactingMemStore.java     |  6 +--
 .../hbase/regionserver/CompactionPipeline.java     | 55 +++++++++++++++-------
 2 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e1289f8..99c1685 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore {
   @VisibleForTesting
   @Override
   protected List<Segment> getSegments() {
-    List<Segment> pipelineList = pipeline.getSegments();
-    List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
+    List<? extends Segment> pipelineList = pipeline.getSegments();
+    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
     list.add(this.active);
     list.addAll(pipelineList);
     list.add(this.snapshot);
@@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore {
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
-    List<Segment> pipelineList = pipeline.getSegments();
+    List<? extends Segment> pipelineList = pipeline.getSegments();
     long order = pipelineList.size();
     // The list of elements in pipeline + the active element + the snapshot segment
     // TODO : This will change when the snapshot is made of more than one element
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 9d5df77..132e8d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,36 +50,49 @@ public class CompactionPipeline {
 
   private final RegionServicesForStores region;
   private LinkedList<ImmutableSegment> pipeline;
+  private volatile LinkedList<ImmutableSegment> readOnlyCopy;
   private long version;
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
     this.pipeline = new LinkedList<>();
+    this.readOnlyCopy = new LinkedList<>();
     this.version = 0;
   }
 
   public boolean pushHead(MutableSegment segment) {
     ImmutableSegment immutableSegment = SegmentFactory.instance().
         createImmutableSegment(segment);
-    synchronized (pipeline){
-      return addFirst(immutableSegment);
+    lock.writeLock().lock();
+    try {
+      boolean res = addFirst(immutableSegment);
+      readOnlyCopy = new LinkedList<>(pipeline);
+      return res;
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
   public VersionedSegmentsList getVersionedList() {
-    synchronized (pipeline){
-      List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
-      return new VersionedSegmentsList(segmentList, version);
+    lock.readLock().lock();
+    try {
+      return new VersionedSegmentsList(readOnlyCopy, version);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
   public VersionedSegmentsList getVersionedTail() {
-    synchronized (pipeline){
+    lock.readLock().lock();
+    try {
       List<ImmutableSegment> segmentList = new ArrayList<>();
       if(!pipeline.isEmpty()) {
         segmentList.add(0, pipeline.getLast());
       }
       return new VersionedSegmentsList(segmentList, version);
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -99,7 +113,9 @@ public class CompactionPipeline {
       return false;
     }
     List<ImmutableSegment> suffix;
-    synchronized (pipeline){
+    // A write lock as pipeline is modified
+    lock.writeLock().lock();
+    try {
       if(versionedList.getVersion() != version) {
         return false;
       }
@@ -115,6 +131,9 @@ public class CompactionPipeline {
             + ", and the number of cells in new segment is:" + count);
       }
       swapSuffix(suffix, segment, closeSuffix);
+      readOnlyCopy = new LinkedList<>(pipeline);
+    } finally {
+      lock.writeLock().unlock();
     }
     if (closeSuffix && region != null) {
       // update the global memstore size counter
@@ -168,7 +187,8 @@ public class CompactionPipeline {
       return false;
     }
 
-    synchronized (pipeline){
+    lock.readLock().lock();
+    try {
       if(requesterVersion != version) {
         LOG.warn("Segment flattening failed, because versions do not match");
         return false;
@@ -186,6 +206,8 @@ public class CompactionPipeline {
         }
       }
 
+    } finally {
+      lock.readLock().unlock();
     }
     // do not update the global memstore size counter and do not increase the version,
     // because all the cells remain in place
@@ -193,33 +215,32 @@ public class CompactionPipeline {
   }
 
   public boolean isEmpty() {
-    return pipeline.isEmpty();
+    return readOnlyCopy.isEmpty();
   }
 
-  public List<Segment> getSegments() {
-    synchronized (pipeline){
-      return new LinkedList<>(pipeline);
-    }
+  public List<? extends Segment> getSegments() {
+    return readOnlyCopy;
   }
 
   public long size() {
-    return pipeline.size();
+    return readOnlyCopy.size();
   }
 
   public long getMinSequenceId() {
     long minSequenceId = Long.MAX_VALUE;
     if (!isEmpty()) {
-      minSequenceId = pipeline.getLast().getMinSequenceId();
+      minSequenceId = readOnlyCopy.getLast().getMinSequenceId();
     }
     return minSequenceId;
   }
 
   public MemstoreSize getTailSize() {
+    LinkedList<? extends Segment> localCopy = readOnlyCopy;
     if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
+    return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
   }
 
-  private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
+  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
     version++;
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()
-- 
1.9.1


{code}
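
To summarize the scheme the patch applies, here is a minimal standalone sketch 
(class and member names are mine, not from the patch or from HBase): mutations 
take the write lock and publish a fresh immutable snapshot through a volatile 
field, readers that need the snapshot and the version to agree take the read 
lock, and the hot read path (the getSegments() analogue) just dereferences the 
volatile copy without any lock.

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only; these names are not part of the patch or of HBase.
public class CopyOnWritePipeline<T> {

  /** Pair of (snapshot, version), playing the role of VersionedSegmentsList. */
  public static final class VersionedList<E> {
    public final List<E> items;
    public final long version;
    VersionedList(List<E> items, long version) {
      this.items = items;
      this.version = version;
    }
  }

  private final LinkedList<T> pipeline = new LinkedList<>(); // mutated only under the write lock
  private volatile List<T> readOnlyCopy = Collections.emptyList();
  private long version = 0; // bumped by suffix swaps in the real code (omitted here)
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  /** Mutation: take the write lock, change the backing list, publish a fresh copy. */
  public void pushHead(T item) {
    lock.writeLock().lock();
    try {
      pipeline.addFirst(item);
      readOnlyCopy = Collections.unmodifiableList(new ArrayList<>(pipeline));
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Reader that needs the snapshot and the version to match: read lock. */
  public VersionedList<T> getVersionedList() {
    lock.readLock().lock();
    try {
      return new VersionedList<>(readOnlyCopy, version);
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Hot-path reader (the getSegments() analogue): no lock, just the volatile snapshot. */
  public List<T> getItems() {
    return readOnlyCopy;
  }
}
{code}

The write lock serializes all structural changes, the fresh copy means a reader 
never iterates a list that is being mutated, and the volatile field gives the 
lock-free readers a safely published, immutable view.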

> New Synchronization Scheme for Compaction Pipeline
> --------------------------------------------------
>
>                 Key: HBASE-17434
>                 URL: https://issues.apache.org/jira/browse/HBASE-17434
>             Project: HBase
>          Issue Type: Bug
>            Reporter: Eshcar Hillel
>            Assignee: Eshcar Hillel
>         Attachments: HBASE-17434-V01.patch
>
>
> A new copyOnWrite synchronization scheme is introduced for the compaction 
> pipeline.
> The new scheme is better since it removes the lock from getSegments(), which 
> is invoked in every get and scan operation, and it reduces the number of 
> LinkedList objects created at runtime, which can reduce GC pressure (not by 
> much, but still...).
> In addition, it fixes the method getTailSize() in the compaction pipeline. 
> This method creates a MemstoreSize object which comprises the data size and 
> the overhead size of the tail segment; its construction needs to be atomic 
> with respect to concurrent changes to the pipeline.
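
As a side note on the getTailSize() point above, the essence of that fix is to 
read the volatile snapshot reference exactly once, so the emptiness check and 
the two tail lookups all see the same list. A tiny self-contained illustration 
of the idiom (Tail and the long[] return are made-up stand-ins, not HBase 
types):

{code}
import java.util.Collections;
import java.util.List;

// Minimal illustration of the "read the volatile reference once" idiom.
class TailSizeExample {
  static final class Tail {
    final long keySize;
    final long heapOverhead;
    Tail(long keySize, long heapOverhead) {
      this.keySize = keySize;
      this.heapOverhead = heapOverhead;
    }
  }

  private volatile List<Tail> readOnlyCopy = Collections.emptyList();

  long[] getTailSize() {
    // Read the volatile snapshot once; the emptiness check and the tail lookups
    // must all see the same list, otherwise a concurrent swap could leave the
    // second lookup observing a different (possibly empty) pipeline.
    List<Tail> localCopy = readOnlyCopy;
    if (localCopy.isEmpty()) {
      return new long[] {0L, 0L};
    }
    Tail tail = localCopy.get(localCopy.size() - 1);
    return new long[] {tail.keySize, tail.heapOverhead};
  }
}
{code}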


