Repository: hbase Updated Branches: refs/heads/branch-1.0 30eb2fb26 -> 6c555d36c
HBASE-15213 Fix increment performance regression caused by HBASE-8763 on branch-1.0 (Junegunn Choi) Signed-off-by: stack <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6c555d36 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6c555d36 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6c555d36 Branch: refs/heads/branch-1.0 Commit: 6c555d36cd9928e44281b3280e57dd5f98b63fc8 Parents: 30eb2fb Author: stack <[email protected]> Authored: Fri Feb 5 13:28:16 2016 -0800 Committer: stack <[email protected]> Committed: Fri Feb 5 18:03:28 2016 -0800 ---------------------------------------------------------------------- .../MultiVersionConsistencyControl.java | 31 +++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6c555d36/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index fffd7c0..8b9f41b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.LinkedList; +import java.util.LinkedHashSet; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -39,8 +39,8 @@ public class MultiVersionConsistencyControl { private final Object readWaiters = new Object(); // This is the pending queue of writes. - private final LinkedList<WriteEntry> writeQueue = - new LinkedList<WriteEntry>(); + private final LinkedHashSet<WriteEntry> writeQueue = + new LinkedHashSet<WriteEntry>(); /** * Default constructor. Initializes the memstoreRead/Write points to 0. @@ -100,7 +100,14 @@ public class MultiVersionConsistencyControl { * @return WriteEntry a WriteEntry instance with the passed in curSeqNum */ public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { + return beginMemstoreInsertWithSeqNum(curSeqNum, false); + } + + private WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum, boolean complete) { WriteEntry e = new WriteEntry(curSeqNum); + if (complete) { + e.markCompleted(); + } synchronized (writeQueue) { writeQueue.add(e); return e; @@ -153,11 +160,11 @@ public class MultiVersionConsistencyControl { e.markCompleted(); while (!writeQueue.isEmpty()) { - WriteEntry queueFirst = writeQueue.getFirst(); + WriteEntry queueFirst = writeQueue.iterator().next(); if (queueFirst.isCompleted()) { // Using Max because Edit complete in WAL sync order not arriving order nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber()); - writeQueue.removeFirst(); + writeQueue.remove(queueFirst); } else { break; } @@ -199,27 +206,31 @@ public class MultiVersionConsistencyControl { * Wait for all previous MVCC transactions complete */ public void waitForPreviousTransactionsComplete() { - WriteEntry w = beginMemstoreInsert(); + WriteEntry w = beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER, true); waitForPreviousTransactionsComplete(w); } public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { boolean interrupted = false; WriteEntry w = waitedEntry; + w.markCompleted(); try { WriteEntry firstEntry = null; do { synchronized (writeQueue) { - // writeQueue won't be empty at this point, the following is just a safety check if (writeQueue.isEmpty()) { break; } - firstEntry = writeQueue.getFirst(); + firstEntry = writeQueue.iterator().next(); if (firstEntry == w) { // all previous in-flight transactions are done break; } + // WriteEntry already was removed from the queue by another handler + if (!writeQueue.contains(w)) { + break; + } try { writeQueue.wait(0); } catch (InterruptedException ie) { @@ -231,9 +242,7 @@ public class MultiVersionConsistencyControl { } } while (firstEntry != null); } finally { - if (w != null) { - advanceMemstore(w); - } + advanceMemstore(w); } if (interrupted) { Thread.currentThread().interrupt();
