PHOENIX-3021 Using local index during compaction is producing NPE(Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1a05421a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1a05421a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1a05421a Branch: refs/heads/encodecolumns Commit: 1a05421a3e968e0aaeee9a9d76f10f08bd6b3ae0 Parents: 7fcdf96 Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Wed Jul 13 18:46:51 2016 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Wed Jul 13 18:46:51 2016 +0530 ---------------------------------------------------------------------- .../IndexHalfStoreFileReaderGenerator.java | 87 ++++++++++++++------ 1 file changed, 61 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a05421a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java index 9861cad..3bbc3df 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java @@ -28,6 +28,7 @@ import java.util.NavigableSet; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -244,36 +245,70 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { if (store.getFamily().getNameAsString() .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && store.hasReferences()) { - long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()); - boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall()); - Collection<StoreFile> storeFiles = store.getStorefiles(); - List<StoreFile> nonReferenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size()); - List<StoreFile> referenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size()); - List<KeyValueScanner> keyValueScanners = new ArrayList<KeyValueScanner>(store.getStorefiles().size()+1); - for(StoreFile storeFile : storeFiles) { - if (storeFile.isReference()) { - referenceStoreFiles.add(storeFile); - } else { - nonReferenceStoreFiles.add(storeFile); - } - } - List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt); - keyValueScanners.addAll(scanners); - for(StoreFile sf : referenceStoreFiles) { - if(sf.getReader() instanceof IndexHalfStoreFileReader) { - keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader() + final long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel + ()); + if (!scan.isReversed()) { + return new StoreScanner(store, store.getScanInfo(), scan, + targetCols, readPt) { + + @Override + protected List<KeyValueScanner> getScannersNoCompaction() throws IOException { + if (store.hasReferences()) { + return getLocalIndexScanners(c, store, scan, readPt); + } else { + return super.getScannersNoCompaction(); + } + } + }; + } else { + return new ReversedStoreScanner(store, store.getScanInfo(), scan, + targetCols, readPt) { + @Override + protected List<KeyValueScanner> getScannersNoCompaction() throws IOException { + if (store.hasReferences()) { + return getLocalIndexScanners(c, store, scan, readPt); + } else { + return super.getScannersNoCompaction(); + } + } + }; + } + } + return s; + } + + private List<KeyValueScanner> getLocalIndexScanners(final + ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final Scan scan, final long readPt) throws IOException { + + boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + Collection<StoreFile> storeFiles = store.getStorefiles(); + List<StoreFile> nonReferenceStoreFiles = new ArrayList<>(store.getStorefiles().size()); + List<StoreFile> referenceStoreFiles = new ArrayList<>(store.getStorefiles().size + ()); + final List<KeyValueScanner> keyValueScanners = new ArrayList<>(store + .getStorefiles().size() + 1); + for (StoreFile storeFile : storeFiles) { + if (storeFile.isReference()) { + referenceStoreFiles.add(storeFile); + } else { + nonReferenceStoreFiles.add(storeFile); + } + } + final List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt); + keyValueScanners.addAll(scanners); + for (StoreFile sf : referenceStoreFiles) { + if (sf.getReader() instanceof IndexHalfStoreFileReader) { + keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader() .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf .getReader().getHFileReader().hasMVCCInfo(), readPt)); - } else { - keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader() + } else { + keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader() .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf .getReader().getHFileReader().hasMVCCInfo(), readPt)); - } } - keyValueScanners.addAll(((HStore)store).memstore.getScanners(readPt)); - if(!scan.isReversed()) return new StoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners); - return new ReversedStoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners); - } - return s; + } + keyValueScanners.addAll(((HStore) store).memstore.getScanners(readPt)); + return keyValueScanners; } }
