Repository: hbase Updated Branches: refs/heads/master 97fd9051f -> f65a439f0
HBASE-17291 Remove ImmutableSegment#getKeyValueScanner (Ram) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f65a439f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f65a439f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f65a439f Branch: refs/heads/master Commit: f65a439f0188c0811c2ca07b5d830498809a13ab Parents: 97fd905 Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Authored: Mon Jan 9 12:06:25 2017 +0530 Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com> Committed: Mon Jan 9 12:06:25 2017 +0530 ---------------------------------------------------------------------- .../hbase/regionserver/ImmutableSegment.java | 7 +- .../hbase/regionserver/MemStoreSnapshot.java | 2 +- .../hbase/regionserver/SegmentScanner.java | 34 +++--- .../hbase/regionserver/SnapshotScanner.java | 105 +++++++++++++++++++ .../regionserver/TestCompactingMemStore.java | 10 ++ .../TestCompactingToCellArrayMapMemStore.java | 102 ++++++++++++++++++ .../regionserver/TestMemStoreChunkPool.java | 6 ++ 7 files changed, 247 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 4cdb29d..0fae6c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -27,14 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.util.CollectionBackedScanner; import java.io.IOException; /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, * and is not needed for a {@link MutableSegment}. Specifically, the method - * {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the + * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the * {@link MemStoreSnapshot} object. */ @InterfaceAudience.Private @@ -127,8 +126,8 @@ public class ImmutableSegment extends Segment { * general segment scanner. * @return a special scanner for the MemStoreSnapshot object */ - public KeyValueScanner getKeyValueScanner() { - return new CollectionBackedScanner(getCellSet(), getComparator()); + public KeyValueScanner getSnapshotScanner() { + return new SnapshotScanner(this); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 74d1e17..61e7876 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -40,7 +40,7 @@ public class MemStoreSnapshot { this.dataSize = snapshot.keySize(); this.heapOverhead = snapshot.heapOverhead(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanner = snapshot.getKeyValueScanner(); + this.scanner = snapshot.getSnapshotScanner(); this.tagsPresent = snapshot.isTagsPresent(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 7803f7d..5e2e36f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -41,14 +41,14 @@ public class SegmentScanner implements KeyValueScanner { private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE; // the observed structure - private final Segment segment; + protected final Segment segment; // the highest relevant MVCC private long readPoint; // the current iterator that can be reinitialized by // seek(), backwardSeek(), or reseek() - private Iterator<Cell> iter; + protected Iterator<Cell> iter; // the pre-calculated cell to be returned by peek() - private Cell current = null; + protected Cell current = null; // or next() // A flag represents whether could stop skipping KeyValues for MVCC // if have encountered the next row. Only used for reversed scan @@ -57,7 +57,7 @@ public class SegmentScanner implements KeyValueScanner { private Cell last = null; // flag to indicate if this scanner is closed - private boolean closed = false; + protected boolean closed = false; protected SegmentScanner(Segment segment, long readPoint) { this(segment, readPoint, DEFAULT_SCANNER_ORDER); @@ -74,7 +74,7 @@ public class SegmentScanner implements KeyValueScanner { this.segment.incScannerCount(); iter = segment.iterator(); // the initialization of the current is required for working with heap of SegmentScanners - current = getNext(); + updateCurrent(); this.scannerOrder = scannerOrder; if (current == null) { // nothing to fetch from this scanner @@ -108,7 +108,7 @@ public class SegmentScanner implements KeyValueScanner { return null; } Cell oldCurrent = current; - current = getNext(); // update the currently observed Cell + updateCurrent(); // update the currently observed Cell return oldCurrent; } @@ -127,13 +127,17 @@ public class SegmentScanner implements KeyValueScanner { return false; } // restart the iterator from new key - iter = segment.tailSet(cell).iterator(); + iter = getIterator(cell); // last is going to be reinitialized in the next getNext() call last = null; - current = getNext(); + updateCurrent(); return (current != null); } + protected Iterator<Cell> getIterator(Cell cell) { + return segment.tailSet(cell).iterator(); + } + /** * Reseek the scanner at or after the specified KeyValue. * This method is guaranteed to seek at or after the required key only if the @@ -156,8 +160,8 @@ public class SegmentScanner implements KeyValueScanner { get it. So we remember the last keys we iterated to and restore the reseeked set to at least that point. */ - iter = segment.tailSet(getHighest(cell, last)).iterator(); - current = getNext(); + iter = getIterator(getHighest(cell, last)); + updateCurrent(); return (current != null); } @@ -355,7 +359,7 @@ public class SegmentScanner implements KeyValueScanner { * Private internal method for iterating over the segment, * skipping the cells with irrelevant MVCC */ - private Cell getNext() { + protected void updateCurrent() { Cell startKV = current; Cell next = null; @@ -363,16 +367,18 @@ public class SegmentScanner implements KeyValueScanner { while (iter.hasNext()) { next = iter.next(); if (next.getSequenceId() <= this.readPoint) { - return next; // skip irrelevant versions + current = next; + return;// skip irrelevant versions } if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the startKV != null && // boundaries of a single row segment.compareRows(next, startKV) > 0) { - return null; + current = null; + return; } } // end of while - return null; // nothing found + current = null; // nothing found } finally { if (next != null) { // in all cases, remember the last KV we iterated to, needed for reseek() http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java new file mode 100644 index 0000000..6300e00 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +/** + * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells + * in the segment + */ +@InterfaceAudience.Private +public class SnapshotScanner extends SegmentScanner { + + public SnapshotScanner(Segment immutableSegment) { + // Snapshot scanner does not need readpoint. It should read all the cells in the + // segment + super(immutableSegment, Long.MAX_VALUE); + } + + @Override + public Cell peek() { // sanity check, the current should be always valid + if (closed) { + return null; + } + return current; + } + + @Override + public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { + return true; + } + + @Override + public boolean backwardSeek(Cell key) throws IOException { + throw new NotImplementedException( + "backwardSeek must not be called on a " + "non-reversed scanner"); + } + + @Override + public boolean seekToPreviousRow(Cell key) throws IOException { + throw new NotImplementedException( + "seekToPreviousRow must not be called on a " + "non-reversed scanner"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException( + "seekToLastRow must not be called on a " + "non-reversed scanner"); + } + + @Override + protected Iterator<Cell> getIterator(Cell cell) { + return segment.iterator(); + } + + @Override + protected void updateCurrent() { + if (iter.hasNext()) { + current = iter.next(); + } else { + current = null; + } + } + + @Override + public boolean seek(Cell seekCell) { + // restart iterator + iter = getIterator(seekCell); + return reseek(seekCell); + } + + @Override + public boolean reseek(Cell seekCell) { + while (iter.hasNext()) { + Cell next = iter.next(); + int ret = segment.getComparator().compare(next, seekCell); + if (ret >= 0) { + current = next; + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index b0b63a9..0aa2814 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -393,6 +393,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -433,6 +435,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { List<KeyValueScanner> scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -460,6 +464,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } @@ -527,6 +533,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Creating another snapshot MemStoreSnapshot snapshot = memstore.snapshot(); + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); snapshot = memstore.snapshot(); @@ -541,6 +549,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool + // close the scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 56ae72e..bc6f982 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -387,6 +387,108 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } } + @Override + @Test + public void testPuttingBackChunksWithOpeningScanner() throws IOException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] qf6 = Bytes.toBytes("testqualifier6"); + byte[] qf7 = Bytes.toBytes("testqualifier7"); + byte[] val = Bytes.toBytes("testval"); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val), null); + memstore.add(new KeyValue(row, fam, qf2, val), null); + memstore.add(new KeyValue(row, fam, qf3, val), null); + + // Creating a snapshot + MemStoreSnapshot snapshot = memstore.snapshot(); + assertEquals(3, memstore.getSnapshot().getCellsCount()); + + // Adding value to "new" memstore + assertEquals(0, memstore.getActive().getCellsCount()); + memstore.add(new KeyValue(row, fam, qf4, val), null); + memstore.add(new KeyValue(row, fam, qf5, val), null); + assertEquals(2, memstore.getActive().getCellsCount()); + + // opening scanner before clear the snapshot + List<KeyValueScanner> scanners = memstore.getScanners(0); + // Shouldn't putting back the chunks to pool,since some scanners are opening + // based on their data + // close the scanner + snapshot.getScanner().close(); + memstore.clearSnapshot(snapshot.getId()); + + assertTrue(chunkPool.getPoolSize() == 0); + + // Chunks will be put back to pool after close scanners; + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + assertTrue(chunkPool.getPoolSize() > 0); + + // clear chunks + chunkPool.clearChunks(); + + // Creating another snapshot + + snapshot = memstore.snapshot(); + // Adding more value + memstore.add(new KeyValue(row, fam, qf6, val), null); + memstore.add(new KeyValue(row, fam, qf7, val), null); + // opening scanners + scanners = memstore.getScanners(0); + // close scanners before clear the snapshot + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + // Since no opening scanner, the chunks of snapshot should be put back to + // pool + // close the scanner + snapshot.getScanner().close(); + memstore.clearSnapshot(snapshot.getId()); + assertTrue(chunkPool.getPoolSize() > 0); + } + + @Test + public void testPuttingBackChunksAfterFlushing() throws IOException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] val = Bytes.toBytes("testval"); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val), null); + memstore.add(new KeyValue(row, fam, qf2, val), null); + memstore.add(new KeyValue(row, fam, qf3, val), null); + + // Creating a snapshot + MemStoreSnapshot snapshot = memstore.snapshot(); + assertEquals(3, memstore.getSnapshot().getCellsCount()); + + // Adding value to "new" memstore + assertEquals(0, memstore.getActive().getCellsCount()); + memstore.add(new KeyValue(row, fam, qf4, val), null); + memstore.add(new KeyValue(row, fam, qf5, val), null); + assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanner + snapshot.getScanner().close(); + memstore.clearSnapshot(snapshot.getId()); + + int chunkCount = chunkPool.getPoolSize(); + assertTrue(chunkCount > 0); + } + + private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) { byte[] fam = Bytes.toBytes("testfamily"); byte[] qf = Bytes.toBytes("testqualifier"); http://git-wip-us.apache.org/repos/asf/hbase/blob/f65a439f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index e2ba169..49b5139 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -135,6 +135,8 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); + // close the scanner - this is how the snapshot will be used + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -177,6 +179,8 @@ public class TestMemStoreChunkPool { List<KeyValueScanner> scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data + // close the snapshot scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -203,6 +207,8 @@ public class TestMemStoreChunkPool { } // Since no opening scanner, the chunks of snapshot should be put back to // pool + // close the snapshot scanner + snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); }