Repository: hbase
Updated Branches:
  refs/heads/master 97fd9051f -> f65a439f0


HBASE-17291 Remove ImmutableSegment#getKeyValueScanner (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f65a439f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f65a439f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f65a439f

Branch: refs/heads/master
Commit: f65a439f0188c0811c2ca07b5d830498809a13ab
Parents: 97fd905
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Authored: Mon Jan 9 12:06:25 2017 +0530
Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Committed: Mon Jan 9 12:06:25 2017 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/ImmutableSegment.java    |   7 +-
 .../hbase/regionserver/MemStoreSnapshot.java    |   2 +-
 .../hbase/regionserver/SegmentScanner.java      |  34 +++---
 .../hbase/regionserver/SnapshotScanner.java     | 105 +++++++++++++++++++
 .../regionserver/TestCompactingMemStore.java    |  10 ++
 .../TestCompactingToCellArrayMapMemStore.java   | 102 ++++++++++++++++++
 .../regionserver/TestMemStoreChunkPool.java     |   6 ++
 7 files changed, 247 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 4cdb29d..0fae6c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -27,14 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 import java.io.IOException;
 
 /**
  * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
  * and is not needed for a {@link MutableSegment}. Specifically, the method
- * {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the
+ * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the
  * {@link MemStoreSnapshot} object.
  */
 @InterfaceAudience.Private
@@ -127,8 +126,8 @@ public class ImmutableSegment extends Segment {
    * general segment scanner.
    * @return a special scanner for the MemStoreSnapshot object
    */
-  public KeyValueScanner getKeyValueScanner() {
-    return new CollectionBackedScanner(getCellSet(), getComparator());
+  public KeyValueScanner getSnapshotScanner() {
+    return new SnapshotScanner(this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index 74d1e17..61e7876 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -40,7 +40,7 @@ public class MemStoreSnapshot {
     this.dataSize = snapshot.keySize();
     this.heapOverhead = snapshot.heapOverhead();
     this.timeRangeTracker = snapshot.getTimeRangeTracker();
-    this.scanner = snapshot.getKeyValueScanner();
+    this.scanner = snapshot.getSnapshotScanner();
     this.tagsPresent = snapshot.isTagsPresent();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 7803f7d..5e2e36f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -41,14 +41,14 @@ public class SegmentScanner implements KeyValueScanner {
   private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE;
 
   // the observed structure
-  private final Segment segment;
+  protected final Segment segment;
   // the highest relevant MVCC
   private long readPoint;
   // the current iterator that can be reinitialized by
   // seek(), backwardSeek(), or reseek()
-  private Iterator<Cell> iter;
+  protected Iterator<Cell> iter;
   // the pre-calculated cell to be returned by peek()
-  private Cell current = null;
+  protected Cell current = null;
   // or next()
   // A flag represents whether could stop skipping KeyValues for MVCC
   // if have encountered the next row. Only used for reversed scan
@@ -57,7 +57,7 @@ public class SegmentScanner implements KeyValueScanner {
   private Cell last = null;
 
   // flag to indicate if this scanner is closed
-  private boolean closed = false;
+  protected boolean closed = false;
 
   protected SegmentScanner(Segment segment, long readPoint) {
     this(segment, readPoint, DEFAULT_SCANNER_ORDER);
@@ -74,7 +74,7 @@ public class SegmentScanner implements KeyValueScanner {
     this.segment.incScannerCount();
     iter = segment.iterator();
     // the initialization of the current is required for working with heap of SegmentScanners
-    current = getNext();
+    updateCurrent();
     this.scannerOrder = scannerOrder;
     if (current == null) {
       // nothing to fetch from this scanner
@@ -108,7 +108,7 @@ public class SegmentScanner implements KeyValueScanner {
       return null;
     }
     Cell oldCurrent = current;
-    current = getNext();                  // update the currently observed Cell
+    updateCurrent();                  // update the currently observed Cell
     return oldCurrent;
   }
 
@@ -127,13 +127,17 @@ public class SegmentScanner implements KeyValueScanner {
       return false;
     }
     // restart the iterator from new key
-    iter = segment.tailSet(cell).iterator();
+    iter = getIterator(cell);
     // last is going to be reinitialized in the next getNext() call
     last = null;
-    current = getNext();
+    updateCurrent();
     return (current != null);
   }
 
+  protected Iterator<Cell> getIterator(Cell cell) {
+    return segment.tailSet(cell).iterator();
+  }
+
   /**
    * Reseek the scanner at or after the specified KeyValue.
    * This method is guaranteed to seek at or after the required key only if the
@@ -156,8 +160,8 @@ public class SegmentScanner implements KeyValueScanner {
     get it. So we remember the last keys we iterated to and restore
     the reseeked set to at least that point.
     */
-    iter = segment.tailSet(getHighest(cell, last)).iterator();
-    current = getNext();
+    iter = getIterator(getHighest(cell, last));
+    updateCurrent();
     return (current != null);
   }
 
@@ -355,7 +359,7 @@ public class SegmentScanner implements KeyValueScanner {
    * Private internal method for iterating over the segment,
    * skipping the cells with irrelevant MVCC
    */
-  private Cell getNext() {
+  protected void updateCurrent() {
     Cell startKV = current;
     Cell next = null;
 
@@ -363,16 +367,18 @@ public class SegmentScanner implements KeyValueScanner {
       while (iter.hasNext()) {
         next = iter.next();
         if (next.getSequenceId() <= this.readPoint) {
-          return next;                    // skip irrelevant versions
+          current = next;
+          return;// skip irrelevant versions
         }
         if (stopSkippingKVsIfNextRow &&   // for backwardSeek() stay in the
             startKV != null &&        // boundaries of a single row
             segment.compareRows(next, startKV) > 0) {
-          return null;
+          current = null;
+          return;
         }
       } // end of while
 
-      return null; // nothing found
+      current = null; // nothing found
     } finally {
       if (next != null) {
         // in all cases, remember the last KV we iterated to, needed for reseek()

http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
new file mode 100644
index 0000000..6300e00
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells
+ * in the segment
+ */
+@InterfaceAudience.Private
+public class SnapshotScanner extends SegmentScanner {
+
+  public SnapshotScanner(Segment immutableSegment) {
+    // Snapshot scanner does not need readpoint. It should read all the cells in the
+    // segment
+    super(immutableSegment, Long.MAX_VALUE);
+  }
+
+  @Override
+  public Cell peek() { // sanity check, the current should be always valid
+    if (closed) {
+      return null;
+    }
+    return current;
+  }
+
+  @Override
+  public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+    return true;
+  }
+
+  @Override
+  public boolean backwardSeek(Cell key) throws IOException {
+    throw new NotImplementedException(
+        "backwardSeek must not be called on a " + "non-reversed scanner");
+  }
+
+  @Override
+  public boolean seekToPreviousRow(Cell key) throws IOException {
+    throw new NotImplementedException(
+        "seekToPreviousRow must not be called on a " + "non-reversed scanner");
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    throw new NotImplementedException(
+        "seekToLastRow must not be called on a " + "non-reversed scanner");
+  }
+
+  @Override
+  protected Iterator<Cell> getIterator(Cell cell) {
+    return segment.iterator();
+  }
+
+  @Override
+  protected void updateCurrent() {
+    if (iter.hasNext()) {
+      current = iter.next();
+    } else {
+      current = null;
+    }
+  }
+
+  @Override
+  public boolean seek(Cell seekCell) {
+    // restart iterator
+    iter = getIterator(seekCell);
+    return reseek(seekCell);
+  }
+
+  @Override
+  public boolean reseek(Cell seekCell) {
+    while (iter.hasNext()) {
+      Cell next = iter.next();
+      int ret = segment.getComparator().compare(next, seekCell);
+      if (ret >= 0) {
+        current = next;
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index b0b63a9..0aa2814 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -393,6 +393,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     memstore.add(new KeyValue(row, fam, qf4, val), null);
     memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
+    // close the scanner
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
 
     int chunkCount = chunkPool.getPoolSize();
@@ -433,6 +435,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     List<KeyValueScanner> scanners = memstore.getScanners(0);
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
+    // close the scanner
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
 
     assertTrue(chunkPool.getPoolSize() == 0);
@@ -460,6 +464,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     // Since no opening scanner, the chunks of snapshot should be put back to
     // pool
+    // close the scanner
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
     assertTrue(chunkPool.getPoolSize() > 0);
   }
@@ -527,6 +533,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     // Creating another snapshot
 
     MemStoreSnapshot snapshot = memstore.snapshot();
+    // close the scanner
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
 
     snapshot = memstore.snapshot();
@@ -541,6 +549,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     // Since no opening scanner, the chunks of snapshot should be put back to
     // pool
+    // close the scanner
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
     assertTrue(chunkPool.getPoolSize() > 0);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 56ae72e..bc6f982 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -387,6 +387,108 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
   }
 
+  @Override
+  @Test
+  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] qf6 = Bytes.toBytes("testqualifier6");
+    byte[] qf7 = Bytes.toBytes("testqualifier7");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
+    assertEquals(2, memstore.getActive().getCellsCount());
+
+    // opening scanner before clear the snapshot
+    List<KeyValueScanner> scanners = memstore.getScanners(0);
+    // Shouldn't putting back the chunks to pool,since some scanners are opening
+    // based on their data
+    // close the scanner
+    snapshot.getScanner().close();
+    memstore.clearSnapshot(snapshot.getId());
+
+    assertTrue(chunkPool.getPoolSize() == 0);
+
+    // Chunks will be put back to pool after close scanners;
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    assertTrue(chunkPool.getPoolSize() > 0);
+
+    // clear chunks
+    chunkPool.clearChunks();
+
+    // Creating another snapshot
+
+    snapshot = memstore.snapshot();
+    // Adding more value
+    memstore.add(new KeyValue(row, fam, qf6, val), null);
+    memstore.add(new KeyValue(row, fam, qf7, val), null);
+    // opening scanners
+    scanners = memstore.getScanners(0);
+    // close scanners before clear the snapshot
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    // Since no opening scanner, the chunks of snapshot should be put back to
+    // pool
+    // close the scanner
+    snapshot.getScanner().close();
+    memstore.clearSnapshot(snapshot.getId());
+    assertTrue(chunkPool.getPoolSize() > 0);
+  }
+
+  @Test
+  public void testPuttingBackChunksAfterFlushing() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
+    assertEquals(2, memstore.getActive().getCellsCount());
+    // close the scanner
+    snapshot.getScanner().close();
+    memstore.clearSnapshot(snapshot.getId());
+
+    int chunkCount = chunkPool.getPoolSize();
+    assertTrue(chunkCount > 0);
+  }
+
+
   private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf = Bytes.toBytes("testqualifier");

http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index e2ba169..49b5139 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -135,6 +135,8 @@ public class TestMemStoreChunkPool {
     memstore.add(new KeyValue(row, fam, qf4, val), null);
     memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
+    // close the scanner - this is how the snapshot will be used
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
 
     int chunkCount = chunkPool.getPoolSize();
@@ -177,6 +179,8 @@ public class TestMemStoreChunkPool {
     List<KeyValueScanner> scanners = memstore.getScanners(0);
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
+    // close the snapshot scanner
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
 
     assertTrue(chunkPool.getPoolSize() == 0);
@@ -203,6 +207,8 @@ public class TestMemStoreChunkPool {
     }
     // Since no opening scanner, the chunks of snapshot should be put back to
     // pool
+    // close the snapshot scanner
+    snapshot.getScanner().close();
     memstore.clearSnapshot(snapshot.getId());
     assertTrue(chunkPool.getPoolSize() > 0);
   }

Reply via email to