HBASE-20542: Better heap utilization for IMC with MSLABs

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d822ee3a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d822ee3a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d822ee3a

Branch: refs/heads/HBASE-18477
Commit: d822ee3a7ccc4959ed5a4b85bb54ff6142aa7d6e
Parents: 112d050
Author: eshcar <esh...@oath.com>
Authored: Sun Jul 1 15:29:26 2018 +0300
Committer: eshcar <esh...@oath.com>
Committed: Sun Jul 1 15:31:31 2018 +0300

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    | 145 ++++++++-----
 .../regionserver/CellArrayImmutableSegment.java |   5 +-
 .../regionserver/CellChunkImmutableSegment.java |  17 +-
 .../hbase/regionserver/CompactingMemStore.java  | 215 +++++++++++--------
 .../hbase/regionserver/CompactionPipeline.java  |  16 +-
 .../regionserver/CompositeImmutableSegment.java |   5 +-
 .../hbase/regionserver/DefaultMemStore.java     |  40 ++--
 .../hbase/regionserver/MemStoreCompactor.java   |  12 +-
 .../hbase/regionserver/MemStoreLABImpl.java     |   6 +-
 .../hbase/regionserver/MemStoreSizing.java      |   8 +-
 .../hbase/regionserver/MutableSegment.java      |  25 ++-
 .../NonThreadSafeMemStoreSizing.java            |  10 +-
 .../regionserver/RegionServicesForStores.java   |   8 -
 .../hadoop/hbase/regionserver/Segment.java      |  72 +++++--
 .../regionserver/ThreadSafeMemStoreSizing.java  |   4 +
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  12 ++
 .../regionserver/TestCompactingMemStore.java    |   4 +-
 .../TestCompactingToCellFlatMapMemStore.java    |  50 +++--
 .../hadoop/hbase/regionserver/TestHStore.java   |  13 +-
 19 files changed, 418 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index b82afba..a7a1af8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -47,7 +47,7 @@ public abstract class AbstractMemStore implements MemStore {
   private final CellComparator comparator;
 
   // active segment absorbs write operations
-  protected volatile MutableSegment active;
+  private volatile MutableSegment active;
   // Snapshot of memstore.  Made for flusher.
   protected volatile ImmutableSegment snapshot;
   protected volatile long snapshotId;
@@ -82,8 +82,8 @@ public abstract class AbstractMemStore implements MemStore {
 
   protected void resetActive() {
     // Reset heap to not include any keys
-    this.active = SegmentFactory.instance().createMutableSegment(conf, 
comparator);
-    this.timeOfOldestEdit = Long.MAX_VALUE;
+    active = SegmentFactory.instance().createMutableSegment(conf, comparator);
+    timeOfOldestEdit = Long.MAX_VALUE;
   }
 
   /**
@@ -102,12 +102,52 @@ public abstract class AbstractMemStore implements 
MemStore {
 
   @Override
   public void add(Cell cell, MemStoreSizing memstoreSizing) {
-    Cell toAdd = maybeCloneWithAllocator(cell, false);
+    doAddOrUpsert(cell, 0, memstoreSizing, true);  }
+
+  /*
+   * Inserts the specified Cell into MemStore and deletes any existing
+   * versions of the same row/family/qualifier as the specified Cell.
+   * <p>
+   * First, the specified Cell is inserted into the Memstore.
+   * <p>
+   * If there are any existing Cell in this MemStore with the same row,
+   * family, and qualifier, they are removed.
+   * <p>
+   * Callers must hold the read lock.
+   *
+   * @param cell the cell to be updated
+   * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @param memstoreSizing object to accumulate changed size
+   */
+  private void upsert(Cell cell, long readpoint, MemStoreSizing 
memstoreSizing) {
+    doAddOrUpsert(cell, readpoint, memstoreSizing, false);
+  }
+
+  private void doAddOrUpsert(Cell cell, long readpoint, MemStoreSizing 
memstoreSizing, boolean
+      doAdd) {
+    MutableSegment currentActive;
+    boolean succ = false;
+    while (!succ) {
+      currentActive = getActive();
+      succ = preUpdate(currentActive, cell, memstoreSizing);
+      if (succ) {
+        if(doAdd) {
+          doAdd(currentActive, cell, memstoreSizing);
+        } else {
+          doUpsert(currentActive, cell, readpoint, memstoreSizing);
+        }
+        postUpdate(currentActive);
+      }
+    }
+  }
+
+  private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing 
memstoreSizing) {
+    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
     boolean mslabUsed = (toAdd != cell);
-    // This cell data is backed by the same byte[] where we read request in 
RPC(See HBASE-15180). By
-    // default MSLAB is ON and we might have copied cell to MSLAB area. If not 
we must do below deep
-    // copy. Or else we will keep referring to the bigger chunk of memory and 
prevent it from
-    // getting GCed.
+    // This cell data is backed by the same byte[] where we read request in 
RPC(See
+    // HBASE-15180). By default MSLAB is ON and we might have copied cell to 
MSLAB area. If
+    // not we must do below deep copy. Or else we will keep referring to the 
bigger chunk of
+    // memory and prevent it from getting GCed.
     // Copy to MSLAB would not have happened if
     // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
     // 2. When the size of the cell is bigger than the max size supported by 
MSLAB. See
@@ -116,9 +156,42 @@ public abstract class AbstractMemStore implements MemStore 
{
     if (!mslabUsed) {
       toAdd = deepCopyIfNeeded(toAdd);
     }
-    internalAdd(toAdd, mslabUsed, memstoreSizing);
+    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
+  }
+
+  private void doUpsert(MutableSegment currentActive, Cell cell, long 
readpoint, MemStoreSizing
+      memstoreSizing) {
+    // Add the Cell to the MemStore
+    // Use the internalAdd method here since we (a) already have a lock
+    // and (b) cannot safely use the MSLAB here without potentially
+    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
+    // test that triggers the pathological case if we don't avoid MSLAB
+    // here.
+    // This cell data is backed by the same byte[] where we read request in 
RPC(See
+    // HBASE-15180). We must do below deep copy. Or else we will keep 
referring to the bigger
+    // chunk of memory and prevent it from getting GCed.
+    cell = deepCopyIfNeeded(cell);
+    boolean sizeAddedPreOperation = sizeAddedPreOperation();
+    currentActive.upsert(cell, readpoint, memstoreSizing, 
sizeAddedPreOperation);
+    setOldestEditTimeToNow();
   }
 
+    /**
+     * Issue any synchronization and test needed before applying the update
+     * @param currentActive the segment to be updated
+     * @param cell the cell to be added
+     * @param memstoreSizing object to accumulate region size changes
+     * @return true iff can proceed with applying the update
+     */
+  protected abstract boolean preUpdate(MutableSegment currentActive, Cell cell,
+      MemStoreSizing memstoreSizing);
+
+  /**
+   * Issue any post update synchronization and tests
+   * @param currentActive updated segment
+   */
+  protected abstract void postUpdate(MutableSegment currentActive);
+
   private static Cell deepCopyIfNeeded(Cell cell) {
     if (cell instanceof ExtendedCell) {
       return ((ExtendedCell) cell).deepClone();
@@ -188,43 +261,12 @@ public abstract class AbstractMemStore implements 
MemStore {
   }
 
   protected void dump(Logger log) {
-    active.dump(log);
+    getActive().dump(log);
     snapshot.dump(log);
   }
 
 
   /*
-   * Inserts the specified Cell into MemStore and deletes any existing
-   * versions of the same row/family/qualifier as the specified Cell.
-   * <p>
-   * First, the specified Cell is inserted into the Memstore.
-   * <p>
-   * If there are any existing Cell in this MemStore with the same row,
-   * family, and qualifier, they are removed.
-   * <p>
-   * Callers must hold the read lock.
-   *
-   * @param cell the cell to be updated
-   * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @param memstoreSize
-   */
-  private void upsert(Cell cell, long readpoint, MemStoreSizing 
memstoreSizing) {
-    // Add the Cell to the MemStore
-    // Use the internalAdd method here since we (a) already have a lock
-    // and (b) cannot safely use the MSLAB here without potentially
-    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
-    // test that triggers the pathological case if we don't avoid MSLAB
-    // here.
-    // This cell data is backed by the same byte[] where we read request in 
RPC(See HBASE-15180). We
-    // must do below deep copy. Or else we will keep referring to the bigger 
chunk of memory and
-    // prevent it from getting GCed.
-    cell = deepCopyIfNeeded(cell);
-    this.active.upsert(cell, readpoint, memstoreSizing);
-    setOldestEditTimeToNow();
-    checkActiveSize();
-  }
-
-  /*
    * @param a
    * @param b
    * @return Return lowest of a or b or null if both a and b are null
@@ -275,8 +317,9 @@ public abstract class AbstractMemStore implements MemStore {
    * @param forceCloneOfBigCell true only during the process of flattening to 
CellChunkMap.
    * @return either the given cell or its clone
    */
-  private Cell maybeCloneWithAllocator(Cell cell, boolean forceCloneOfBigCell) 
{
-    return active.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
+  private Cell maybeCloneWithAllocator(MutableSegment currentActive, Cell 
cell, boolean
+      forceCloneOfBigCell) {
+    return currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);
   }
 
   /*
@@ -286,14 +329,17 @@ public abstract class AbstractMemStore implements 
MemStore {
    * Callers should ensure they already have the read lock taken
    * @param toAdd the cell to add
    * @param mslabUsed whether using MSLAB
-   * @param memstoreSize
+   * @param memstoreSizing object to accumulate changed size
    */
-  private void internalAdd(final Cell toAdd, final boolean mslabUsed, 
MemStoreSizing memstoreSizing) {
-    active.add(toAdd, mslabUsed, memstoreSizing);
+  private void internalAdd(MutableSegment currentActive, final Cell toAdd, 
final boolean
+      mslabUsed, MemStoreSizing memstoreSizing) {
+    boolean sizeAddedPreOperation = sizeAddedPreOperation();
+    currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);
     setOldestEditTimeToNow();
-    checkActiveSize();
   }
 
+  protected abstract boolean sizeAddedPreOperation();
+
   private void setOldestEditTimeToNow() {
     if (timeOfOldestEdit == Long.MAX_VALUE) {
       timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
@@ -326,11 +372,6 @@ public abstract class AbstractMemStore implements MemStore 
{
   }
 
   /**
-   * Check whether anything need to be done based on the current active set 
size
-   */
-  protected abstract void checkActiveSize();
-
-  /**
    * @return an ordered list of segments from most recent to oldest in memstore
    */
   protected abstract List<Segment> getSegments() throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
index dadfc48..2f02555 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -55,11 +55,12 @@ public class CellArrayImmutableSegment extends 
ImmutableSegment {
    * of CSLMImmutableSegment
    * The given iterator returns the Cells that "survived" the compaction.
    */
-  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, 
MemStoreSizing memstoreSizing,
+  protected CellArrayImmutableSegment(CSLMImmutableSegment segment, 
MemStoreSizing mss,
       MemStoreCompactionStrategy.Action action) {
     super(segment); // initiailize the upper class
     long indexOverhead = DEEP_OVERHEAD_CAM - 
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
     incMemStoreSize(0, indexOverhead, 0); // CAM is always on-heap
+    mss.incMemStoreSize(0, indexOverhead, 0);
     int numOfCells = segment.getCellsCount();
     // build the new CellSet based on CellChunkMap and update the CellSet of 
this Segment
     reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), 
segment.getCellSet(),
@@ -68,7 +69,7 @@ public class CellArrayImmutableSegment extends 
ImmutableSegment {
     // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the 
care for the sizes)
     long newSegmentSizeDelta = 
numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
     incMemStoreSize(0, newSegmentSizeDelta, 0);
-    memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta, 0);
+    mss.incMemStoreSize(0, newSegmentSizeDelta, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
index e2f8205..eed97fa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -80,13 +80,16 @@ public class CellChunkImmutableSegment extends 
ImmutableSegment {
     // initiate the heapSize with the size of the segment metadata
     if(onHeap) {
       incMemStoreSize(0, indexOverhead, 0);
+      memstoreSizing.incMemStoreSize(0, indexOverhead, 0);
     } else {
       incMemStoreSize(0, -CSLMImmutableSegment.DEEP_OVERHEAD_CSLM, 
DEEP_OVERHEAD_CCM);
+      memstoreSizing.incMemStoreSize(0, 
-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM,
+          DEEP_OVERHEAD_CCM);
     }
     int numOfCells = segment.getCellsCount();
     // build the new CellSet based on CellChunkMap
     reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), 
segment.getCellSet(),
-        action);
+        memstoreSizing, action);
     // arrange the meta-data size, decrease all meta-data sizes related to 
SkipList;
     // add sizes of CellChunkMap entry, decrease also Cell object sizes
     // (reinitializeCellSet doesn't take the care for the sizes)
@@ -150,7 +153,7 @@ public class CellChunkImmutableSegment extends 
ImmutableSegment {
         // CellChunkMap assumes all cells are allocated on MSLAB.
         // Therefore, cells which are not allocated on MSLAB initially,
         // are copied into MSLAB here.
-        c = copyCellIntoMSLAB(c);
+        c = copyCellIntoMSLAB(c, null); //no memstore sizing object to update
         alreadyCopied = true;
       }
       if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > 
chunks[currentChunkIdx].size) {
@@ -197,7 +200,7 @@ public class CellChunkImmutableSegment extends 
ImmutableSegment {
   // This is a service for not-flat immutable segments
   private void reinitializeCellSet(
       int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
-      MemStoreCompactionStrategy.Action action) {
+      MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) 
{
     Cell curCell;
     Chunk[] chunks = allocIndexChunks(numOfCells);
 
@@ -213,7 +216,7 @@ public class CellChunkImmutableSegment extends 
ImmutableSegment {
           // CellChunkMap assumes all cells are allocated on MSLAB.
           // Therefore, cells which are not allocated on MSLAB initially,
           // are copied into MSLAB here.
-          curCell = copyCellIntoMSLAB(curCell);
+          curCell = copyCellIntoMSLAB(curCell, memstoreSizing);
         }
         if (offsetInCurentChunk + ClassSize.CELL_CHUNK_MAP_ENTRY > 
chunks[currentChunkIdx].size) {
           // continue to the next metadata chunk
@@ -315,7 +318,7 @@ public class CellChunkImmutableSegment extends 
ImmutableSegment {
     return chunks;
   }
 
-  private Cell copyCellIntoMSLAB(Cell cell) {
+  private Cell copyCellIntoMSLAB(Cell cell, MemStoreSizing memstoreSizing) {
     // Take care for a special case when a cell is copied from on-heap to 
(probably off-heap) MSLAB.
     // The cell allocated as an on-heap JVM object (byte array) occupies 
slightly different
     // amount of memory, than when the cell serialized and allocated on the 
MSLAB.
@@ -332,8 +335,10 @@ public class CellChunkImmutableSegment extends 
ImmutableSegment {
     long newCellSize = getCellLength(cell);
     long heapOverhead = newHeapSize - oldHeapSize;
     long offHeapOverhead = newOffHeapSize - oldOffHeapSize;
-    //TODO: maybe need to update the dataSize of the region
     incMemStoreSize(newCellSize - oldCellSize, heapOverhead, offHeapOverhead);
+    if(memstoreSizing != null) {
+      memstoreSizing.incMemStoreSize(newCellSize - oldCellSize, heapOverhead, 
offHeapOverhead);
+    }
     return cell;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 3886e7d..157441d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -62,7 +62,7 @@ public class CompactingMemStore extends AbstractMemStore {
   // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
-  private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014;
+  private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactingMemStore.class);
   private HStore store;
@@ -71,7 +71,7 @@ public class CompactingMemStore extends AbstractMemStore {
   protected MemStoreCompactor compactor;
 
   private long inmemoryFlushSize;       // the threshold on active size for 
in-memory flush
-  private final AtomicBoolean inMemoryFlushInProgress = new 
AtomicBoolean(false);
+  private final AtomicBoolean inMemoryCompactionInProgress = new 
AtomicBoolean(false);
 
   // inWalReplay is true while we are synchronously replaying the edits from 
WAL
   private boolean inWalReplay = false;
@@ -94,11 +94,11 @@ public class CompactingMemStore extends AbstractMemStore {
 
   public static final long DEEP_OVERHEAD = ClassSize.align( 
AbstractMemStore.DEEP_OVERHEAD
       + 7 * ClassSize.REFERENCE     // Store, RegionServicesForStores, 
CompactionPipeline,
-                                    // MemStoreCompactor, 
inMemoryFlushInProgress, allowCompaction,
-                                    // indexType
+      // MemStoreCompactor, inMemoryCompactionInProgress,
+      // allowCompaction, indexType
       + Bytes.SIZEOF_LONG           // inmemoryFlushSize
       + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
-      + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and 
allowCompaction
+      + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and 
allowCompaction
       + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
 
   public CompactingMemStore(Configuration conf, CellComparator c,
@@ -139,12 +139,15 @@ public class CompactingMemStore extends AbstractMemStore {
       // Family number might also be zero in some of our unit test case
       numStores = 1;
     }
-    inmemoryFlushSize = memstoreFlushSize / numStores;
-    // multiply by a factor (the same factor for all index types)
-    factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
-          IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
-
-    inmemoryFlushSize = (long) (inmemoryFlushSize * factor);
+    factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
+    if(factor != 0.0) {
+      // multiply by a factor (the same factor for all index types)
+      inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores;
+    } else {
+      inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER *
+          conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, 
MemStoreLAB.CHUNK_SIZE_DEFAULT);
+      inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER;
+    }
   }
 
   /**
@@ -156,7 +159,7 @@ public class CompactingMemStore extends AbstractMemStore {
   @Override
   public MemStoreSize size() {
     MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
-    memstoreSizing.incMemStoreSize(active.getMemStoreSize());
+    memstoreSizing.incMemStoreSize(getActive().getMemStoreSize());
     for (Segment item : pipeline.getSegments()) {
       memstoreSizing.incMemStoreSize(item.getMemStoreSize());
     }
@@ -201,9 +204,11 @@ public class CompactingMemStore extends AbstractMemStore {
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       LOG.debug("FLUSHING TO DISK {}, store={}",
-            getRegionServices().getRegionInfo().getEncodedName(), 
getFamilyName());
+          getRegionServices().getRegionInfo().getEncodedName(), 
getFamilyName());
       stopCompaction();
-      pushActiveToPipeline(this.active);
+      // region level lock ensures pushing active to pipeline is done in 
isolation
+      // no concurrent update operations trying to flush the active segment
+      pushActiveToPipeline(getActive());
       snapshotId = EnvironmentEdgeManager.currentTime();
       // in both cases whatever is pushed to snapshot is cleared from the 
pipeline
       if (compositeSnapshot) {
@@ -223,19 +228,22 @@ public class CompactingMemStore extends AbstractMemStore {
       // if snapshot is empty the tail of the pipeline (or everything in the 
memstore) is flushed
       if (compositeSnapshot) {
         MemStoreSizing memStoreSizing = new 
NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
-        memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
+        MutableSegment currActive = getActive();
+        if(!currActive.isEmpty()) {
+          memStoreSizing.incMemStoreSize(currActive.getMemStoreSize());
+        }
         mss = memStoreSizing.getMemStoreSize();
       } else {
         mss = pipeline.getTailSize();
       }
     }
-    return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
+    return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
   }
 
   @Override
   protected long keySize() {
     // Need to consider dataSize/keySize of all segments in pipeline and active
-    long keySize = this.active.getDataSize();
+    long keySize = getActive().getDataSize();
     for (Segment segment : this.pipeline.getSegments()) {
       keySize += segment.getDataSize();
     }
@@ -245,7 +253,7 @@ public class CompactingMemStore extends AbstractMemStore {
   @Override
   protected long heapSize() {
     // Need to consider heapOverhead of all segments in pipeline and active
-    long h = this.active.getHeapSize();
+    long h = getActive().getHeapSize();
     for (Segment segment : this.pipeline.getSegments()) {
       h += segment.getHeapSize();
     }
@@ -283,15 +291,43 @@ public class CompactingMemStore extends AbstractMemStore {
     inWalReplay = false;
   }
 
+  /**
+   * Issue any synchronization and test needed before applying the update
+   * For compacting memstore this means checking the update can increase the 
size without
+   * overflow
+   * @param currentActive the segment to be updated
+   * @param cell the cell to be added
+   * @param memstoreSizing object to accumulate region size changes
+   * @return true iff can proceed with applying the update
+   */
+  @Override protected boolean preUpdate(MutableSegment currentActive, Cell 
cell,
+      MemStoreSizing memstoreSizing) {
+    if(currentActive.sharedLock()) {
+      if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
+        return true;
+      }
+      currentActive.sharedUnlock();
+    }
+    return false;
+  }
+
+  @Override protected void postUpdate(MutableSegment currentActive) {
+    currentActive.sharedUnlock();
+  }
+
+  @Override protected boolean sizeAddedPreOperation() {
+    return true;
+  }
+
   // the getSegments() method is used for tests only
   @VisibleForTesting
   @Override
   protected List<Segment> getSegments() {
     List<? extends Segment> pipelineList = pipeline.getSegments();
     List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
-    list.add(this.active);
+    list.add(getActive());
     list.addAll(pipelineList);
-    list.addAll(this.snapshot.getAllSegments());
+    list.addAll(snapshot.getAllSegments());
 
     return list;
   }
@@ -351,7 +387,7 @@ public class CompactingMemStore extends AbstractMemStore {
 
   @Override
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
-    MutableSegment activeTmp = active;
+    MutableSegment activeTmp = getActive();
     List<? extends Segment> pipelineList = pipeline.getSegments();
     List<? extends Segment> snapshotList = snapshot.getAllSegments();
     long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
@@ -363,56 +399,67 @@ public class CompactingMemStore extends AbstractMemStore {
     return list;
   }
 
-   @VisibleForTesting
-   protected List<KeyValueScanner> createList(int capacity) {
-     return new ArrayList<>(capacity);
-   }
+  @VisibleForTesting
+  protected List<KeyValueScanner> createList(int capacity) {
+    return new ArrayList<>(capacity);
+  }
 
   /**
    * Check whether anything need to be done based on the current active set 
size.
    * The method is invoked upon every addition to the active set.
    * For CompactingMemStore, flush the active set to the read-only memory if 
it's
    * size is above threshold
+   * @param currActive intended segment to update
+   * @param cellToAdd cell to be added to the segment
+   * @param memstoreSizing object to accumulate changed size
+   * @return true if the cell can be added to the
    */
-  @Override
-  protected void checkActiveSize() {
-    if (shouldFlushInMemory()) {
-      /* The thread is dispatched to flush-in-memory. This cannot be done
-      * on the same thread, because for flush-in-memory we require updatesLock
-      * in exclusive mode while this method (checkActiveSize) is invoked 
holding updatesLock
-      * in the shared mode. */
-      InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Dispatching the MemStore in-memory flush for store " + 
store.getColumnFamilyName());
+  private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell 
cellToAdd,
+      MemStoreSizing memstoreSizing) {
+    if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
+      if (currActive.setInMemoryFlushed()) {
+        flushInMemory(currActive);
+        if (inMemoryCompactionInProgress.compareAndSet(false, true)) {
+          // The thread is dispatched to do in-memory compaction in the 
background
+          InMemoryCompactionRunnable runnable = new 
InMemoryCompactionRunnable();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Dispatching the MemStore in-memory flush for store " + 
store
+                .getColumnFamilyName());
+          }
+          getPool().execute(runnable);
+        }
       }
-      getPool().execute(runnable);
+      return false;
     }
-  }
+    return true;
+ }
 
-  // internally used method, externally visible only for tests
+  // externally visible only for tests
   // when invoked directly from tests it must be verified that the caller 
doesn't hold updatesLock,
   // otherwise there is a deadlock
   @VisibleForTesting
-  void flushInMemory() throws IOException {
-    // setting the inMemoryFlushInProgress flag again for the case this method 
is invoked
+  void flushInMemory() {
+    MutableSegment currActive = getActive();
+    if(currActive.setInMemoryFlushed()) {
+      flushInMemory(currActive);
+    }
+    inMemoryCompaction();
+  }
+
+  private void flushInMemory(MutableSegment currActive) {
+    LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction 
pipeline");
+    pushActiveToPipeline(currActive);
+  }
+
+  void inMemoryCompaction() {
+    // setting the inMemoryCompactionInProgress flag again for the case this 
method is invoked
     // directly (only in tests) in the common path setting from true to true 
is idempotent
-    inMemoryFlushInProgress.set(true);
+    inMemoryCompactionInProgress.set(true);
     try {
-      // Phase I: Update the pipeline
-      getRegionServices().blockUpdates();
-      try {
-        LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction 
pipeline");
-        pushActiveToPipeline(this.active);
-      } finally {
-        getRegionServices().unblockUpdates();
-      }
-
       // Used by tests
       if (!allowCompaction.get()) {
         return;
       }
-      // Phase II: Compact the pipeline
       try {
         // Speculative compaction execution, may be interrupted if flush is 
forced while
         // compaction is in progress
@@ -422,8 +469,7 @@ public class CompactingMemStore extends AbstractMemStore {
             getRegionServices().getRegionInfo().getEncodedName(), 
getFamilyName(), e);
       }
     } finally {
-      inMemoryFlushInProgress.set(false);
-      LOG.trace("IN-MEMORY FLUSH: end");
+      inMemoryCompactionInProgress.set(false);
     }
   }
 
@@ -442,16 +488,24 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   @VisibleForTesting
-  protected boolean shouldFlushInMemory() {
-    if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush 
threshold
-      if (inWalReplay) {  // when replaying edits from WAL there is no need in 
in-memory flush
-        return false;     // regardless the size
+  protected boolean shouldFlushInMemory(MutableSegment currActive, Cell 
cellToAdd,
+      MemStoreSizing memstoreSizing) {
+    long cellSize = currActive.getCellLength(cellToAdd);
+    long segmentDataSize = currActive.getDataSize();
+    while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
+      // when replaying edits from WAL there is no need in in-memory flush 
regardless the size
+      // otherwise size below flush threshold try to update atomically
+      if(currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + 
cellSize)) {
+        if(memstoreSizing != null){
+          memstoreSizing.incMemStoreSize(cellSize, 0, 0);
+        }
+        //enough space for cell - no need to flush
+        return false;
       }
-      // the inMemoryFlushInProgress is CASed to be true here in order to 
mutual exclude
-      // the insert of the active into the compaction pipeline
-      return (inMemoryFlushInProgress.compareAndSet(false,true));
+      segmentDataSize = currActive.getDataSize();
     }
-    return false;
+    // size above flush threshold
+    return true;
   }
 
   /**
@@ -460,14 +514,14 @@ public class CompactingMemStore extends AbstractMemStore {
    * Non-blocking request
    */
   private void stopCompaction() {
-    if (inMemoryFlushInProgress.get()) {
+    if (inMemoryCompactionInProgress.get()) {
       compactor.stop();
     }
   }
 
-  protected void pushActiveToPipeline(MutableSegment active) {
-    if (!active.isEmpty()) {
-      pipeline.pushHead(active);
+  protected void pushActiveToPipeline(MutableSegment currActive) {
+    if (!currActive.isEmpty()) {
+      pipeline.pushHead(currActive);
       resetActive();
     }
   }
@@ -518,28 +572,21 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   /**
-  * The in-memory-flusher thread performs the flush asynchronously.
-  * There is at most one thread per memstore instance.
-  * It takes the updatesLock exclusively, pushes active into the pipeline, 
releases updatesLock
-  * and compacts the pipeline.
-  */
-  private class InMemoryFlushRunnable implements Runnable {
-
+   * The in-memory-flusher thread performs the flush asynchronously.
+   * There is at most one thread per memstore instance.
+   * It takes the updatesLock exclusively, pushes active into the pipeline, 
releases updatesLock
+   * and compacts the pipeline.
+   */
+  private class InMemoryCompactionRunnable implements Runnable {
     @Override
     public void run() {
-      try {
-        flushInMemory();
-      } catch (IOException e) {
-        LOG.warn("Unable to run memstore compaction. region "
-            + getRegionServices().getRegionInfo().getRegionNameAsString()
-            + "store: "+ getFamilyName(), e);
-      }
+      inMemoryCompaction();
     }
   }
 
   @VisibleForTesting
   boolean isMemStoreFlushingInMemory() {
-    return inMemoryFlushInProgress.get();
+    return inMemoryCompactionInProgress.get();
   }
 
   /**
@@ -567,10 +614,10 @@ public class CompactingMemStore extends AbstractMemStore {
 
   // debug method
   public void debug() {
-    String msg = "active size=" + this.active.getDataSize();
-    msg += " in-memory flush size is "+ inmemoryFlushSize;
+    String msg = "active size=" + getActive().getDataSize();
     msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
-    msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? 
"true" : "false");
+    msg += " inMemoryCompactionInProgress is "+ 
(inMemoryCompactionInProgress.get() ? "true" :
+        "false");
     LOG.debug(msg);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index f8aa3ef..1131c3c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -151,15 +150,10 @@ public class CompactionPipeline {
       long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
       long heapSizeDelta = suffixHeapSize - newHeapSize;
       region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, 
-offHeapSizeDelta);
-      LOG.debug("Suffix data size={}, new segment data size={}, "
-              + "suffix heap size={}," + "new segment heap size={}"
-              + "suffix off heap size={}," + "new segment off heap size={}"
-          , suffixDataSize
-          , newDataSize
-          , suffixHeapSize
-          , newHeapSize
-          , suffixOffHeapSize
-          , newOffHeapSize);
+      LOG.debug("Suffix data size={}, new segment data size={}, " + "suffix 
heap size={},"
+              + "new segment heap size={}" + "suffix off heap size={},"
+              + "new segment off heap size={}", suffixDataSize, newDataSize, 
suffixHeapSize,
+          newHeapSize, suffixOffHeapSize, newOffHeapSize);
     }
     return true;
   }
@@ -214,6 +208,7 @@ public class CompactionPipeline {
       int i = 0;
       for (ImmutableSegment s : pipeline) {
         if ( s.canBeFlattened() ) {
+          s.waitForUpdates(); // to ensure all updates preceding s in-memory 
flush have completed
           // size to be updated
           MemStoreSizing newMemstoreAccounting = new 
NonThreadSafeMemStoreSizing();
           ImmutableSegment newS = 
SegmentFactory.instance().createImmutableSegmentByFlattening(
@@ -223,7 +218,6 @@ public class CompactionPipeline {
             // Update the global memstore size counter upon flattening there 
is no change in the
             // data size
             MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
-            Preconditions.checkArgument(mss.getDataSize() == 0, "Not zero!");
             region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), 
mss.getOffHeapSize());
           }
           LOG.debug("Compaction pipeline segment {} flattened", s);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
index dcfaf81..372c660 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -239,13 +239,14 @@ public class CompositeImmutableSegment extends 
ImmutableSegment {
   }
 
   @Override
-  protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing 
memstoreSizing) {
+  protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing 
memstoreSizing,
+      boolean sizeAddedPreOperation) {
     throw new IllegalStateException("Not supported by 
CompositeImmutableScanner");
   }
 
   @Override
   protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean 
mslabUsed,
-      MemStoreSizing memstoreSizing) {
+      MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
     throw new IllegalStateException("Not supported by 
CompositeImmutableScanner");
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 5dcf48b..97170fb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -87,9 +87,9 @@ public class DefaultMemStore extends AbstractMemStore {
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       this.snapshotId = EnvironmentEdgeManager.currentTime();
-      if (!this.active.isEmpty()) {
+      if (!getActive().isEmpty()) {
         ImmutableSegment immutableSegment = SegmentFactory.instance().
-            createImmutableSegment(this.active);
+            createImmutableSegment(getActive());
         this.snapshot = immutableSegment;
         resetActive();
       }
@@ -100,17 +100,17 @@ public class DefaultMemStore extends AbstractMemStore {
   @Override
   public MemStoreSize getFlushableSize() {
     MemStoreSize mss = getSnapshotSize();
-    return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
+    return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
   }
 
   @Override
   protected long keySize() {
-    return this.active.getDataSize();
+    return getActive().getDataSize();
   }
 
   @Override
   protected long heapSize() {
-    return this.active.getHeapSize();
+    return getActive().getHeapSize();
   }
 
   @Override
@@ -119,7 +119,7 @@ public class DefaultMemStore extends AbstractMemStore {
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
     List<KeyValueScanner> list = new ArrayList<>();
-    addToScanners(active, readPt, list);
+    addToScanners(getActive(), readPt, list);
     addToScanners(snapshot.getAllSegments(), readPt, list);
     return list;
   }
@@ -127,8 +127,8 @@ public class DefaultMemStore extends AbstractMemStore {
   @Override
   protected List<Segment> getSegments() throws IOException {
     List<Segment> list = new ArrayList<>(2);
-    list.add(this.active);
-    list.add(this.snapshot);
+    list.add(getActive());
+    list.add(snapshot);
     return list;
   }
 
@@ -139,27 +139,31 @@ public class DefaultMemStore extends AbstractMemStore {
    */
   Cell getNextRow(final Cell cell) {
     return getLowest(
-        getNextRow(cell, this.active.getCellSet()),
+        getNextRow(cell, this.getActive().getCellSet()),
         getNextRow(cell, this.snapshot.getCellSet()));
   }
 
   @Override public void updateLowestUnflushedSequenceIdInWAL(boolean 
onlyIfMoreRecent) {
   }
 
-  @Override
-  public MemStoreSize size() {
-    return active.getMemStoreSize();
+  @Override protected boolean preUpdate(MutableSegment currentActive, Cell 
cell,
+      MemStoreSizing memstoreSizing) {
+    return true;
   }
 
-  /**
-   * Check whether anything need to be done based on the current active set 
size
-   * Nothing need to be done for the DefaultMemStore
-   */
-  @Override
-  protected void checkActiveSize() {
+  @Override protected void postUpdate(MutableSegment currentActive) {
     return;
   }
 
+  @Override protected boolean sizeAddedPreOperation() {
+    return false;
+  }
+
+  @Override
+  public MemStoreSize size() {
+    return getActive().getMemStoreSize();
+  }
+
   @Override
   public long preFlushSeqIDEstimation() {
     return HConstants.NO_SEQNUM;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index cea85e9..9973742 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -207,10 +208,14 @@ public class MemStoreCompactor {
 
     ImmutableSegment result = null;
     MemStoreSegmentsIterator iterator = null;
+    List<ImmutableSegment> segments = versionedList.getStoreSegments();
+    for (ImmutableSegment s : segments) {
+      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush 
have completed
+    }
 
     switch (action) {
       case COMPACT:
-        iterator = new 
MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
+        iterator = new MemStoreCompactorSegmentsIterator(segments,
             compactingMemStore.getComparator(),
             compactionKVMax, compactingMemStore.getStore());
 
@@ -222,13 +227,12 @@ public class MemStoreCompactor {
       case MERGE:
       case MERGE_COUNT_UNIQUE_KEYS:
         iterator =
-            new 
MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
+            new MemStoreMergerSegmentsIterator(segments,
             compactingMemStore.getComparator(), compactionKVMax);
 
         result = SegmentFactory.instance().createImmutableSegmentByMerge(
           compactingMemStore.getConfiguration(), 
compactingMemStore.getComparator(), iterator,
-          versionedList.getNumOfCells(), versionedList.getStoreSegments(),
-          compactingMemStore.getIndexType(), action);
+          versionedList.getNumOfCells(), segments, 
compactingMemStore.getIndexType(), action);
         iterator.close();
         break;
       default:

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index ac7223f..ce775f8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -120,8 +120,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
    */
   @Override
   public Cell forceCopyOfBigCellInto(Cell cell) {
-    int size = cell instanceof ExtendedCell? 
((ExtendedCell)cell).getSerializedSize():
-        KeyValueUtil.length(cell);
+    int size = Segment.getCellLength(cell);
     size += ChunkCreator.SIZEOF_CHUNK_HEADER;
     Preconditions.checkArgument(size >= 0, "negative size");
     if (size <= dataChunkSize) {
@@ -135,8 +134,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
   }
 
   private Cell copyCellInto(Cell cell, int maxAlloc) {
-    int size = cell instanceof ExtendedCell? 
((ExtendedCell)cell).getSerializedSize():
-        KeyValueUtil.length(cell);
+    int size = Segment.getCellLength(cell);
     Preconditions.checkArgument(size >= 0, "negative size");
     // Callers should satisfy large allocations directly from JVM since they
     // don't cause fragmentation as badly.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
index 8430ac6..22ca9b6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSizing.java
@@ -81,6 +81,10 @@ public interface MemStoreSizing {
         long offHeapSizeDelta) {
       throw new RuntimeException("I'm a DUD, you can't use me!");
     }
+
+    @Override public boolean compareAndSetDataSize(long expected, long 
updated) {
+      throw new RuntimeException("I'm a DUD, you can't use me!");
+    }
   };
 
   /**
@@ -104,6 +108,8 @@ public interface MemStoreSizing {
     return incMemStoreSize(-delta.getDataSize(), -delta.getHeapSize(), 
-delta.getOffHeapSize());
   }
 
+  boolean compareAndSetDataSize(long expected, long updated);
+
   long getDataSize();
   long getHeapSize();
   long getOffHeapSize();
@@ -113,4 +119,4 @@ public interface MemStoreSizing {
    * {@link #getHeapSize()}, and {@link #getOffHeapSize()}, in the one go.
    */
   MemStoreSize getMemStoreSize();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index c72d385..d76321d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.util.Iterator;
 import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -38,9 +39,13 @@ import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @InterfaceAudience.Private
 public class MutableSegment extends Segment {
 
-  public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
-        + ClassSize.CONCURRENT_SKIPLISTMAP
-        + ClassSize.SYNC_TIMERANGE_TRACKER;
+  private final AtomicBoolean flushed = new AtomicBoolean(false);
+
+  public final static long DEEP_OVERHEAD = 
ClassSize.align(Segment.DEEP_OVERHEAD
+      + ClassSize.CONCURRENT_SKIPLISTMAP
+      + ClassSize.SYNC_TIMERANGE_TRACKER
+      + ClassSize.REFERENCE
+      + ClassSize.ATOMIC_BOOLEAN);
 
   protected MutableSegment(CellSet cellSet, CellComparator comparator, 
MemStoreLAB memStoreLAB) {
     super(cellSet, comparator, memStoreLAB, 
TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
@@ -52,12 +57,14 @@ public class MutableSegment extends Segment {
    * @param cell the cell to add
    * @param mslabUsed whether using MSLAB
    */
-  public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing) 
{
-    internalAdd(cell, mslabUsed, memStoreSizing);
+  public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing,
+      boolean sizeAddedPreOperation) {
+    internalAdd(cell, mslabUsed, memStoreSizing, sizeAddedPreOperation);
   }
 
-  public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing) 
{
-    internalAdd(cell, false, memStoreSizing);
+  public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing,
+      boolean sizeAddedPreOperation) {
+    internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation);
 
     // Get the Cells for the row/family/qualifier regardless of timestamp.
     // For this case we want to clean up any other puts
@@ -105,6 +112,10 @@ public class MutableSegment extends Segment {
     }
   }
 
+  public boolean setInMemoryFlushed() {
+    return flushed.compareAndSet(false, true);
+  }
+
   /**
    * Returns the first cell in the segment
    * @return the first cell in the segment

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
index 601ff33..7b3b1d3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonThreadSafeMemStoreSizing.java
@@ -59,6 +59,14 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing {
     return this.dataSize;
   }
 
+  @Override public boolean compareAndSetDataSize(long expected, long updated) {
+    if(dataSize == expected) {
+      dataSize = updated;
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public long getDataSize() {
     return dataSize;
@@ -78,4 +86,4 @@ class NonThreadSafeMemStoreSizing implements MemStoreSizing {
   public String toString() {
     return getMemStoreSize().toString();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index b088856..31f2d85 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -57,14 +57,6 @@ public class RegionServicesForStores {
     this.region = region;
   }
 
-  public void blockUpdates() {
-    region.blockUpdates();
-  }
-
-  public void unblockUpdates() {
-    region.unblockUpdates();
-  }
-
   public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long 
offHeapSizeDelta) {
     region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 7069bf8..e68da16 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -24,9 +24,11 @@ import java.util.List;
 import java.util.Objects;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -48,15 +50,17 @@ import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 public abstract class Segment implements MemStoreSizing {
 
   public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
-      + 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, 
memStoreSizing,
+      + 6 * ClassSize.REFERENCE // cellSet, comparator, updatesLock, 
memStoreLAB, memStoreSizing,
                                 // and timeRangeTracker
       + Bytes.SIZEOF_LONG // minSequenceId
       + Bytes.SIZEOF_BOOLEAN); // tagsPresent
   public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + 
ClassSize.ATOMIC_REFERENCE
-      + ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG;
+      + ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG
+      + ClassSize.REENTRANT_LOCK;
 
   private AtomicReference<CellSet> cellSet= new AtomicReference<>();
   private final CellComparator comparator;
+  private ReentrantReadWriteLock updatesLock;
   protected long minSequenceId;
   private MemStoreLAB memStoreLAB;
   // Sum of sizes of all Cells added to this Segment. Cell's HeapSize is 
considered. This is not
@@ -87,6 +91,7 @@ public abstract class Segment implements MemStoreSizing {
       OffHeapSize += memStoreSize.getOffHeapSize();
     }
     this.comparator = comparator;
+    this.updatesLock = new ReentrantReadWriteLock();
     // Do we need to be thread safe always? What if ImmutableSegment?
     // DITTO for the TimeRangeTracker below.
     this.memStoreSizing = new ThreadSafeMemStoreSizing(dataSize, heapSize, 
OffHeapSize);
@@ -97,6 +102,7 @@ public abstract class Segment implements MemStoreSizing {
   protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB 
memStoreLAB, TimeRangeTracker trt) {
     this.cellSet.set(cellSet);
     this.comparator = comparator;
+    this.updatesLock = new ReentrantReadWriteLock();
     this.minSequenceId = Long.MAX_VALUE;
     this.memStoreLAB = memStoreLAB;
     // Do we need to be thread safe always? What if ImmutableSegment?
@@ -109,9 +115,10 @@ public abstract class Segment implements MemStoreSizing {
   protected Segment(Segment segment) {
     this.cellSet.set(segment.getCellSet());
     this.comparator = segment.getComparator();
+    this.updatesLock = segment.getUpdatesLock();
     this.minSequenceId = segment.getMinSequenceId();
     this.memStoreLAB = segment.getMemStoreLAB();
-    this.memStoreSizing = new 
ThreadSafeMemStoreSizing(segment.memStoreSizing.getMemStoreSize());
+    this.memStoreSizing = segment.memStoreSizing;
     this.tagsPresent = segment.isTagsPresent();
     this.timeRangeTracker = segment.getTimeRangeTracker();
   }
@@ -183,7 +190,8 @@ public abstract class Segment implements MemStoreSizing {
    */
   @VisibleForTesting
   static int getCellLength(Cell cell) {
-    return KeyValueUtil.length(cell);
+    return cell instanceof ExtendedCell ? 
((ExtendedCell)cell).getSerializedSize():
+        KeyValueUtil.length(cell);
   }
 
   public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
@@ -244,6 +252,25 @@ public abstract class Segment implements MemStoreSizing {
     return this.memStoreSizing.incMemStoreSize(delta, heapOverhead, 
offHeapOverhead);
   }
 
+  public boolean sharedLock() {
+    return updatesLock.readLock().tryLock();
+  }
+
+  public void sharedUnlock() {
+    updatesLock.readLock().unlock();
+  }
+
+  public void waitForUpdates() {
+    if(!updatesLock.isWriteLocked()) {
+      updatesLock.writeLock().lock();
+    }
+  }
+
+  @Override
+  public boolean compareAndSetDataSize(long expected, long updated) {
+    return memStoreSizing.compareAndSetDataSize(expected, updated);
+  }
+
   public long getMinSequenceId() {
     return minSequenceId;
   }
@@ -288,25 +315,30 @@ public abstract class Segment implements MemStoreSizing {
     return comparator;
   }
 
-  protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing 
memstoreSizing) {
+  protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing 
memstoreSizing,
+      boolean sizeAddedPreOperation) {
     boolean succ = getCellSet().add(cell);
-    updateMetaInfo(cell, succ, mslabUsed, memstoreSizing);
+    updateMetaInfo(cell, succ, mslabUsed, memstoreSizing, 
sizeAddedPreOperation);
   }
 
   protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean 
mslabUsed,
-      MemStoreSizing memstoreSizing) {
-    long cellSize = 0;
+      MemStoreSizing memstoreSizing, boolean sizeAddedPreOperation) {
+    long delta = 0;
+    long cellSize = getCellLength(cellToAdd);
     // If there's already a same cell in the CellSet and we are using MSLAB, 
we must count in the
     // MSLAB allocation size as well, or else there will be memory leak 
(occupied heap size larger
     // than the counted number)
     if (succ || mslabUsed) {
-      cellSize = getCellLength(cellToAdd);
+      delta = cellSize;
     }
-    long heapSize = heapSizeChange(cellToAdd, succ);
-    long offHeapSize = offHeapSizeChange(cellToAdd, succ);
-    incMemStoreSize(cellSize, heapSize, offHeapSize);
+    if(sizeAddedPreOperation) {
+      delta -= cellSize;
+    }
+    long heapSize = heapSizeChange(cellToAdd, succ || mslabUsed);
+    long offHeapSize = offHeapSizeChange(cellToAdd, succ || mslabUsed);
+    incMemStoreSize(delta, heapSize, offHeapSize);
     if (memstoreSizing != null) {
-      memstoreSizing.incMemStoreSize(cellSize, heapSize, offHeapSize);
+      memstoreSizing.incMemStoreSize(delta, heapSize, offHeapSize);
     }
     getTimeRangeTracker().includeTimestamp(cellToAdd);
     minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
@@ -320,16 +352,16 @@ public abstract class Segment implements MemStoreSizing {
   }
 
   protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing 
memstoreSizing) {
-    updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing);
+    updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing, 
false);
   }
 
   /**
    * @return The increase in heap size because of this cell addition. This 
includes this cell POJO's
    *         heap size itself and additional overhead because of addition on 
to CSLM.
    */
-  protected long heapSizeChange(Cell cell, boolean succ) {
+  protected long heapSizeChange(Cell cell, boolean allocated) {
     long res = 0;
-    if (succ) {
+    if (allocated) {
       boolean onHeap = true;
       MemStoreLAB memStoreLAB = getMemStoreLAB();
       if(memStoreLAB != null) {
@@ -344,9 +376,9 @@ public abstract class Segment implements MemStoreSizing {
     return res;
   }
 
-  protected long offHeapSizeChange(Cell cell, boolean succ) {
+  protected long offHeapSizeChange(Cell cell, boolean allocated) {
     long res = 0;
-    if (succ) {
+    if (allocated) {
       boolean offHeap = false;
       MemStoreLAB memStoreLAB = getMemStoreLAB();
       if(memStoreLAB != null) {
@@ -410,4 +442,8 @@ public abstract class Segment implements MemStoreSizing {
     res += "max timestamp=" + timeRangeTracker.getMax();
     return res;
   }
+
+  private ReentrantReadWriteLock getUpdatesLock() {
+    return updatesLock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
index de05493..8e343d0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ThreadSafeMemStoreSizing.java
@@ -58,6 +58,10 @@ class ThreadSafeMemStoreSizing implements MemStoreSizing {
     return this.dataSize.addAndGet(dataSizeDelta);
   }
 
+  @Override public boolean compareAndSetDataSize(long expected, long updated) {
+    return dataSize.compareAndSet(expected,updated);
+  }
+
   @Override
   public long getDataSize() {
     return dataSize.get();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 2d454e5..993503d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -376,11 +376,13 @@ public class TestHeapSize  {
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
     expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
       assertEquals(expected, actual);
     }
 
@@ -391,16 +393,20 @@ public class TestHeapSize  {
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
     expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
     expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
     expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
+    expected += ClassSize.estimateBase(AtomicBoolean.class, false);
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
       ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
+      ClassSize.estimateBase(AtomicBoolean.class,true);
       assertEquals(expected, actual);
     }
 
@@ -411,6 +417,7 @@ public class TestHeapSize  {
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
     expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
     expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
@@ -418,6 +425,7 @@ public class TestHeapSize  {
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
       ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
       assertEquals(expected, actual);
     }
@@ -428,6 +436,7 @@ public class TestHeapSize  {
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
     expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
     expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
     expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
     if (expected != actual) {
@@ -436,6 +445,7 @@ public class TestHeapSize  {
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
       ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
       assertEquals(expected, actual);
@@ -446,6 +456,7 @@ public class TestHeapSize  {
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
     expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
     expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
     expected += ClassSize.estimateBase(CellArrayMap.class, false);
     if (expected != actual) {
@@ -454,6 +465,7 @@ public class TestHeapSize  {
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(ReentrantReadWriteLock.class,true);
       ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
       ClassSize.estimateBase(CellArrayMap.class, true);
       assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 8dbddb9..ade8563 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -838,7 +838,7 @@ public class TestCompactingMemStore extends 
TestDefaultMemStore {
       byte[] row = Bytes.toBytes(keys[i]);
       byte[] val = Bytes.toBytes(keys[i] + i);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      totalLen += kv.getLength();
+      totalLen += Segment.getCellLength(kv);
       hmc.add(kv, null);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + 
kv.getTimestamp());
     }
@@ -859,7 +859,7 @@ public class TestCompactingMemStore extends 
TestDefaultMemStore {
       Threads.sleep(1); // to make sure each kv gets a different ts
       byte[] row = Bytes.toBytes(keys[i]);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      totalLen += kv.getLength();
+      totalLen += Segment.getCellLength(kv);
       hmc.add(kv, null);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + 
kv.getTimestamp());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index 943757b..7997a45 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -757,7 +757,7 @@ public class TestCompactingToCellFlatMapMemStore extends 
TestCompactingMemStore
     // set memstore to flat into CellChunkMap
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-            String.valueOf(compactionType));
+        String.valueOf(compactionType));
     ((MyCompactingMemStore) memstore).initiateType(compactionType, 
memstore.getConfiguration());
     ((CompactingMemStore) 
memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
 
@@ -796,11 +796,13 @@ public class TestCompactingToCellFlatMapMemStore extends 
TestCompactingMemStore
     // One cell is duplicated, but it shouldn't be compacted because we are in 
BASIC mode.
     // totalCellsLen should remain the same
     long oneCellOnCCMHeapSize =
-            ClassSize.CELL_CHUNK_MAP_ENTRY + 
ClassSize.align(KeyValueUtil.length(kv));
+        (long) ClassSize.CELL_CHUNK_MAP_ENTRY + 
ClassSize.align(KeyValueUtil.length(kv));
     totalHeapSize = MutableSegment.DEEP_OVERHEAD + 
CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
             + numOfCells * oneCellOnCCMHeapSize;
 
-    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
+    assertEquals(totalCellsLen+ChunkCreator.SIZEOF_CHUNK_HEADER, 
regionServicesForStores
+        .getMemStoreSize());
+
     assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
 
     MemStoreSize mss = memstore.getFlushableSize();
@@ -824,8 +826,9 @@ public class TestCompactingToCellFlatMapMemStore extends 
TestCompactingMemStore
     // but smaller than the size of two cells.
     // Therefore, the two created cells are flattened together.
     totalHeapSize = MutableSegment.DEEP_OVERHEAD
-            + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-            + 2 * oneCellOnCCMHeapSize;
+        + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+        + 1 * oneCellOnCSLMHeapSize
+        + 1 * oneCellOnCCMHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
   }
 
@@ -848,6 +851,8 @@ public class TestCompactingToCellFlatMapMemStore extends 
TestCompactingMemStore
     MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().setInt(MemStoreCompactionStrategy
         .COMPACTING_MEMSTORE_THRESHOLD_KEY, 4);
+    memstore.getConfiguration()
+        .setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 
0.014);
     
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
     ((MyCompactingMemStore) memstore).initiateType(compactionType, 
memstore.getConfiguration());
@@ -860,39 +865,42 @@ public class TestCompactingToCellFlatMapMemStore extends 
TestCompactingMemStore
     String bigVal = new String(chars);
     byte[] val = Bytes.toBytes(bigVal);
 
-    // We need to add two cells, five times, in order to guarantee a merge
+    // We need to add two cells, three times, in order to guarantee a merge
     List<String[]> keysList = new ArrayList<>();
     keysList.add(new String[]{"A", "B"});
     keysList.add(new String[]{"C", "D"});
     keysList.add(new String[]{"E", "F"});
     keysList.add(new String[]{"G", "H"});
-    keysList.add(new String[]{"I", "J"});
 
     // Measuring the size of a single kv
     KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
             Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);
     long oneCellOnCCMHeapSize =
-            ClassSize.CELL_CHUNK_MAP_ENTRY + 
ClassSize.align(KeyValueUtil.length(kv));
-
+        (long) ClassSize.CELL_CHUNK_MAP_ENTRY + 
ClassSize.align(KeyValueUtil.length(kv));
+    long oneCellOnCSLMHeapSize =
+        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + 
kv.heapSize());
     long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < keysList.size(); i++) {
       addRowsByKeys(memstore, keysList.get(i), val);
       while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
         Threads.sleep(10);
       }
 
-      // The in-memory flush size is bigger than the size of a single cell,
-      // but smaller than the size of two cells.
-      // Therefore, the two created cells are flattened together.
-      totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-              + 2 * oneCellOnCCMHeapSize;
-      if (i == 4) {
-        // Four out of the five are merged into one,
-        // and the segment becomes immutable
-        totalHeapSize -= (3 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-                + MutableSegment.DEEP_OVERHEAD);
+      if(i==0) {
+        totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+            + oneCellOnCCMHeapSize + oneCellOnCSLMHeapSize;
+      } else {
+        // The in-memory flush size is bigger than the size of a single cell,
+        // but smaller than the size of two cells.
+        // Therefore, the two created cells are flattened in a seperate 
segment.
+        totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + 
oneCellOnCCMHeapSize);
+      }
+      if (i == 2) {
+        // Four out of the five segments are merged into one
+        totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
+        totalHeapSize = ClassSize.align(totalHeapSize);
       }
-      assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
+      assertEquals("i="+i, totalHeapSize, ((CompactingMemStore) 
memstore).heapSize());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d822ee3a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 6803003..12df8f1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -1765,15 +1765,12 @@ public class TestHStore {
     }
 
     @Override
-    protected boolean shouldFlushInMemory() {
-      boolean rval = super.shouldFlushInMemory();
-      if (rval) {
-        RUNNER_COUNT.incrementAndGet();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("runner count: " + RUNNER_COUNT.get());
-        }
+    void inMemoryCompaction() {
+      RUNNER_COUNT.incrementAndGet();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("runner count: " + RUNNER_COUNT.get());
       }
-      return rval;
+      super.inMemoryCompaction();
     }
   }
 

Reply via email to