Repository: hbase-site Updated Branches: refs/heads/asf-site e01eeb1a0 -> 1d9053bc9
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/1d9053bc/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html index 933bcca..7140dbd 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html @@ -2844,17 +2844,17 @@ <span class="sourceLineNo">2836</span> //////////////////////////////////////////////////////////////////////////////<a name="line.2836"></a> <span class="sourceLineNo">2837</span><a name="line.2837"></a> <span class="sourceLineNo">2838</span> @Override<a name="line.2838"></a> -<span class="sourceLineNo">2839</span> public RegionScanner getScanner(Scan scan) throws IOException {<a name="line.2839"></a> +<span class="sourceLineNo">2839</span> public RegionScannerImpl getScanner(Scan scan) throws IOException {<a name="line.2839"></a> <span class="sourceLineNo">2840</span> return getScanner(scan, null);<a name="line.2840"></a> <span class="sourceLineNo">2841</span> }<a name="line.2841"></a> <span class="sourceLineNo">2842</span><a name="line.2842"></a> <span class="sourceLineNo">2843</span> @Override<a name="line.2843"></a> -<span class="sourceLineNo">2844</span> public RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners)<a name="line.2844"></a> +<span class="sourceLineNo">2844</span> public RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners)<a name="line.2844"></a> <span class="sourceLineNo">2845</span> throws IOException {<a name="line.2845"></a> <span class="sourceLineNo">2846</span> return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.2846"></a> <span class="sourceLineNo">2847</span> }<a name="line.2847"></a> <span class="sourceLineNo">2848</span><a name="line.2848"></a> -<span class="sourceLineNo">2849</span> private RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,<a name="line.2849"></a> +<span class="sourceLineNo">2849</span> private RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners,<a name="line.2849"></a> <span class="sourceLineNo">2850</span> long nonceGroup, long nonce) throws IOException {<a name="line.2850"></a> <span class="sourceLineNo">2851</span> startRegionOperation(Operation.SCAN);<a name="line.2851"></a> <span class="sourceLineNo">2852</span> try {<a name="line.2852"></a> @@ -2881,7 +2881,7 @@ <span class="sourceLineNo">2873</span> HConstants.NO_NONCE);<a name="line.2873"></a> <span class="sourceLineNo">2874</span> }<a name="line.2874"></a> <span class="sourceLineNo">2875</span><a name="line.2875"></a> -<span class="sourceLineNo">2876</span> protected RegionScanner instantiateRegionScanner(Scan scan,<a name="line.2876"></a> +<span class="sourceLineNo">2876</span> protected RegionScannerImpl instantiateRegionScanner(Scan scan,<a name="line.2876"></a> <span class="sourceLineNo">2877</span> List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {<a name="line.2877"></a> <span class="sourceLineNo">2878</span> if (scan.isReversed()) {<a name="line.2878"></a> <span class="sourceLineNo">2879</span> if (scan.getFilter() != null) {<a name="line.2879"></a> @@ -5874,2405 +5874,2407 @@ <span class="sourceLineNo">5866</span> /**<a name="line.5866"></a> <span class="sourceLineNo">5867</span> * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).<a name="line.5867"></a> <span class="sourceLineNo">5868</span> */<a name="line.5868"></a> -<span class="sourceLineNo">5869</span> class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {<a name="line.5869"></a> -<span class="sourceLineNo">5870</span> // Package local for testability<a name="line.5870"></a> -<span class="sourceLineNo">5871</span> KeyValueHeap storeHeap = null;<a name="line.5871"></a> -<span class="sourceLineNo">5872</span> /** Heap of key-values that are not essential for the provided filters and are thus read<a name="line.5872"></a> -<span class="sourceLineNo">5873</span> * on demand, if on-demand column family loading is enabled.*/<a name="line.5873"></a> -<span class="sourceLineNo">5874</span> KeyValueHeap joinedHeap = null;<a name="line.5874"></a> -<span class="sourceLineNo">5875</span> /**<a name="line.5875"></a> -<span class="sourceLineNo">5876</span> * If the joined heap data gathering is interrupted due to scan limits, this will<a name="line.5876"></a> -<span class="sourceLineNo">5877</span> * contain the row for which we are populating the values.*/<a name="line.5877"></a> -<span class="sourceLineNo">5878</span> protected Cell joinedContinuationRow = null;<a name="line.5878"></a> -<span class="sourceLineNo">5879</span> private boolean filterClosed = false;<a name="line.5879"></a> -<span class="sourceLineNo">5880</span><a name="line.5880"></a> -<span class="sourceLineNo">5881</span> protected final byte[] stopRow;<a name="line.5881"></a> -<span class="sourceLineNo">5882</span> protected final boolean includeStopRow;<a name="line.5882"></a> -<span class="sourceLineNo">5883</span> protected final HRegion region;<a name="line.5883"></a> -<span class="sourceLineNo">5884</span> protected final CellComparator comparator;<a name="line.5884"></a> -<span class="sourceLineNo">5885</span><a name="line.5885"></a> -<span class="sourceLineNo">5886</span> private final long readPt;<a name="line.5886"></a> -<span class="sourceLineNo">5887</span> private final long maxResultSize;<a name="line.5887"></a> -<span class="sourceLineNo">5888</span> private final ScannerContext defaultScannerContext;<a name="line.5888"></a> -<span class="sourceLineNo">5889</span> private final FilterWrapper filter;<a name="line.5889"></a> -<span class="sourceLineNo">5890</span><a name="line.5890"></a> -<span class="sourceLineNo">5891</span> @Override<a name="line.5891"></a> -<span class="sourceLineNo">5892</span> public RegionInfo getRegionInfo() {<a name="line.5892"></a> -<span class="sourceLineNo">5893</span> return region.getRegionInfo();<a name="line.5893"></a> -<span class="sourceLineNo">5894</span> }<a name="line.5894"></a> -<span class="sourceLineNo">5895</span><a name="line.5895"></a> -<span class="sourceLineNo">5896</span> RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)<a name="line.5896"></a> -<span class="sourceLineNo">5897</span> throws IOException {<a name="line.5897"></a> -<span class="sourceLineNo">5898</span> this(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.5898"></a> -<span class="sourceLineNo">5899</span> }<a name="line.5899"></a> -<span class="sourceLineNo">5900</span><a name="line.5900"></a> -<span class="sourceLineNo">5901</span> RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,<a name="line.5901"></a> -<span class="sourceLineNo">5902</span> long nonceGroup, long nonce) throws IOException {<a name="line.5902"></a> -<span class="sourceLineNo">5903</span> this.region = region;<a name="line.5903"></a> -<span class="sourceLineNo">5904</span> this.maxResultSize = scan.getMaxResultSize();<a name="line.5904"></a> -<span class="sourceLineNo">5905</span> if (scan.hasFilter()) {<a name="line.5905"></a> -<span class="sourceLineNo">5906</span> this.filter = new FilterWrapper(scan.getFilter());<a name="line.5906"></a> -<span class="sourceLineNo">5907</span> } else {<a name="line.5907"></a> -<span class="sourceLineNo">5908</span> this.filter = null;<a name="line.5908"></a> -<span class="sourceLineNo">5909</span> }<a name="line.5909"></a> -<span class="sourceLineNo">5910</span> this.comparator = region.getCellComparator();<a name="line.5910"></a> -<span class="sourceLineNo">5911</span> /**<a name="line.5911"></a> -<span class="sourceLineNo">5912</span> * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default<a name="line.5912"></a> -<span class="sourceLineNo">5913</span> * scanner context that can be used to enforce the batch limit in the event that a<a name="line.5913"></a> -<span class="sourceLineNo">5914</span> * ScannerContext is not specified during an invocation of next/nextRaw<a name="line.5914"></a> -<span class="sourceLineNo">5915</span> */<a name="line.5915"></a> -<span class="sourceLineNo">5916</span> defaultScannerContext = ScannerContext.newBuilder()<a name="line.5916"></a> -<span class="sourceLineNo">5917</span> .setBatchLimit(scan.getBatch()).build();<a name="line.5917"></a> -<span class="sourceLineNo">5918</span> this.stopRow = scan.getStopRow();<a name="line.5918"></a> -<span class="sourceLineNo">5919</span> this.includeStopRow = scan.includeStopRow();<a name="line.5919"></a> -<span class="sourceLineNo">5920</span><a name="line.5920"></a> -<span class="sourceLineNo">5921</span> // synchronize on scannerReadPoints so that nobody calculates<a name="line.5921"></a> -<span class="sourceLineNo">5922</span> // getSmallestReadPoint, before scannerReadPoints is updated.<a name="line.5922"></a> -<span class="sourceLineNo">5923</span> IsolationLevel isolationLevel = scan.getIsolationLevel();<a name="line.5923"></a> -<span class="sourceLineNo">5924</span> long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);<a name="line.5924"></a> -<span class="sourceLineNo">5925</span> synchronized (scannerReadPoints) {<a name="line.5925"></a> -<span class="sourceLineNo">5926</span> if (mvccReadPoint > 0) {<a name="line.5926"></a> -<span class="sourceLineNo">5927</span> this.readPt = mvccReadPoint;<a name="line.5927"></a> -<span class="sourceLineNo">5928</span> } else if (nonce == HConstants.NO_NONCE || rsServices == null<a name="line.5928"></a> -<span class="sourceLineNo">5929</span> || rsServices.getNonceManager() == null) {<a name="line.5929"></a> -<span class="sourceLineNo">5930</span> this.readPt = getReadPoint(isolationLevel);<a name="line.5930"></a> -<span class="sourceLineNo">5931</span> } else {<a name="line.5931"></a> -<span class="sourceLineNo">5932</span> this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);<a name="line.5932"></a> -<span class="sourceLineNo">5933</span> }<a name="line.5933"></a> -<span class="sourceLineNo">5934</span> scannerReadPoints.put(this, this.readPt);<a name="line.5934"></a> -<span class="sourceLineNo">5935</span> }<a name="line.5935"></a> -<span class="sourceLineNo">5936</span> initializeScanners(scan, additionalScanners);<a name="line.5936"></a> -<span class="sourceLineNo">5937</span> }<a name="line.5937"></a> -<span class="sourceLineNo">5938</span><a name="line.5938"></a> -<span class="sourceLineNo">5939</span> protected void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)<a name="line.5939"></a> -<span class="sourceLineNo">5940</span> throws IOException {<a name="line.5940"></a> -<span class="sourceLineNo">5941</span> // Here we separate all scanners into two lists - scanner that provide data required<a name="line.5941"></a> -<span class="sourceLineNo">5942</span> // by the filter to operate (scanners list) and all others (joinedScanners list).<a name="line.5942"></a> -<span class="sourceLineNo">5943</span> List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());<a name="line.5943"></a> -<span class="sourceLineNo">5944</span> List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());<a name="line.5944"></a> -<span class="sourceLineNo">5945</span> // Store all already instantiated scanners for exception handling<a name="line.5945"></a> -<span class="sourceLineNo">5946</span> List<KeyValueScanner> instantiatedScanners = new ArrayList<>();<a name="line.5946"></a> -<span class="sourceLineNo">5947</span> // handle additionalScanners<a name="line.5947"></a> -<span class="sourceLineNo">5948</span> if (additionalScanners != null && !additionalScanners.isEmpty()) {<a name="line.5948"></a> -<span class="sourceLineNo">5949</span> scanners.addAll(additionalScanners);<a name="line.5949"></a> -<span class="sourceLineNo">5950</span> instantiatedScanners.addAll(additionalScanners);<a name="line.5950"></a> -<span class="sourceLineNo">5951</span> }<a name="line.5951"></a> -<span class="sourceLineNo">5952</span><a name="line.5952"></a> -<span class="sourceLineNo">5953</span> try {<a name="line.5953"></a> -<span class="sourceLineNo">5954</span> for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {<a name="line.5954"></a> -<span class="sourceLineNo">5955</span> HStore store = stores.get(entry.getKey());<a name="line.5955"></a> -<span class="sourceLineNo">5956</span> KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);<a name="line.5956"></a> -<span class="sourceLineNo">5957</span> instantiatedScanners.add(scanner);<a name="line.5957"></a> -<span class="sourceLineNo">5958</span> if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()<a name="line.5958"></a> -<span class="sourceLineNo">5959</span> || this.filter.isFamilyEssential(entry.getKey())) {<a name="line.5959"></a> -<span class="sourceLineNo">5960</span> scanners.add(scanner);<a name="line.5960"></a> -<span class="sourceLineNo">5961</span> } else {<a name="line.5961"></a> -<span class="sourceLineNo">5962</span> joinedScanners.add(scanner);<a name="line.5962"></a> -<span class="sourceLineNo">5963</span> }<a name="line.5963"></a> -<span class="sourceLineNo">5964</span> }<a name="line.5964"></a> -<span class="sourceLineNo">5965</span> initializeKVHeap(scanners, joinedScanners, region);<a name="line.5965"></a> -<span class="sourceLineNo">5966</span> } catch (Throwable t) {<a name="line.5966"></a> -<span class="sourceLineNo">5967</span> throw handleException(instantiatedScanners, t);<a name="line.5967"></a> -<span class="sourceLineNo">5968</span> }<a name="line.5968"></a> -<span class="sourceLineNo">5969</span> }<a name="line.5969"></a> -<span class="sourceLineNo">5970</span><a name="line.5970"></a> -<span class="sourceLineNo">5971</span> protected void initializeKVHeap(List<KeyValueScanner> scanners,<a name="line.5971"></a> -<span class="sourceLineNo">5972</span> List<KeyValueScanner> joinedScanners, HRegion region)<a name="line.5972"></a> -<span class="sourceLineNo">5973</span> throws IOException {<a name="line.5973"></a> -<span class="sourceLineNo">5974</span> this.storeHeap = new KeyValueHeap(scanners, comparator);<a name="line.5974"></a> -<span class="sourceLineNo">5975</span> if (!joinedScanners.isEmpty()) {<a name="line.5975"></a> -<span class="sourceLineNo">5976</span> this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);<a name="line.5976"></a> -<span class="sourceLineNo">5977</span> }<a name="line.5977"></a> -<span class="sourceLineNo">5978</span> }<a name="line.5978"></a> -<span class="sourceLineNo">5979</span><a name="line.5979"></a> -<span class="sourceLineNo">5980</span> private IOException handleException(List<KeyValueScanner> instantiatedScanners,<a name="line.5980"></a> -<span class="sourceLineNo">5981</span> Throwable t) {<a name="line.5981"></a> -<span class="sourceLineNo">5982</span> // remove scaner read point before throw the exception<a name="line.5982"></a> -<span class="sourceLineNo">5983</span> scannerReadPoints.remove(this);<a name="line.5983"></a> -<span class="sourceLineNo">5984</span> if (storeHeap != null) {<a name="line.5984"></a> -<span class="sourceLineNo">5985</span> storeHeap.close();<a name="line.5985"></a> -<span class="sourceLineNo">5986</span> storeHeap = null;<a name="line.5986"></a> -<span class="sourceLineNo">5987</span> if (joinedHeap != null) {<a name="line.5987"></a> -<span class="sourceLineNo">5988</span> joinedHeap.close();<a name="line.5988"></a> -<span class="sourceLineNo">5989</span> joinedHeap = null;<a name="line.5989"></a> -<span class="sourceLineNo">5990</span> }<a name="line.5990"></a> -<span class="sourceLineNo">5991</span> } else {<a name="line.5991"></a> -<span class="sourceLineNo">5992</span> // close all already instantiated scanners before throwing the exception<a name="line.5992"></a> -<span class="sourceLineNo">5993</span> for (KeyValueScanner scanner : instantiatedScanners) {<a name="line.5993"></a> -<span class="sourceLineNo">5994</span> scanner.close();<a name="line.5994"></a> -<span class="sourceLineNo">5995</span> }<a name="line.5995"></a> -<span class="sourceLineNo">5996</span> }<a name="line.5996"></a> -<span class="sourceLineNo">5997</span> return t instanceof IOException ? (IOException) t : new IOException(t);<a name="line.5997"></a> -<span class="sourceLineNo">5998</span> }<a name="line.5998"></a> -<span class="sourceLineNo">5999</span><a name="line.5999"></a> -<span class="sourceLineNo">6000</span> @Override<a name="line.6000"></a> -<span class="sourceLineNo">6001</span> public long getMaxResultSize() {<a name="line.6001"></a> -<span class="sourceLineNo">6002</span> return maxResultSize;<a name="line.6002"></a> -<span class="sourceLineNo">6003</span> }<a name="line.6003"></a> -<span class="sourceLineNo">6004</span><a name="line.6004"></a> -<span class="sourceLineNo">6005</span> @Override<a name="line.6005"></a> -<span class="sourceLineNo">6006</span> public long getMvccReadPoint() {<a name="line.6006"></a> -<span class="sourceLineNo">6007</span> return this.readPt;<a name="line.6007"></a> -<span class="sourceLineNo">6008</span> }<a name="line.6008"></a> -<span class="sourceLineNo">6009</span><a name="line.6009"></a> -<span class="sourceLineNo">6010</span> @Override<a name="line.6010"></a> -<span class="sourceLineNo">6011</span> public int getBatch() {<a name="line.6011"></a> -<span class="sourceLineNo">6012</span> return this.defaultScannerContext.getBatchLimit();<a name="line.6012"></a> -<span class="sourceLineNo">6013</span> }<a name="line.6013"></a> -<span class="sourceLineNo">6014</span><a name="line.6014"></a> -<span class="sourceLineNo">6015</span> /**<a name="line.6015"></a> -<span class="sourceLineNo">6016</span> * Reset both the filter and the old filter.<a name="line.6016"></a> -<span class="sourceLineNo">6017</span> *<a name="line.6017"></a> -<span class="sourceLineNo">6018</span> * @throws IOException in case a filter raises an I/O exception.<a name="line.6018"></a> -<span class="sourceLineNo">6019</span> */<a name="line.6019"></a> -<span class="sourceLineNo">6020</span> protected void resetFilters() throws IOException {<a name="line.6020"></a> -<span class="sourceLineNo">6021</span> if (filter != null) {<a name="line.6021"></a> -<span class="sourceLineNo">6022</span> filter.reset();<a name="line.6022"></a> -<span class="sourceLineNo">6023</span> }<a name="line.6023"></a> -<span class="sourceLineNo">6024</span> }<a name="line.6024"></a> -<span class="sourceLineNo">6025</span><a name="line.6025"></a> -<span class="sourceLineNo">6026</span> @Override<a name="line.6026"></a> -<span class="sourceLineNo">6027</span> public boolean next(List<Cell> outResults)<a name="line.6027"></a> -<span class="sourceLineNo">6028</span> throws IOException {<a name="line.6028"></a> -<span class="sourceLineNo">6029</span> // apply the batching limit by default<a name="line.6029"></a> -<span class="sourceLineNo">6030</span> return next(outResults, defaultScannerContext);<a name="line.6030"></a> -<span class="sourceLineNo">6031</span> }<a name="line.6031"></a> -<span class="sourceLineNo">6032</span><a name="line.6032"></a> -<span class="sourceLineNo">6033</span> @Override<a name="line.6033"></a> -<span class="sourceLineNo">6034</span> public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)<a name="line.6034"></a> -<span class="sourceLineNo">6035</span> throws IOException {<a name="line.6035"></a> -<span class="sourceLineNo">6036</span> if (this.filterClosed) {<a name="line.6036"></a> -<span class="sourceLineNo">6037</span> throw new UnknownScannerException("Scanner was closed (timed out?) " +<a name="line.6037"></a> -<span class="sourceLineNo">6038</span> "after we renewed it. Could be caused by a very slow scanner " +<a name="line.6038"></a> -<span class="sourceLineNo">6039</span> "or a lengthy garbage collection");<a name="line.6039"></a> -<span class="sourceLineNo">6040</span> }<a name="line.6040"></a> -<span class="sourceLineNo">6041</span> startRegionOperation(Operation.SCAN);<a name="line.6041"></a> -<span class="sourceLineNo">6042</span> readRequestsCount.increment();<a name="line.6042"></a> -<span class="sourceLineNo">6043</span> try {<a name="line.6043"></a> -<span class="sourceLineNo">6044</span> return nextRaw(outResults, scannerContext);<a name="line.6044"></a> -<span class="sourceLineNo">6045</span> } finally {<a name="line.6045"></a> -<span class="sourceLineNo">6046</span> closeRegionOperation(Operation.SCAN);<a name="line.6046"></a> -<span class="sourceLineNo">6047</span> }<a name="line.6047"></a> -<span class="sourceLineNo">6048</span> }<a name="line.6048"></a> -<span class="sourceLineNo">6049</span><a name="line.6049"></a> -<span class="sourceLineNo">6050</span> @Override<a name="line.6050"></a> -<span class="sourceLineNo">6051</span> public boolean nextRaw(List<Cell> outResults) throws IOException {<a name="line.6051"></a> -<span class="sourceLineNo">6052</span> // Use the RegionScanner's context by default<a name="line.6052"></a> -<span class="sourceLineNo">6053</span> return nextRaw(outResults, defaultScannerContext);<a name="line.6053"></a> -<span class="sourceLineNo">6054</span> }<a name="line.6054"></a> -<span class="sourceLineNo">6055</span><a name="line.6055"></a> -<span class="sourceLineNo">6056</span> @Override<a name="line.6056"></a> -<span class="sourceLineNo">6057</span> public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)<a name="line.6057"></a> -<span class="sourceLineNo">6058</span> throws IOException {<a name="line.6058"></a> -<span class="sourceLineNo">6059</span> if (storeHeap == null) {<a name="line.6059"></a> -<span class="sourceLineNo">6060</span> // scanner is closed<a name="line.6060"></a> -<span class="sourceLineNo">6061</span> throw new UnknownScannerException("Scanner was closed");<a name="line.6061"></a> -<span class="sourceLineNo">6062</span> }<a name="line.6062"></a> -<span class="sourceLineNo">6063</span> boolean moreValues = false;<a name="line.6063"></a> -<span class="sourceLineNo">6064</span> if (outResults.isEmpty()) {<a name="line.6064"></a> -<span class="sourceLineNo">6065</span> // Usually outResults is empty. This is true when next is called<a name="line.6065"></a> -<span class="sourceLineNo">6066</span> // to handle scan or get operation.<a name="line.6066"></a> -<span class="sourceLineNo">6067</span> moreValues = nextInternal(outResults, scannerContext);<a name="line.6067"></a> -<span class="sourceLineNo">6068</span> } else {<a name="line.6068"></a> -<span class="sourceLineNo">6069</span> List<Cell> tmpList = new ArrayList<Cell>();<a name="line.6069"></a> -<span class="sourceLineNo">6070</span> moreValues = nextInternal(tmpList, scannerContext);<a name="line.6070"></a> -<span class="sourceLineNo">6071</span> outResults.addAll(tmpList);<a name="line.6071"></a> -<span class="sourceLineNo">6072</span> }<a name="line.6072"></a> -<span class="sourceLineNo">6073</span><a name="line.6073"></a> -<span class="sourceLineNo">6074</span> // If the size limit was reached it means a partial Result is being returned. Returning a<a name="line.6074"></a> -<span class="sourceLineNo">6075</span> // partial Result means that we should not reset the filters; filters should only be reset in<a name="line.6075"></a> -<span class="sourceLineNo">6076</span> // between rows<a name="line.6076"></a> -<span class="sourceLineNo">6077</span> if (!scannerContext.mayHaveMoreCellsInRow()) {<a name="line.6077"></a> -<span class="sourceLineNo">6078</span> resetFilters();<a name="line.6078"></a> -<span class="sourceLineNo">6079</span> }<a name="line.6079"></a> -<span class="sourceLineNo">6080</span><a name="line.6080"></a> -<span class="sourceLineNo">6081</span> if (isFilterDoneInternal()) {<a name="line.6081"></a> -<span class="sourceLineNo">6082</span> moreValues = false;<a name="line.6082"></a> -<span class="sourceLineNo">6083</span> }<a name="line.6083"></a> -<span class="sourceLineNo">6084</span> return moreValues;<a name="line.6084"></a> -<span class="sourceLineNo">6085</span> }<a name="line.6085"></a> -<span class="sourceLineNo">6086</span><a name="line.6086"></a> -<span class="sourceLineNo">6087</span> /**<a name="line.6087"></a> -<span class="sourceLineNo">6088</span> * @return true if more cells exist after this batch, false if scanner is done<a name="line.6088"></a> -<span class="sourceLineNo">6089</span> */<a name="line.6089"></a> -<span class="sourceLineNo">6090</span> private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)<a name="line.6090"></a> -<span class="sourceLineNo">6091</span> throws IOException {<a name="line.6091"></a> -<span class="sourceLineNo">6092</span> assert joinedContinuationRow != null;<a name="line.6092"></a> -<span class="sourceLineNo">6093</span> boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,<a name="line.6093"></a> -<span class="sourceLineNo">6094</span> joinedContinuationRow);<a name="line.6094"></a> -<span class="sourceLineNo">6095</span><a name="line.6095"></a> -<span class="sourceLineNo">6096</span> if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.6096"></a> -<span class="sourceLineNo">6097</span> // We are done with this row, reset the continuation.<a name="line.6097"></a> -<span class="sourceLineNo">6098</span> joinedContinuationRow = null;<a name="line.6098"></a> -<span class="sourceLineNo">6099</span> }<a name="line.6099"></a> -<span class="sourceLineNo">6100</span> // As the data is obtained from two independent heaps, we need to<a name="line.6100"></a> -<span class="sourceLineNo">6101</span> // ensure that result list is sorted, because Result relies on that.<a name="line.6101"></a> -<span class="sourceLineNo">6102</span> sort(results, comparator);<a name="line.6102"></a> -<span class="sourceLineNo">6103</span> return moreValues;<a name="line.6103"></a> -<span class="sourceLineNo">6104</span> }<a name="line.6104"></a> -<span class="sourceLineNo">6105</span><a name="line.6105"></a> -<span class="sourceLineNo">6106</span> /**<a name="line.6106"></a> -<span class="sourceLineNo">6107</span> * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is<a name="line.6107"></a> -<span class="sourceLineNo">6108</span> * reached, or remainingResultSize (if not -1) is reaced<a name="line.6108"></a> -<span class="sourceLineNo">6109</span> * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.<a name="line.6109"></a> -<span class="sourceLineNo">6110</span> * @param scannerContext<a name="line.6110"></a> -<span class="sourceLineNo">6111</span> * @param currentRowCell<a name="line.6111"></a> -<span class="sourceLineNo">6112</span> * @return state of last call to {@link KeyValueHeap#next()}<a name="line.6112"></a> -<span class="sourceLineNo">6113</span> */<a name="line.6113"></a> -<span class="sourceLineNo">6114</span> private boolean populateResult(List<Cell> results, KeyValueHeap heap,<a name="line.6114"></a> -<span class="sourceLineNo">6115</span> ScannerContext scannerContext, Cell currentRowCell) throws IOException {<a name="line.6115"></a> -<span class="sourceLineNo">6116</span> Cell nextKv;<a name="line.6116"></a> -<span class="sourceLineNo">6117</span> boolean moreCellsInRow = false;<a name="line.6117"></a> -<span class="sourceLineNo">6118</span> boolean tmpKeepProgress = scannerContext.getKeepProgress();<a name="line.6118"></a> -<span class="sourceLineNo">6119</span> // Scanning between column families and thus the scope is between cells<a name="line.6119"></a> -<span class="sourceLineNo">6120</span> LimitScope limitScope = LimitScope.BETWEEN_CELLS;<a name="line.6120"></a> -<span class="sourceLineNo">6121</span> do {<a name="line.6121"></a> -<span class="sourceLineNo">6122</span> // We want to maintain any progress that is made towards the limits while scanning across<a name="line.6122"></a> -<span class="sourceLineNo">6123</span> // different column families. To do this, we toggle the keep progress flag on during calls<a name="line.6123"></a> -<span class="sourceLineNo">6124</span> // to the StoreScanner to ensure that any progress made thus far is not wiped away.<a name="line.6124"></a> -<span class="sourceLineNo">6125</span> scannerContext.setKeepProgress(true);<a name="line.6125"></a> -<span class="sourceLineNo">6126</span> heap.next(results, scannerContext);<a name="line.6126"></a> -<span class="sourceLineNo">6127</span> scannerContext.setKeepProgress(tmpKeepProgress);<a name="line.6127"></a> -<span class="sourceLineNo">6128</span><a name="line.6128"></a> -<span class="sourceLineNo">6129</span> nextKv = heap.peek();<a name="line.6129"></a> -<span class="sourceLineNo">6130</span> moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);<a name="line.6130"></a> -<span class="sourceLineNo">6131</span> if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);<a name="line.6131"></a> -<span class="sourceLineNo">6132</span> if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {<a name="line.6132"></a> -<span class="sourceLineNo">6133</span> return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();<a name="line.6133"></a> -<span class="sourceLineNo">6134</span> } else if (scannerContext.checkSizeLimit(limitScope)) {<a name="line.6134"></a> -<span class="sourceLineNo">6135</span> ScannerContext.NextState state =<a name="line.6135"></a> -<span class="sourceLineNo">6136</span> moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;<a name="line.6136"></a> -<span class="sourceLineNo">6137</span> return scannerContext.setScannerState(state).hasMoreValues();<a name="line.6137"></a> -<span class="sourceLineNo">6138</span> } else if (scannerContext.checkTimeLimit(limitScope)) {<a name="line.6138"></a> -<span class="sourceLineNo">6139</span> ScannerContext.NextState state =<a name="line.6139"></a> -<span class="sourceLineNo">6140</span> moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;<a name="line.6140"></a> -<span class="sourceLineNo">6141</span> return scannerContext.setScannerState(state).hasMoreValues();<a name="line.6141"></a> -<span class="sourceLineNo">6142</span> }<a name="line.6142"></a> -<span class="sourceLineNo">6143</span> } while (moreCellsInRow);<a name="line.6143"></a> -<span class="sourceLineNo">6144</span> return nextKv != null;<a name="line.6144"></a> -<span class="sourceLineNo">6145</span> }<a name="line.6145"></a> -<span class="sourceLineNo">6146</span><a name="line.6146"></a> -<span class="sourceLineNo">6147</span> /**<a name="line.6147"></a> -<span class="sourceLineNo">6148</span> * Based on the nextKv in the heap, and the current row, decide whether or not there are more<a name="line.6148"></a> -<span class="sourceLineNo">6149</span> * cells to be read in the heap. If the row of the nextKv in the heap matches the current row<a name="line.6149"></a> -<span class="sourceLineNo">6150</span> * then there are more cells to be read in the row.<a name="line.6150"></a> -<span class="sourceLineNo">6151</span> * @param nextKv<a name="line.6151"></a> -<span class="sourceLineNo">6152</span> * @param currentRowCell<a name="line.6152"></a> -<span class="sourceLineNo">6153</span> * @return true When there are more cells in the row to be read<a name="line.6153"></a> -<span class="sourceLineNo">6154</span> */<a name="line.6154"></a> -<span class="sourceLineNo">6155</span> private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {<a name="line.6155"></a> -<span class="sourceLineNo">6156</span> return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);<a name="line.6156"></a> -<span class="sourceLineNo">6157</span> }<a name="line.6157"></a> -<span class="sourceLineNo">6158</span><a name="line.6158"></a> -<span class="sourceLineNo">6159</span> /*<a name="line.6159"></a> -<span class="sourceLineNo">6160</span> * @return True if a filter rules the scanner is over, done.<a name="line.6160"></a> -<span class="sourceLineNo">6161</span> */<a name="line.6161"></a> -<span class="sourceLineNo">6162</span> @Override<a name="line.6162"></a> -<span class="sourceLineNo">6163</span> public synchronized boolean isFilterDone() throws IOException {<a name="line.6163"></a> -<span class="sourceLineNo">6164</span> return isFilterDoneInternal();<a name="line.6164"></a> -<span class="sourceLineNo">6165</span> }<a name="line.6165"></a> -<span class="sourceLineNo">6166</span><a name="line.6166"></a> -<span class="sourceLineNo">6167</span> private boolean isFilterDoneInternal() throws IOException {<a name="line.6167"></a> -<span class="sourceLineNo">6168</span> return this.filter != null && this.filter.filterAllRemaining();<a name="line.6168"></a> -<span class="sourceLineNo">6169</span> }<a name="line.6169"></a> -<span class="sourceLineNo">6170</span><a name="line.6170"></a> -<span class="sourceLineNo">6171</span> private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)<a name="line.6171"></a> -<span class="sourceLineNo">6172</span> throws IOException {<a name="line.6172"></a> -<span class="sourceLineNo">6173</span> if (!results.isEmpty()) {<a name="line.6173"></a> -<span class="sourceLineNo">6174</span> throw new IllegalArgumentException("First parameter should be an empty list");<a name="line.6174"></a> -<span class="sourceLineNo">6175</span> }<a name="line.6175"></a> -<span class="sourceLineNo">6176</span> if (scannerContext == null) {<a name="line.6176"></a> -<span class="sourceLineNo">6177</span> throw new IllegalArgumentException("Scanner context cannot be null");<a name="line.6177"></a> -<span class="sourceLineNo">6178</span> }<a name="line.6178"></a> -<span class="sourceLineNo">6179</span> Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();<a name="line.6179"></a> -<span class="sourceLineNo">6180</span><a name="line.6180"></a> -<span class="sourceLineNo">6181</span> // Save the initial progress from the Scanner context in these local variables. The progress<a name="line.6181"></a> -<span class="sourceLineNo">6182</span> // may need to be reset a few times if rows are being filtered out so we save the initial<a name="line.6182"></a> -<span class="sourceLineNo">6183</span> // progress.<a name="line.6183"></a> -<span class="sourceLineNo">6184</span> int initialBatchProgress = scannerContext.getBatchProgress();<a name="line.6184"></a> -<span class="sourceLineNo">6185</span> long initialSizeProgress = scannerContext.getDataSizeProgress();<a name="line.6185"></a> -<span class="sourceLineNo">6186</span> long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();<a name="line.6186"></a> -<span class="sourceLineNo">6187</span> long initialTimeProgress = scannerContext.getTimeProgress();<a name="line.6187"></a> -<span class="sourceLineNo">6188</span><a name="line.6188"></a> -<span class="sourceLineNo">6189</span> // The loop here is used only when at some point during the next we determine<a name="line.6189"></a> -<span class="sourceLineNo">6190</span> // that due to effects of filters or otherwise, we have an empty row in the result.<a name="line.6190"></a> -<span class="sourceLineNo">6191</span> // Then we loop and try again. Otherwise, we must get out on the first iteration via return,<a name="line.6191"></a> -<span class="sourceLineNo">6192</span> // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,<a name="line.6192"></a> -<span class="sourceLineNo">6193</span> // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).<a name="line.6193"></a> -<span class="sourceLineNo">6194</span> while (true) {<a name="line.6194"></a> -<span class="sourceLineNo">6195</span> // Starting to scan a new row. Reset the scanner progress according to whether or not<a name="line.6195"></a> -<span class="sourceLineNo">6196</span> // progress should be kept.<a name="line.6196"></a> -<span class="sourceLineNo">6197</span> if (scannerContext.getKeepProgress()) {<a name="line.6197"></a> -<span class="sourceLineNo">6198</span> // Progress should be kept. Reset to initial values seen at start of method invocation.<a name="line.6198"></a> -<span class="sourceLineNo">6199</span> scannerContext.setProgress(initialBatchProgress, initialSizeProgress,<a name="line.6199"></a> -<span class="sourceLineNo">6200</span> initialHeapSizeProgress, initialTimeProgress);<a name="line.6200"></a> -<span class="sourceLineNo">6201</span> } else {<a name="line.6201"></a> -<span class="sourceLineNo">6202</span> scannerContext.clearProgress();<a name="line.6202"></a> -<span class="sourceLineNo">6203</span> }<a name="line.6203"></a> -<span class="sourceLineNo">6204</span> if (rpcCall.isPresent()) {<a name="line.6204"></a> -<span class="sourceLineNo">6205</span> // If a user specifies a too-restrictive or too-slow scanner, the<a name="line.6205"></a> -<span class="sourceLineNo">6206</span> // client might time out and disconnect while the server side<a name="line.6206"></a> -<span class="sourceLineNo">6207</span> // is still processing the request. We should abort aggressively<a name="line.6207"></a> -<span class="sourceLineNo">6208</span> // in that case.<a name="line.6208"></a> -<span class="sourceLineNo">6209</span> long afterTime = rpcCall.get().disconnectSince();<a name="line.6209"></a> -<span class="sourceLineNo">6210</span> if (afterTime >= 0) {<a name="line.6210"></a> -<span class="sourceLineNo">6211</span> throw new CallerDisconnectedException(<a name="line.6211"></a> -<span class="sourceLineNo">6212</span> "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +<a name="line.6212"></a> -<span class="sourceLineNo">6213</span> this + " after " + afterTime + " ms, since " +<a name="line.6213"></a> -<span class="sourceLineNo">6214</span> "caller disconnected");<a name="line.6214"></a> -<span class="sourceLineNo">6215</span> }<a name="line.6215"></a> -<span class="sourceLineNo">6216</span> }<a name="line.6216"></a> -<span class="sourceLineNo">6217</span><a name="line.6217"></a> -<span class="sourceLineNo">6218</span> // Let's see what we have in the storeHeap.<a name="line.6218"></a> -<span class="sourceLineNo">6219</span> Cell current = this.storeHeap.peek();<a name="line.6219"></a> -<span class="sourceLineNo">6220</span><a name="line.6220"></a> -<span class="sourceLineNo">6221</span> boolean shouldStop = shouldStop(current);<a name="line.6221"></a> -<span class="sourceLineNo">6222</span> // When has filter row is true it means that the all the cells for a particular row must be<a name="line.6222"></a> -<span class="sourceLineNo">6223</span> // read before a filtering decision can be made. This means that filters where hasFilterRow<a name="line.6223"></a> -<span class="sourceLineNo">6224</span> // run the risk of enLongAddering out of memory errors in the case that they are applied to a<a name="line.6224"></a> -<span class="sourceLineNo">6225</span> // table that has very large rows.<a name="line.6225"></a> -<span class="sourceLineNo">6226</span> boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();<a name="line.6226"></a> -<span class="sourceLineNo">6227</span><a name="line.6227"></a> -<span class="sourceLineNo">6228</span> // If filter#hasFilterRow is true, partial results are not allowed since allowing them<a name="line.6228"></a> -<span class="sourceLineNo">6229</span> // would prevent the filters from being evaluated. Thus, if it is true, change the<a name="line.6229"></a> -<span class="sourceLineNo">6230</span> // scope of any limits that could potentially create partial results to<a name="line.6230"></a> -<span class="sourceLineNo">6231</span> // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row<a name="line.6231"></a> -<span class="sourceLineNo">6232</span> if (hasFilterRow) {<a name="line.6232"></a> -<span class="sourceLineNo">6233</span> if (LOG.isTraceEnabled()) {<a name="line.6233"></a> -<span class="sourceLineNo">6234</span> LOG.trace("filter#hasFilterRow is true which prevents partial results from being "<a name="line.6234"></a> -<span class="sourceLineNo">6235</span> + " formed. Changing scope of limits that may create partials");<a name="line.6235"></a> -<span class="sourceLineNo">6236</span> }<a name="line.6236"></a> -<span class="sourceLineNo">6237</span> scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);<a name="line.6237"></a> -<span class="sourceLineNo">6238</span> scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);<a name="line.6238"></a> -<span class="sourceLineNo">6239</span> }<a name="line.6239"></a> -<span class="sourceLineNo">6240</span><a name="line.6240"></a> -<span class="sourceLineNo">6241</span> // Check if we were getting data from the joinedHeap and hit the limit.<a name="line.6241"></a> -<span class="sourceLineNo">6242</span> // If not, then it's main path - getting results from storeHeap.<a name="line.6242"></a> -<span class="sourceLineNo">6243</span> if (joinedContinuationRow == null) {<a name="line.6243"></a> -<span class="sourceLineNo">6244</span> // First, check if we are at a stop row. If so, there are no more results.<a name="line.6244"></a> -<span class="sourceLineNo">6245</span> if (shouldStop) {<a name="line.6245"></a> -<span class="sourceLineNo">6246</span> if (hasFilterRow) {<a name="line.6246"></a> -<span class="sourceLineNo">6247</span> filter.filterRowCells(results);<a name="line.6247"></a> -<span class="sourceLineNo">6248</span> }<a name="line.6248"></a> -<span class="sourceLineNo">6249</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6249"></a> -<span class="sourceLineNo">6250</span> }<a name="line.6250"></a> -<span class="sourceLineNo">6251</span><a name="line.6251"></a> -<span class="sourceLineNo">6252</span> // Check if rowkey filter wants to exclude this row. If so, loop to next.<a name="line.6252"></a> -<span class="sourceLineNo">6253</span> // Technically, if we hit limits before on this row, we don't need this call.<a name="line.6253"></a> -<span class="sourceLineNo">6254</span> if (filterRowKey(current)) {<a name="line.6254"></a> -<span class="sourceLineNo">6255</span> incrementCountOfRowsFilteredMetric(scannerContext);<a name="line.6255"></a> -<span class="sourceLineNo">6256</span> // early check, see HBASE-16296<a name="line.6256"></a> -<span class="sourceLineNo">6257</span> if (isFilterDoneInternal()) {<a name="line.6257"></a> -<span class="sourceLineNo">6258</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6258"></a> -<span class="sourceLineNo">6259</span> }<a name="line.6259"></a> -<span class="sourceLineNo">6260</span> // Typically the count of rows scanned is incremented inside #populateResult. However,<a name="line.6260"></a> -<span class="sourceLineNo">6261</span> // here we are filtering a row based purely on its row key, preventing us from calling<a name="line.6261"></a> -<span class="sourceLineNo">6262</span> // #populateResult. Thus, perform the necessary increment here to rows scanned metric<a name="line.6262"></a> -<span class="sourceLineNo">6263</span> incrementCountOfRowsScannedMetric(scannerContext);<a name="line.6263"></a> -<span class="sourceLineNo">6264</span> boolean moreRows = nextRow(scannerContext, current);<a name="line.6264"></a> -<span class="sourceLineNo">6265</span> if (!moreRows) {<a name="line.6265"></a> -<span class="sourceLineNo">6266</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6266"></a> -<span class="sourceLineNo">6267</span> }<a name="line.6267"></a> -<span class="sourceLineNo">6268</span> results.clear();<a name="line.6268"></a> -<span class="sourceLineNo">6269</span> continue;<a name="line.6269"></a> -<span class="sourceLineNo">6270</span> }<a name="line.6270"></a> -<span class="sourceLineNo">6271</span><a name="line.6271"></a> -<span class="sourceLineNo">6272</span> // Ok, we are good, let's try to get some results from the main heap.<a name="line.6272"></a> -<span class="sourceLineNo">6273</span> populateResult(results, this.storeHeap, scannerContext, current);<a name="line.6273"></a> -<span class="sourceLineNo">6274</span> if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.6274"></a> -<span class="sourceLineNo">6275</span> if (hasFilterRow) {<a name="line.6275"></a> -<span class="sourceLineNo">6276</span> throw new IncompatibleFilterException(<a name="line.6276"></a> -<span class="sourceLineNo">6277</span> "Filter whose hasFilterRow() returns true is incompatible with scans that must "<a name="line.6277"></a> -<span class="sourceLineNo">6278</span> + " stop mid-row because of a limit. ScannerContext:" + scannerContext);<a name="line.6278"></a> -<span class="sourceLineNo">6279</span> }<a name="line.6279"></a> -<span class="sourceLineNo">6280</span> return true;<a name="line.6280"></a> -<span class="sourceLineNo">6281</span> }<a name="line.6281"></a> -<span class="sourceLineNo">6282</span><a name="line.6282"></a> -<span class="sourceLineNo">6283</span> Cell nextKv = this.storeHeap.peek();<a name="line.6283"></a> -<span class="sourceLineNo">6284</span> shouldStop = shouldStop(nextKv);<a name="line.6284"></a> -<span class="sourceLineNo">6285</span> // save that the row was empty before filters applied to it.<a name="line.6285"></a> -<span class="sourceLineNo">6286</span> final boolean isEmptyRow = results.isEmpty();<a name="line.6286"></a> -<span class="sourceLineNo">6287</span><a name="line.6287"></a> -<span class="sourceLineNo">6288</span> // We have the part of the row necessary for filtering (all of it, usually).<a name="line.6288"></a> -<span class="sourceLineNo">6289</span> // First filter with the filterRow(List).<a name="line.6289"></a> -<span class="sourceLineNo">6290</span> FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;<a name="line.6290"></a> -<span class="sourceLineNo">6291</span> if (hasFilterRow) {<a name="line.6291"></a> -<span class="sourceLineNo">6292</span> ret = filter.filterRowCellsWithRet(results);<a name="line.6292"></a> -<span class="sourceLineNo">6293</span><a name="line.6293"></a> -<span class="sourceLineNo">6294</span> // We don't know how the results have changed after being filtered. Must set progress<a name="line.6294"></a> -<span class="sourceLineNo">6295</span> // according to contents of results now. However, a change in the results should not<a name="line.6295"></a> -<span class="sourceLineNo">6296</span> // affect the time progress. Thus preserve whatever time progress has been made<a name="line.6296"></a> -<span class="sourceLineNo">6297</span> long timeProgress = scannerContext.getTimeProgress();<a name="line.6297"></a> -<span class="sourceLineNo">6298</span> if (scannerContext.getKeepProgress()) {<a name="line.6298"></a> -<span class="sourceLineNo">6299</span> scannerContext.setProgress(initialBatchProgress, initialSizeProgress,<a name="line.6299"></a> -<span class="sourceLineNo">6300</span> initialHeapSizeProgress, initialTimeProgress);<a name="line.6300"></a> -<span class="sourceLineNo">6301</span> } else {<a name="line.6301"></a> -<span class="sourceLineNo">6302</span> scannerContext.clearProgress();<a name="line.6302"></a> -<span class="sourceLineNo">6303</span> }<a name="line.6303"></a> -<span class="sourceLineNo">6304</span> scannerContext.setTimeProgress(timeProgress);<a name="line.6304"></a> -<span class="sourceLineNo">6305</span> scannerContext.incrementBatchProgress(results.size());<a name="line.6305"></a> -<span class="sourceLineNo">6306</span> for (Cell cell : results) {<a name="line.6306"></a> -<span class="sourceLineNo">6307</span> scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),<a name="line.6307"></a> -<span class="sourceLineNo">6308</span> PrivateCellUtil.estimatedHeapSizeOf(cell));<a name="line.6308"></a> -<span class="sourceLineNo">6309</span> }<a name="line.6309"></a> -<span class="sourceLineNo">6310</span> }<a name="line.6310"></a> -<span class="sourceLineNo">6311</span><a name="line.6311"></a> -<span class="sourceLineNo">6312</span> if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {<a name="line.6312"></a> -<span class="sourceLineNo">6313</span> incrementCountOfRowsFilteredMetric(scannerContext);<a name="line.6313"></a> -<span class="sourceLineNo">6314</span> results.clear();<a name="line.6314"></a> -<span class="sourceLineNo">6315</span> boolean moreRows = nextRow(scannerContext, current);<a name="line.6315"></a> -<span class="sourceLineNo">6316</span> if (!moreRows) {<a name="line.6316"></a> -<span class="sourceLineNo">6317</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6317"></a> -<span class="sourceLineNo">6318</span> }<a name="line.6318"></a> -<span class="sourceLineNo">6319</span><a name="line.6319"></a> -<span class="sourceLineNo">6320</span> // This row was totally filtered out, if this is NOT the last row,<a name="line.6320"></a> -<span class="sourceLineNo">6321</span> // we should continue on. Otherwise, nothing else to do.<a name="line.6321"></a> -<span class="sourceLineNo">6322</span> if (!shouldStop) continue;<a name="line.6322"></a> -<span class="sourceLineNo">6323</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6323"></a> -<span class="sourceLineNo">6324</span> }<a name="line.6324"></a> -<span class="sourceLineNo">6325</span><a name="line.6325"></a> -<span class="sourceLineNo">6326</span> // Ok, we are done with storeHeap for this row.<a name="line.6326"></a> -<span class="sourceLineNo">6327</span> // Now we may need to fetch additional, non-essential data into row.<a name="line.6327"></a> -<span class="sourceLineNo">6328</span> // These values are not needed for filter to work, so we postpone their<a name="line.6328"></a> -<span class="sourceLineNo">6329</span> // fetch to (possibly) reduce amount of data loads from disk.<a name="line.6329"></a> -<span class="sourceLineNo">6330</span> if (this.joinedHeap != null) {<a name="line.6330"></a> -<span class="sourceLineNo">6331</span> boolean mayHaveData = joinedHeapMayHaveData(current);<a name="line.6331"></a> -<span class="sourceLineNo">6332</span> if (mayHaveData) {<a name="line.6332"></a> -<span class="sourceLineNo">6333</span> joinedContinuationRow = current;<a name="line.6333"></a> -<span class="sourceLineNo">6334</span> populateFromJoinedHeap(results, scannerContext);<a name="line.6334"></a> -<span class="sourceLineNo">6335</span><a name="line.6335"></a> -<span class="sourceLineNo">6336</span> if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.6336"></a> -<span class="sourceLineNo">6337</span> return true;<a name="line.6337"></a> -<span class="sourceLineNo">6338</span> }<a name="line.6338"></a> -<span class="sourceLineNo">6339</span> }<a name="line.6339"></a> -<span class="sourceLineNo">6340</span> }<a name="line.6340"></a> -<span class="sourceLineNo">6341</span> } else {<a name="line.6341"></a> -<span class="sourceLineNo">6342</span> // Populating from the joined heap was stopped by limits, populate some more.<a name="line.6342"></a> -<span class="sourceLineNo">6343</span> populateFromJoinedHeap(results, scannerContext);<a name="line.6343"></a> -<span class="sourceLineNo">6344</span> if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.6344"></a> -<span class="sourceLineNo">6345</span> return true;<a name="line.6345"></a> -<span class="sourceLineNo">6346</span> }<a name="line.6346"></a> -<span class="sourceLineNo">6347</span> }<a name="line.6347"></a> -<span class="sourceLineNo">6348</span> // We may have just called populateFromJoinedMap and hit the limits. If that is<a name="line.6348"></a> -<span class="sourceLineNo">6349</span> // the case, we need to call it again on the next next() invocation.<a name="line.6349"></a> -<span class="sourceLineNo">6350</span> if (joinedContinuationRow != null) {<a name="line.6350"></a> -<span class="sourceLineNo">6351</span> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();<a name="line.6351"></a> -<span class="sourceLineNo">6352</span> }<a name="line.6352"></a> -<span class="sourceLineNo">6353</span><a name="line.6353"></a> -<span class="sourceLineNo">6354</span> // Finally, we are done with both joinedHeap and storeHeap.<a name="line.6354"></a> -<span class="sourceLineNo">6355</span> // Double check to prevent empty rows from appearing in result. It could be<a name="line.6355"></a> -<span class="sourceLineNo">6356</span> // the case when SingleColumnValueExcludeFilter is used.<a name="line.6356"></a> -<span class="sourceLineNo">6357</span> if (results.isEmpty()) {<a name="line.6357"></a> -<span class="sourceLineNo">6358</span> incrementCountOfRowsFilteredMetric(scannerContext);<a name="line.6358"></a> -<span class="sourceLineNo">6359</span> boolean moreRows = nextRow(scannerContext, current);<a name="line.6359"></a> -<span class="sourceLineNo">6360</span> if (!moreRows) {<a name="line.6360"></a> -<span class="sourceLineNo">6361</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6361"></a> -<span class="sourceLineNo">6362</span> }<a name="line.6362"></a> -<span class="sourceLineNo">6363</span> if (!shouldStop) continue;<a name="line.6363"></a> -<span class="sourceLineNo">6364</span> }<a name="line.6364"></a> -<span class="sourceLineNo">6365</span><a name="line.6365"></a> -<span class="sourceLineNo">6366</span> if (shouldStop) {<a name="line.6366"></a> -<span class="sourceLineNo">6367</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6367"></a> -<span class="sourceLineNo">6368</span> } else {<a name="line.6368"></a> -<span class="sourceLineNo">6369</span> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();<a name="line.6369"></a> -<span class="sourceLineNo">6370</span> }<a name="line.6370"></a> -<span class="sourceLineNo">6371</span> }<a name="line.6371"></a> -<span class="sourceLineNo">6372</span> }<a name="line.6372"></a> -<span class="sourceLineNo">6373</span><a name="line.6373"></a> -<span class="sourceLineNo">6374</span> protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {<a name="line.6374"></a> -<span class="sourceLineNo">6375</span> filteredReadRequestsCount.increment();<a name="line.6375"></a> -<span class="sourceLineNo">6376</span><a name="line.6376"></a> -<span class="sourceLineNo">6377</span> if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;<a name="line.6377"></a> -<span class="sourceLineNo">6378</span><a name="line.6378"></a> -<span class="sourceLineNo">6379</span> scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();<a name="line.6379"></a> -<span class="sourceLineNo">6380</span> }<a name="line.6380"></a> -<span class="sourceLineNo">6381</span><a name="line.6381"></a> -<span class="sourceLineNo">6382</span> protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {<a name="line.6382"></a> -<span class="sourceLineNo">6383</span> if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;<a name="line.6383"></a> -<span class="sourceLineNo">6384</span><a name="line.6384"></a> -<span class="sourceLineNo">6385</span> scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();<a name="line.6385"></a> -<span class="sourceLineNo">6386</span> }<a name="line.6386"></a> -<span class="sourceLineNo">6387</span><a name="line.6387"></a> -<span class="sourceLineNo">6388</span> /**<a name="line.6388"></a> -<span class="sourceLineNo">6389</span> * @param currentRowCell<a name="line.6389"></a> -<span class="sourceLineNo">6390</span> * @return true when the joined heap may have data for the current row<a name="line.6390"></a> -<span class="sourceLineNo">6391</span> * @throws IOException<a name="line.6391"></a> -<span class="sourceLineNo">6392</span> */<a name="line.6392"></a> -<span class="sourceLineNo">6393</span> private boolean joinedHeapMayHaveData(Cell currentRowCell)<a name="line.6393"></a> -<span class="sourceLineNo">6394</span> throws IOException {<a name="line.6394"></a> -<span class="sourceLineNo">6395</span> Cell nextJoinedKv = joinedHeap.peek();<a name="line.6395"></a> -<span class="sourceLineNo">6396</span> boolean matchCurrentRow =<a name="line.6396"></a> -<span class="sourceLineNo">6397</span> nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);<a name="line.6397"></a> -<span class="sourceLineNo">6398</span> boolean matchAfterSeek = false;<a name="line.6398"></a> -<span class="sourceLineNo">6399</span><a name="line.6399"></a> -<span class="sourceLineNo">6400</span> // If the next value in the joined heap does not match the current row, try to seek to the<a name="line.6400"></a> -<span class="sourceLineNo">6401</span> // correct row<a name="line.6401"></a> -<span class="sourceLineNo">6402</span> if (!matchCurrentRow) {<a name="line.6402"></a> -<span class="sourceLineNo">6403</span> Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);<a name="line.6403"></a> -<span class="sourceLineNo">6404</span> boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);<a name="line.6404"></a> -<span class="sourceLineNo">6405</span> matchAfterSeek =<a name="line.6405"></a> -<span class="sourceLineNo">6406</span> seekSuccessful && joinedHeap.peek() != null<a name="line.6406"></a> -<span class="sourceLineNo">6407</span> && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);<a name="line.6407"></a> -<span class="sourceLineNo">6408</span> }<a name="line.6408"></a> -<span class="sourceLineNo">6409</span><a name="line.6409"></a> -<span class="sourceLineNo">6410</span> return matchCurrentRow || matchAfterSeek;<a name="line.6410"></a> -<span class="sourceLineNo">6411</span> }<a name="line.6411"></a> -<span class="sourceLineNo">6412</span><a name="line.6412"></a> -<span class="sourceLineNo">6413</span> /**<a name="line.6413"></a> -<span class="sourceLineNo">6414</span> * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines<a name="line.6414"></a> -<span class="sourceLineNo">6415</span> * both filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older,<a name="line.6415"></a> -<span class="sourceLineNo">6416</span> * it may not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only<a name="line.6416"></a> -<span class="sourceLineNo">6417</span> * returns true when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow().<a name="line.6417"></a> -<span class="sourceLineNo">6418</span> * Therefore, the filterRow() will be skipped.<a name="line.6418"></a> -<span class="sourceLineNo">6419</span> */<a name="line.6419"></a> -<span class="sourceLineNo">6420</span> private boolean filterRow() throws IOException {<a name="line.6420"></a> -<span class="sourceLineNo">6421</span> // when hasFilterRow returns true, filter.filterRow() will be called automatically inside<a name="line.6421"></a> -<span class="sourceLineNo">6422</span> // filterRowCells(List<Cell> kvs) so we skip that scenario here.<a name="line.6422"></a> -<span class="sourceLineNo">6423</span> return filter != null && (!filter.hasFilterRow())<a name="line.6423"></a> -<span class="sourceLineNo">6424</span> && filter.filterRow();<a name="line.6424"></a> -<span class="sourceLineNo">6425</span> }<a name="line.6425"></a> -<span class="sourceLineNo">6426</span><a name="line.6426"></a> -<span class="sourceLineNo">6427</span> private boolean filterRowKey(Cell current) throws IOException {<a name="line.6427"></a> -<span class="sourceLineNo">6428</span> return filter != null && filter.filterRowKey(current);<a name="line.6428"></a> -<span class="sourceLineNo">6429</span> }<a name="line.6429"></a> -<span class="sourceLineNo">6430</span><a name="line.6430"></a> -<span class="sourceLineNo">6431</span> protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {<a name="line.6431"></a> -<span class="sourceLineNo">6432</span> assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";<a name="line.6432"></a> -<span class="sourceLineNo">6433</span> Cell next;<a name="line.6433"></a> -<span class="sourceLineNo">6434</span> while ((next = this.storeHeap.peek()) != null &&<a name="line.6434"></a> -<span class="sourceLineNo">6435</span> CellUtil.matchingRows(next, curRowCell)) {<a name="line.6435"></a> -<span class="sourceLineNo">6436</span> this.storeHeap.next(MOCKED_LIST);<a name="line.6436"></a> -<span class="sourceLineNo">6437</span> }<a name="line.6437"></a> -<span class="sourceLineNo">6438</span> resetFilters();<a name="line.6438"></a> -<span class="sourceLineNo">6439</span><a name="line.6439"></a> -<span class="sourceLineNo">6440</span> // Calling the hook in CP which allows it to do a fast forward<a name="line.6440"></a> -<span class="sourceLineNo">6441</span> return this.region.getCoprocessorHost() == null<a name="line.6441"></a> -<span class="sourceLineNo">6442</span> || this.region.getCoprocessorHost()<a name="line.6442"></a> -<span class="sourceLineNo">6443</span> .postScannerFilterRow(this, curRowCell);<a name="line.6443"></a> -<span class="sourceLineNo">6444</span> }<a name="line.6444"></a> -<span class="sourceLineNo">6445</span><a name="line.6445"></a> -<span class="sourceLineNo">6446</span> protected boolean shouldStop(Cell currentRowCell) {<a name="line.6446"></a> -<span class="sourceLineNo">6447</span> if (currentRowCell == null) {<a name="line.6447"></a> -<span class="sourceLineNo">6448</span> return true;<a name="line.6448"></a> -<span class="sourceLineNo">6449</span> }<a name="line.6449"></a> -<span class="sourceLineNo">6450</span> if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {<a name="line.6450"></a> -<span class="sourceLineNo">6451</span> return false;<a name="line.6451"></a> -<span class="sourceLineNo">6452</span> }<a name="line.6452"></a> -<span class="sourceLineNo">6453</span> int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);<a name="line.6453"></a> -<span class="sourceLineNo">6454</span> return c > 0 || (c == 0 && !includeStopRow);<a name="line.6454"></a> -<span class="sourceLineNo">6455</span> }<a name="line.6455"></a> -<span class="sourceLineNo">6456</span><a name="line.6456"></a> -<span class="sourceLineNo">6457</span> @Override<a name="line.6457"></a> -<span class="sourceLineNo">6458</span> public synchronized void close() {<a name="line.6458"></a> -<span class="sourceLineNo">6459</span> if (storeHeap != null) {<a name="line.6459"></a> -<span class="sourceLineNo">6460</span> storeHeap.close();<a name="line.6460"></a> -<span class="sourceLineNo">6461</span> storeHeap = null;<a name="line.6461"></a> -<span class="sourceLineNo">6462</span> }<a name="line.6462"></a> -<span class="sourceLineNo">6463</span> if (joinedHeap != null) {<a name="line.6463"></a> -<span class="sourceLineNo">6464</span> joinedHeap.close();<a name="line.6464"></a> -<span class="sourceLineNo">6465</span> joinedHeap = null;<a name="line.6465"></a> -<span class="sourceLineNo">6466</span> }<a name="line.6466"></a> -<span class="sourceLineNo">6467</span> // no need to synchronize here.<a name="line.6467"></a> -<span class="sourceLineNo">6468</span> scannerReadPoints.remove(this);<a name="line.6468"></a> -<span class="sourceLineNo">6469</span> this.filterClosed = true;<a name="line.6469"></a> -<span class="sourceLineNo">6470</span> }<a name="line.6470"></a> -<span class="sourceLineNo">6471</span><a name="line.6471"></a> -<span class="sourceLineNo">6472</span> KeyValueHeap getStoreHeapForTesting() {<a name="line.6472"></a> -<span class="sourceLineNo">6473</span> return storeHeap;<a name="line.6473"></a> -<span class="sourceLineNo">6474</span> }<a name="line.6474"></a> -<span class="sourceLineNo">6475</span><a name="line.6475"></a> -<span class="sourceLineNo">6476</span> @Override<a name="line.6476"></a> -<span class="sourceLineNo">6477</span> public synchronized boolean reseek(byte[] row) throws IOException {<a name="line.6477"></a> -<span class="sourceLineNo">6478</span> if (row == null) {<a name="line.6478"></a> -<span class="sourceLineNo">6479</span> throw new IllegalArgumentException("Row cannot be null.");<a name="line.6479"></a> -<span class="sourceLineNo">6480</span> }<a name="line.6480"></a> -<span class="sourceLineNo">6481</span> boolean result = false;<a name="line.6481"></a> -<span class="sourceLineNo">6482</span> startRegionOperation();<a name="line.6482"></a> -<span class="sourceLineNo">6483</span> Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);<a name="line.6483"></a> -<span class="sourceLineNo">6484</span> try {<a name="line.6484"></a> -<span class="sourceLineNo">6485</span> // use request seek to make use of the lazy seek option. See HBASE-5520<a name="line.6485"></a> -<span class="sourceLineNo">6486</span> result = this.storeHeap.requestSeek(kv, true, true);<a name="line.6486"></a> -<span class="sourceLineNo">6487</span> if (this.joinedHeap != null) {<a name="line.6487"></a> -<span class="sourceLineNo">6488</span> result = this.joinedHeap.requestSeek(kv, true, true) || result;<a name="line.6488"></a> -<span class="sourceLineNo">6489</span> }<a name="line.6489"></a> -<span class="sourceLineNo">6490</span> } finally {<a name="line.6490"></a> -<span class="sourceLineNo">6491</span> closeRegionOperation();<a name="line.6491"></a> -<span class="sourceLineNo">6492</span> }<a name="line.6492"></a> -<span class="sourceLineNo">6493</span> return result;<a name="line.6493"></a> -<span class="sourceLineNo">6494</span> }<a name="line.6494"></a> -<span class="sourceLineNo">6495</span><a name="line.6495"></a> -<span class="sourceLineNo">6496</span> @Override<a name="line.6496"></a> -<span class="sourceLineNo">6497</span> public void shipped() throws IOException {<a name="line.6497"></a> -<span class="sourceLineNo">6498</span> if (storeHeap != null) {<a name="line.6498"></a> -<span class="sourceLineNo">6499</span> storeHeap.shipped();<a name="line.6499"></a> -<span class="sourceLineNo">6500</span> }<a name="line.6500"></a> -<span class="sourceLineNo">6501</span> if (joinedHeap != null) {<a name="line.6501"></a> -<span class="sourceLineNo">6502</span> joinedHeap.shipped();<a name="line.6502"></a> -<span class="sourceLineNo">6503</span> }<a name="line.6503"></a> -<span class="sourceLineNo">6504</span> }<a name="line.6504"></a> -<span class="sourceLineNo">6505</span><a name="line.6505"></a> -<span class="sourceLineNo">6506</span> @Override<a name="line.6506"></a> -<span class="sourceLineNo">6507</span> public void run() throws IOException {<a name="line.6507"></a> -<span class="sourceLineNo">6508</span> // This is the RPC callback method executed. We do the close in of the scanner in this<a name="line.6508"></a> -<span class="sourceLineNo">6509</span> // callback<a name="line.6509"></a> -<span class="sourceLineNo">6510</span> this.close();<a name="line.6510"></a> -<span class="sourceLineNo">6511</span> }<a name="line.6511"></a> -<span class="sourceLineNo">6512</span> }<a name="line.6512"></a> -<span class="sourceLineNo">6513</span><a name="line.6513"></a> -<span class="sourceLineNo">6514</span> // Utility methods<a name="line.6514"></a> -<span class="sourceLineNo">6515</span> /**<a name="line.6515"></a> -<span class="sourceLineNo">6516</span> * A utility method to create new instances of HRegion based on the<a name="line.6516"></a> -<span class="sourceLineNo">6517</span> * {@link HConstants#REGION_IMPL} configuration property.<a name="line.6517"></a> -<span class="sourceLineNo">6518</span> * @param tableDir qualified path of directory where region should be located,<a name="line.6518"></a> -<span class="sourceLineNo">6519</span> * usually the table directory.<a name="line.6519"></a> -<span class="sourceLineNo">6520</span> * @param wal The WAL is the outbound log for any updates to the HRegion<a name="line.6520"></a> -<span class="sourceLineNo">6521</span> * The wal file is a logfile from the previous execution that's<a name="line.6521"></a> -<span class="sourceLineNo">6522</span> * custom-computed for this HRegion. The HRegionServer computes and sorts the<a name="line.6522"></a> -<span class="sourceLineNo">6523</span> * appropriate wal info for this HRegion. If there is a previous file<a name="line.6523"></a> -<span class="sourceLineNo">6524</span> * (implying that the HRegion has been written-to before), then read it from<a name="line.6524"></a> -<span class="sourceLineNo">6525</span> * the supplied path.<a name="line.6525"></a> -<span class="sourceLineNo">6526</span> * @param fs is the filesystem.<a name="line.6526"></a> -<span class="sourceLineNo">6527</span> * @param conf is global configuration settings.<a name="line.6527"></a> -<span class="sourceLineNo">6528</span> * @param regionInfo - RegionInfo that describes the region<a name="line.6528"></a> -<span class="sourceLineNo">6529</span> * is new), then read them from the supplied path.<a name="line.6529"></a> -<span class="sourceLineNo">6530</span> * @param htd the table descriptor<a name="line.6530"></a> -<span class="sourceLineNo">6531</span> * @return the new instance<a name="line.6531"></a> -<span class="sourceLineNo">6532</span> */<a name="line.6532"></a> -<span class="sourceLineNo">6533</span> static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,<a name="line.6533"></a> -<span class="sourceLineNo">6534</span> Configuration conf, RegionInfo regionInfo, final TableDescriptor htd,<a name="line.6534"></a> -<span class="sourceLineNo">6535</span> RegionServerServices rsServices) {<a name="line.6535"></a> -<span class="sourceLineNo">6536</span> try {<a name="line.6536"></a> -<span class="sourceLineNo">6537</span> @SuppressWarnings("unchecked")<a name="line.6537"></a> -<span class="sourceLineNo">6538</span> Class<? extends HRegion> regionClass =<a name="line.6538"></a> -<span class="sourceLineNo">6539</span> (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);<a name="line.6539"></a> -<span class="sourceLineNo">6540</span><a name="line.6540"></a> -<span class="sourceLineNo">6541</span> Constructor<? extends HRegion> c =<a name="line.6541"></a> -<span class="sourceLineNo">6542</span> regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,<a name="line.6542"></a> -<span class="sourceLineNo">6543</span> Configuration.class, RegionInfo.class, TableDescriptor.class,<a name="line.6543"></a> -<span class="sourceLineNo">6544</span> RegionServerServices.class);<a name="line.6544"></a> -<span class="sourceLineNo">6545</span><a name="line.6545"></a> -<span class="sourceLineNo">6546</span> return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);<a name="line.6546"></a> -<span class="sourceLineNo">6547</span> } catch (Throwable e) {<a name="line.6547"></a> -<span class="sourceLineNo">6548</span> // todo: what should I throw here?<a name="line.6548"></a> -<span class="sourceLineNo">6549</span> throw new IllegalStateException("Could not instantiate a region instance.", e);<a name="line.6549"></a> -<span class="sourceLineNo">6550</span> }<a name="line.6550"></a> -<span class="sourceLineNo">6551</span> }<a name="line.6551"></a> -<span class="sourceLineNo">6552</span><a name="line.6552"></a> -<span class="sourceLineNo">6553</span> /**<a name="line.6553"></a> -<span class="sourceLineNo">6554</span> * Convenience method creating new HRegions. Used by createTable.<a name="line.6554"></a> -<span class="sourceLineNo">6555</span> *<a name="line.6555"></a> -<span class="sourceLineNo">6556</span> * @param info Info for region to create.<a name="line.6556"></a> -<span class="sourceLineNo">6557</span> * @param rootDir Root directory for HBase instance<a name="line.6557"></a> -<span class="sourceLineNo">6558</span> * @param wal shared WAL<a name="line.6558"></a> -<span class="sourceLineNo">6559</span> * @param initialize - true to initialize the region<a name="line.6559"></a> -<span class="sourceLineNo">6560</span> * @return new HRegion<a name="line.6560"></a> -<span class="sourceLineNo">6561</span> * @throws IOException<a name="line.6561"></a> -<span class="sourceLineNo">6562</span> */<a name="line.6562"></a> -<span class="sourceLineNo">6563</span> public static HRegion createHRegion(final RegionInfo info, final Path rootDir,<a name="line.6563"></a> -<span class="sourceLineNo">6564</span> final Configuration conf, final TableDescriptor hTableDescriptor,<a name="line.6564"></a> -<span class="sourceLineNo">6565</span> final WAL wal, final boolean initialize)<a name="line.6565"></a> -<span class="sourceLineNo">6566</span> throws IOException {<a name="line.6566"></a> -<span class="sourceLineNo">6567</span> LOG.info("creating HRegion " + info.getTable().getNameAsString()<a name="line.6567"></a> -<span class="sourceLineNo">6568</span> + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +<a name="line.6568"></a> -<span class="sourceLineNo">6569</span> " Table name == " + info.getTable().getNameAsString());<a name="line.6569"></a> -<span class="sourceLineNo">6570</span> FileSystem fs = FileSystem.get(conf);<a name="line.6570"></a> -<span class="sourceLineNo">6571</span> Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());<a name="line.6571"></a> -<span class="sourceLineNo">6572</span> HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);<a name="line.6572"></a> -<span class="sourceLineNo">6573</span> HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);<a name="line.6573"></a> -<span class="sourceLineNo">6574</span> if (initialize) region.initialize(null);<a name="line.6574"></a> -<span class="sourceLineNo">6575</span> return region;<a name="line.6575"></a> -<span class="sourceLineNo">6576</span> }<a name="line.6576"></a> -<span class="sourceLineNo">6577</span><a name="line.6577"></a> -<span class="sourceLineNo">6578</span> public static HRegion createHRegion(final RegionInfo info, final Path rootDir,<a name="line.6578"></a> -<span class="sourceLineNo">6579</span> final Configuration conf,<a name="line.6579"></a> -<span class="sourceLineNo">6580</span> final TableDescriptor hTableDescriptor,<a name="line.6580"></a> -<span class="sourceLineNo">6581</span> final WAL wal)<a name="line.6581"></a> -<span class="sourceLineNo">6582</span> throws IOException {<a name="line.6582"></a> -<span class="sourceLineNo">6583</span> return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);<a name="line.6583"></a> -<span class="sourceLineNo">6584</span> }<a name="line.6584"></a> -<span class="sourceLineNo">6585</span><a name="line.6585"></a> +<span class="sourceLineNo">5869</span> class RegionScannerImpl<a name="line.5869"></a> +<span class="sourceLineNo">5870</span> implements RegionScanner, Shipper, org.apache.hadoop.hbase.ipc.RpcCallback {<a name="line.5870"></a> +<span class="sourceLineNo">5871</span> // Package local for testability<a name="line.5871"></a> +<span class="sourceLineNo">5872</span> KeyValueHeap storeHeap = null;<a name="line.5872"></a> +<span class="sourceLineNo">5873</span> /** Heap of key-values that are not essential for the provided filters and are thus read<a name="line.5873"></a> +<span class="sourceLineNo">5874</span> * on demand, if on-demand column family loading is enabled.*/<a name="line.5874"></a> +<span class="sourceLineNo">5875</span> KeyValueHeap joinedHeap = null;<a name="line.5875"></a> +<span class="sourceLineNo">5876</span> /**<a name="line.5876"></a> +<span class="sourceLineNo">5877</span> * If the joined heap data gathering is interrupted due to scan limits, this will<a name="line.5877"></a> +<span class="sourceLineNo">5878</span> * contain the row for which we are populating the values.*/<a name="line.5878"></a> +<span class="sourceLineNo">5879</span> protected Cell joinedContinuationRow = null;<a name="line.5879"></a> +<span class="sourceLineNo">5880</span> private boolean filterClosed = false;<a name="line.5880"></a> +<span class="sourceLineNo">5881</span><a name="line.5881"></a> +<span class="sourceLineNo">5882</span> protected final byte[] stopRow;<a name="line.5882"></a> +<span class="sourceLineNo">5883</span> protected final boolean includeStopRow;<a name="line.5883"></a> +<span class="sourceLineNo">5884</span> protected final HRegion region;<a name="line.5884"></a> +<span class="sourceLineNo">5885</span> protected final CellComparator comparator;<a name="line.5885"></a> +<span class="sourceLineNo">5886</span><a name="line.5886"></a> +<span class="sourceLineNo">5887</span> private final long readPt;<a name="line.5887"></a> +<span class="sourceLineNo">5888</span> private final long maxResultSize;<a name="line.5888"></a> +<span class="sourceLineNo">5889</span> private final ScannerContext defaultScannerContext;<a name="line.5889"></a> +<span class="sourceLineNo">5890</span> private final FilterWrapper filter;<a name="line.5890"></a> +<span class="sourceLineNo">5891</span><a name="line.5891"></a> +<span class="sourceLineNo">5892</span> @Override<a name="line.5892"></a> +<span class="sourceLineNo">5893</span> public RegionInfo getRegionInfo() {<a name="line.5893"></a> +<span class="sourceLineNo">5894</span> return region.getRegionInfo();<a name="line.5894"></a> +<span class="sourceLineNo">5895</span> }<a name="line.5895"></a> +<span class="sourceLineNo">5896</span><a name="line.5896"></a> +<span class="sourceLineNo">5897</span> RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)<a name="line.5897"></a> +<span class="sourceLineNo">5898</span> throws IOException {<a name="line.5898"></a> +<span class="sourceLineNo">5899</span> this(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.5899"></a> +<span class="sourceLineNo">5900</span> }<a name="line.5900"></a> +<span class="sourceLineNo">5901</span><a name="line.5901"></a> +<span class="sourceLineNo">5902</span> RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,<a name="line.5902"></a> +<span class="sourceLineNo">5903</span> long nonceGroup, long nonce) throws IOException {<a name="line.5903"></a> +<span class="sourceLineNo">5904</span> this.region = region;<a name="line.5904"></a> +<span class="sourceLineNo">5905</span> this.maxResultSize = scan.getMaxResultSize();<a name="line.5905"></a> +<span class="sourceLineNo">5906</span> if (scan.hasFilter()) {<a name="line.5906"></a> +<span class="sourceLineNo">5907</span> this.filter = new FilterWrapper(scan.getFilter());<a name="line.5907"></a> +<span class="sourceLineNo">5908</span> } else {<a name="line.5908"></a> +<span class="sourceLineNo">5909</span> this.filter = null;<a name="line.5909"></a> +<span class="sourceLineNo">5910</span> }<a name="line.5910"></a> +<span class="sourceLineNo">5911</span> this.comparator = region.getCellComparator();<a name="line.5911"></a> +<span class="sourceLineNo">5912</span> /**<a name="line.5912"></a> +<span class="sourceLineNo">5913</span> * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default<a name="line.5913"></a> +<span class="sourceLineNo">5914</span> * scanner context that can be used to enforce the batch limit in the event that a<a name="line.5914"></a> +<span class="sourceLineNo">5915</span> * ScannerContext is not specified during an invocation of next/nextRaw<a name="line.5915"></a> +<span class="sourceLineNo">5916</span> */<a name="line.5916"></a> +<span class="sourceLineNo">5917</span> defaultScannerContext = ScannerContext.newBuilder()<a name="line.5917"></a> +<span class="sourceLineNo">5918</span> .setBatchLimit(scan.getBatch()).build();<a name="line.5918"></a> +<span class="sourceLineNo">5919</span> this.stopRow = scan.getStopRow();<a name="line.5919"></a> +<span class="sourceLineNo">5920</span> this.includeStopRow = scan.includeStopRow();<a name="line.5920"></a> +<span class="sourceLineNo">5921</span><a name="line.5921"></a> +<span class="sourceLineNo">5922</span> // synchronize on scannerReadPoints so that nobody calculates<a name="line.5922"></a> +<span class="sourceLineNo">5923</span> // getSmallestReadPoint, before scannerReadPoints is updated.<a name="line.5923"></a> +<span class="sourceLineNo">5924</span> IsolationLevel isolationLevel = scan.getIsolationLevel();<a name="line.5924"></a> +<span class="sourceLineNo">5925</span> long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);<a name="line.5925"></a> +<span class="sourceLineNo">5926</span> synchronized (scannerReadPoints) {<a name="line.5926"></a> +<span class="sourceLineNo">5927</span> if (mvccReadPoint > 0) {<a name="line.5927"></a> +<span class="sourceLineNo">5928</span> this.readPt = mvccReadPoint;<a name="line.5928"></a> +<span class="sourceLineNo">5929</span> } else if (nonce == HConstants.NO_NONCE || rsServices == null<a name="line.5929"></a> +<span class="sourceLineNo">5930</span> || rsServices.getNonceManager() == null) {<a name="line.5930"></a> +<span class="sourceLineNo">5931</span> this.readPt = getReadPoint(isolationLevel);<a name="line.5931"></a> +<span class="sourceLineNo">5932</span> } else {<a name="line.5932"></a> +<span class="sourceLineNo">5933</span> this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);<a name="line.5933"></a> +<span class="sourceLineNo">5934</span> }<a name="line.5934"></a> +<span class="sourceLineNo">5935</span> scannerReadPoints.put(this, this.readPt);<a name="line.5935"></a> +<span class="sourceLineNo">5936</span> }<a name="line.5936"></a> +<span class="sourceLineNo">5937</span> initializeScanners(scan, additionalScanners);<a name="line.5937"></a> +<span class="sourceLineNo">5938</span> }<a name="line.5938"></a> +<span class="sourceLineNo">5939</span><a name="line.5939"></a> +<span class="sourceLineNo">5940</span> protected void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)<a name="line.5940"></a> +<span class="sourceLineNo">5941</span> throws IOException {<a name="line.5941"></a> +<span class="sourceLineNo">5942</span> // Here we separate all scanners into two lists - scanner that provide data required<a name="line.5942"></a> +<span class="sourceLineNo">5943</span> // by the filter to operate (scanners list) and all others (joinedScanners list).<a name="line.5943"></a> +<span class="sourceLineNo">5944</span> List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());<a name="line.5944"></a> +<span class="sourceLineNo">5945</span> List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());<a name="line.5945"></a> +<span class="sourceLineNo">5946</span> // Store all already instantiated scanners for exception handling<a name="line.5946"></a> +<span class="sourceLineNo">5947</span> List<KeyValueScanner> instantiatedScanners = new ArrayList<>();<a name="line.5947"></a> +<span class="sourceLineNo">5948</span> // handle additionalScanners<a name="line.5948"></a> +<span class="sourceLineNo">5949</span> if (additionalScanners != null && !additionalScanners.isEmpty()) {<a name="line.5949"></a> +<span class="sourceLineNo">5950</span> scanners.addAll(additionalScanners);<a name="line.5950"></a> +<span class="sourceLineNo">5951</span> instantiatedScanners.addAll(additionalScanners);<a name="line.5951"></a> +<span class="sourceLineNo">5952</span> }<a name="line.5952"></a> +<span class="sourceLineNo">5953</span><a name="line.5953"></a> +<span class="sourceLineNo">5954</span> try {<a name="line.5954"></a> +<span class="sourceLineNo">5955</spa <TRUNCATED>
