Author: rawson Date: Sat May 15 00:28:24 2010 New Revision: 944534 URL: http://svn.apache.org/viewvc?rev=944534&view=rev Log: Make TestHRegion pass by porting the RegionScanner implementation from 0.20
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=944534&r1=944533&r2=944534&view=diff ============================================================================== --- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May 15 00:28:24 2010 @@ -1,4 +1,4 @@ -/** +/* * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one @@ -54,27 +54,25 @@ import org.apache.hadoop.hbase.util.Writ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; - import java.io.IOException; - import java.io.UnsupportedEncodingException; - import java.lang.reflect.Constructor; - import java.util.AbstractList; - import java.util.ArrayList; - import java.util.Collection; - import java.util.HashMap; - import java.util.List; - import java.util.Map; - import java.util.Set; - import java.util.NavigableSet; - import java.util.TreeMap; - import java.util.TreeSet; - import java.util.HashMap; - import java.util.HashSet; - import java.util.Random; - import java.util.concurrent.Callable; - import java.util.concurrent.ConcurrentSkipListMap; - import java.util.concurrent.atomic.AtomicBoolean; - import java.util.concurrent.atomic.AtomicLong; - import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Constructor; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -1937,7 +1935,6 @@ public class HRegion implements HConstan //DebugPrint.println("HRegionScanner.<init>"); this.filter = scan.getFilter(); this.batch = scan.getBatch(); - if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { this.stopRow = null; } else { @@ -1969,7 +1966,6 @@ public class HRegion implements HConstan new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator); } - private void resetFilters() { if (filter != null) { filter.reset(); @@ -2025,70 +2021,64 @@ public class HRegion implements HConstan return this.filter != null && this.filter.filterAllRemaining(); } - /* - * @return true if there are more rows, false if scanner is done - * @throws IOException - */ private boolean nextInternal(int limit) throws IOException { - byte [] currentRow = null; - boolean filterCurrentRow = false; while (true) { - KeyValue kv = this.storeHeap.peek(); - if (kv == null) return false; - byte [] row = kv.getRow(); - boolean samerow = Bytes.equals(currentRow, row); - if (samerow && filterCurrentRow) { - // Filter all columns until row changes - readAndDumpCurrentResult(); - continue; - } - if (!samerow) { - // Continue on the next row: - currentRow = row; - filterCurrentRow = false; - // See if we passed stopRow - if (this.stopRow != null && - comparator.compareRows(this.stopRow, 0, this.stopRow.length, - currentRow, 0, currentRow.length) <= this.isScan) { - return false; + byte [] currentRow = peekRow(); + if (isStopRow(currentRow)) { + return false; + } else if (filterRowKey(currentRow)) { + nextRow(currentRow); + } else { + byte [] nextRow; + do { + this.storeHeap.next(results, limit); + if (limit > 0 && results.size() == limit) { + return true; + } + } while (Bytes.equals(currentRow, nextRow = peekRow())); + + final boolean stopRow = isStopRow(nextRow); + if (!stopRow && (results.isEmpty() || filterRow())) { + // this seems like a redundant step - we already consumed the row + // there're no left overs. + // the reasons for calling this method are: + // 1. reset the filters. + // 2. provide a hook to fast forward the row (used by subclasses) + nextRow(currentRow); + continue; } - if (hasResults()) return true; - } - // See if current row should be filtered based on row key - if (this.filter != null && this.filter.filterRowKey(row, 0, row.length)) { - readAndDumpCurrentResult(); - resetFilters(); - filterCurrentRow = true; - currentRow = row; - continue; - } - this.storeHeap.next(results, limit); - if (limit > 0 && results.size() == limit) { - return true; + return !stopRow; } } } - private void readAndDumpCurrentResult() throws IOException { - this.storeHeap.next(this.results); - this.results.clear(); + private boolean filterRow() { + return filter != null + && filter.filterRow(); + } + private boolean filterRowKey(byte[] row) { + return filter != null + && filter.filterRowKey(row, 0, row.length); } - /* - * Do we have results to return or should we continue. Call when we get to - * the end of a row. Does house cleaning -- clearing results and resetting - * filters -- if we are to continue. - * @return True if we should return else false if need to keep going. - */ - private boolean hasResults() { - if (this.results.isEmpty() || - this.filter != null && this.filter.filterRow()) { - // Make sure results is empty, reset filters - this.results.clear(); - resetFilters(); - return false; + protected void nextRow(byte [] currentRow) throws IOException { + while (Bytes.equals(currentRow, peekRow())) { + this.storeHeap.next(MOCKED_LIST); } - return true; + results.clear(); + resetFilters(); + } + + private byte[] peekRow() { + KeyValue kv = this.storeHeap.peek(); + return kv == null ? null : kv.getRow(); + } + + private boolean isStopRow(byte [] currentRow) { + return currentRow == null || + (stopRow != null && + comparator.compareRows(stopRow, 0, stopRow.length, + currentRow, 0, currentRow.length) <= isScan); } public synchronized void close() { @@ -2096,14 +2086,7 @@ public class HRegion implements HConstan storeHeap.close(); storeHeap = null; } - this.filterClosed = true; - } - - /** - * @return the current storeHeap - */ - public synchronized KeyValueHeap getStoreHeap() { - return this.storeHeap; + this.filterClosed = true; } } @@ -2829,6 +2812,33 @@ public class HRegion implements HConstan } /** + * A mocked list implementaion - discards all updates. + */ + private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() { + + @Override + public void add(int index, KeyValue element) { + // do nothing + } + + @Override + public boolean addAll(int index, Collection<? extends KeyValue> c) { + return false; // this list is never changed as a result of an update + } + + @Override + public KeyValue get(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return 0; + } + }; + + + /** * Facility for dumping and compacting catalog tables. * Only does catalog tables since these are only tables we for sure know * schema on. For usage run: