Repository: hbase Updated Branches: refs/heads/master 294c2dae9 -> 9b1ecb31f
HBASE-16143 Change MemstoreScanner constructor to accept List<KeyValueScanner> (Ram) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9b1ecb31 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9b1ecb31 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9b1ecb31 Branch: refs/heads/master Commit: 9b1ecb31f0fdab6208d376fd06ebecdc1d916812 Parents: 294c2da Author: Ramkrishna <[email protected]> Authored: Wed Jun 29 23:10:55 2016 +0530 Committer: Ramkrishna <[email protected]> Committed: Wed Jun 29 23:10:55 2016 +0530 ---------------------------------------------------------------------- .../hbase/regionserver/AbstractMemStore.java | 16 -------- .../hbase/regionserver/CompactingMemStore.java | 13 ++++-- .../hbase/regionserver/DefaultMemStore.java | 16 ++++---- .../hbase/regionserver/MemStoreCompactor.java | 2 +- .../hbase/regionserver/MemStoreScanner.java | 42 +++++++++++--------- .../hbase/regionserver/SegmentScanner.java | 10 +---- 6 files changed, 43 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9b1ecb31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 0f27e0e..d25f960 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; @@ -192,14 +191,6 @@ public abstract class AbstractMemStore implements MemStore { return getActive().getSize(); } - /** - * @return a list containing a single memstore scanner. - */ - @Override - public List<KeyValueScanner> getScanners(long readPt) throws IOException { - return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(this, readPt)); - } - @Override public long getSnapshotSize() { return getSnapshot().getSize(); @@ -441,13 +432,6 @@ public abstract class AbstractMemStore implements MemStore { protected abstract void checkActiveSize(); /** - * Returns a list of Store segment scanners, one per each store segment - * @param readPt the version number required to initialize the scanners - * @return a list of Store segment scanners, one per each store segment - */ - protected abstract List<SegmentScanner> getListOfScanners(long readPt) throws IOException; - - /** * Returns an ordered list of segments from most recent to oldest in memstore * @return an ordered list of segments from most recent to oldest in memstore */ http://git-wip-us.apache.org/repos/asf/hbase/blob/9b1ecb31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index ec5684d..cd923f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; @@ -226,17 +228,20 @@ public class CompactingMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException { + public List<KeyValueScanner> getScanners(long readPt) throws IOException { List<Segment> pipelineList = pipeline.getSegments(); long order = pipelineList.size(); - LinkedList<SegmentScanner> list = new LinkedList<SegmentScanner>(); - list.add(getActive().getSegmentScanner(readPt, order+1)); + // The list of elements in pipeline + the active element + the snapshot segment + // TODO : This will change when the snapshot is made of more than one element + List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2); + list.add(getActive().getSegmentScanner(readPt, order + 1)); for (Segment item : pipelineList) { list.add(item.getSegmentScanner(readPt, order)); order--; } list.add(getSnapshot().getSegmentScanner(readPt, order)); - return list; + return Collections.<KeyValueScanner> singletonList( + new MemStoreScanner((AbstractMemStore) this, list, readPt)); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/9b1ecb31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index cdc910e..c21dbb5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -113,18 +114,19 @@ public class DefaultMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException { - List<SegmentScanner> list = new ArrayList<SegmentScanner>(2); - list.add(0, getActive().getSegmentScanner(readPt, 1)); - list.add(1, getSnapshot().getSegmentScanner(readPt, 0)); - return list; + public List<KeyValueScanner> getScanners(long readPt) throws IOException { + List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2); + list.add(getActive().getSegmentScanner(readPt, 1)); + list.add(getSnapshot().getSegmentScanner(readPt, 0)); + return Collections.<KeyValueScanner> singletonList( + new MemStoreScanner((AbstractMemStore) this, list, readPt)); } @Override protected List<Segment> getSegments() throws IOException { List<Segment> list = new ArrayList<Segment>(2); - list.add(0, getActive()); - list.add(1, getSnapshot()); + list.add(getActive()); + list.add(getSnapshot()); return list; } http://git-wip-us.apache.org/repos/asf/hbase/blob/9b1ecb31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 5b2876d..691ebb9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -67,7 +67,7 @@ class MemStoreCompactor { public boolean startCompaction() throws IOException { if (!compactingMemStore.hasCompactibleSegments()) return false; // no compaction on empty - List<SegmentScanner> scanners = new ArrayList<SegmentScanner>(); + List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); // get the list of segments from the pipeline versionedList = compactingMemStore.getCompactibleSegments(); // the list is marked with specific version http://git-wip-us.apache.org/repos/asf/hbase/blob/9b1ecb31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index 01a7ff3..3d31d2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -57,35 +57,39 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { private long readPoint; // remember the initial version of the scanners list - List<SegmentScanner> scanners; + List<KeyValueScanner> scanners; // pointer back to the relevant MemStore // is needed for shouldSeek() method private AbstractMemStore backwardReferenceToMemStore; /** - * Constructor. * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! * After constructor only one heap is going to be initialized for entire lifespan * of the MemStoreScanner. A specific scanner can only be one directional! * * @param ms Pointer back to the MemStore - * @param readPoint Read point below which we can safely remove duplicate KVs - * @param type The scan type COMPACT_FORWARD should be used for compaction + * @param scanners List of scanners over the segments + * @param readPt Read point below which we can safely remove duplicate KVs */ - public MemStoreScanner(AbstractMemStore ms, long readPoint, Type type) throws IOException { - this(ms, ms.getListOfScanners(readPoint), readPoint, type); - } - - /* Constructor used only when the scan usage is unknown - and need to be defined according to the first move */ - public MemStoreScanner(AbstractMemStore ms, long readPt) throws IOException { - this(ms, readPt, Type.UNDEFINED); + public MemStoreScanner(AbstractMemStore ms, List<KeyValueScanner> scanners, long readPt) + throws IOException { + this(ms, scanners, readPt, Type.UNDEFINED); } - public MemStoreScanner(AbstractMemStore ms, List<SegmentScanner> scanners, long readPoint, + /** + * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! + * After constructor only one heap is going to be initialized for entire lifespan + * of the MemStoreScanner. A specific scanner can only be one directional! + * + * @param ms Pointer back to the MemStore + * @param scanners List of scanners over the segments + * @param readPt Read point below which we can safely remove duplicate KVs + * @param type The scan type COMPACT_FORWARD should be used for compaction + */ + public MemStoreScanner(AbstractMemStore ms, List<KeyValueScanner> scanners, long readPt, Type type) throws IOException { super(); - this.readPoint = readPoint; + this.readPoint = readPt; this.type = type; switch (type) { case UNDEFINED: @@ -262,8 +266,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { return true; } - for (SegmentScanner sc : scanners) { - if (sc.shouldSeek(scan, oldestUnexpiredTS)) { + for (KeyValueScanner sc : scanners) { + if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) { return true; } } @@ -275,7 +279,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { public String toString() { StringBuffer buf = new StringBuffer(); int i = 1; - for (SegmentScanner scanner : scanners) { + for (KeyValueScanner scanner : scanners) { buf.append("scanner (" + i + ") " + scanner.toString() + " ||| "); i++; } @@ -289,7 +293,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { */ private boolean restartBackwardHeap(Cell cell) throws IOException { boolean res = false; - for (SegmentScanner scan : scanners) { + for (KeyValueScanner scan : scanners) { res |= scan.seekToPreviousRow(cell); } this.backwardHeap = @@ -315,7 +319,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { forwardHeap = null; // before building the heap seek for the relevant key on the scanners, // for the heap to be built from the scanners correctly - for (SegmentScanner scan : scanners) { + for (KeyValueScanner scan : scanners) { if (toLast) { res |= scan.seekToLastRow(); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/9b1ecb31/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index a04c1da..1191f30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -243,7 +243,7 @@ public class SegmentScanner implements KeyValueScanner { */ @Override public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return true; + return getSegment().shouldSeek(scan,oldestUnexpiredTS); } /** * This scanner is working solely on the in-memory MemStore therefore this @@ -305,14 +305,6 @@ public class SegmentScanner implements KeyValueScanner { // do nothing } - /** - * Returns whether the given scan should seek in this segment - * @return whether the given scan should seek in this segment - */ - public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { - return getSegment().shouldSeek(scan,oldestUnexpiredTS); - } - protected Segment getSegment(){ return segment; }
