This is an automated email from the ASF dual-hosted git repository. openinx pushed a commit to branch HBASE-21879 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 2e82751bb86ad29562c1a45373456e96d1721e5b Author: huzheng <[email protected]> AuthorDate: Wed Apr 17 11:54:15 2019 +0800 HBASE-22122 Change to release mob hfile's block after rpc server shipped response to client --- .../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 28 ++--- .../java/org/apache/hadoop/hbase/mob/MobCell.java | 74 +++++++++++++ .../java/org/apache/hadoop/hbase/mob/MobFile.java | 21 ++-- .../hadoop/hbase/regionserver/HMobStore.java | 63 ++++++------ .../hadoop/hbase/regionserver/MobStoreScanner.java | 41 +++++++- .../hadoop/hbase/regionserver/RSRpcServices.java | 2 +- .../regionserver/ReversedMobStoreScanner.java | 47 +++++++-- .../hbase/regionserver/StoreFileScanner.java | 12 --- .../apache/hadoop/hbase/mob/TestCachedMobFile.java | 19 ++-- .../org/apache/hadoop/hbase/mob/TestMobFile.java | 26 ++--- .../hbase/mob/TestMobWithByteBuffAllocator.java | 114 +++++++++++++++++++++ .../hadoop/hbase/regionserver/TestHMobStore.java | 19 ++-- 12 files changed, 347 insertions(+), 119 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 062bec6..ee1a53f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -244,19 +244,21 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { writer.append(c); } else { // If the value is not larger than the threshold, it's not regarded a mob. Retrieve - // the mob cell from the mob file, and write it back to the store file. - Cell mobCell = mobStore.resolve(c, false); - if (mobCell.getValueLength() != 0) { - // put the mob data back to the store file - PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); - writer.append(mobCell); - cellsCountCompactedFromMob++; - cellsSizeCompactedFromMob += mobCell.getValueLength(); - } else { - // If the value of a file is empty, there might be issues when retrieving, - // directly write the cell to the store file, and leave it to be handled by the - // next compaction. - writer.append(c); + // the mob cell from the mob file, and write it back to the store file. Must + // close the mob scanner once the life cycle finished. + try (MobCell mobCell = mobStore.resolve(c, false)) { + if (mobCell.getCell().getValueLength() != 0) { + // put the mob data back to the store file + PrivateCellUtil.setSequenceId(mobCell.getCell(), c.getSequenceId()); + writer.append(mobCell.getCell()); + cellsCountCompactedFromMob++; + cellsSizeCompactedFromMob += mobCell.getCell().getValueLength(); + } else { + // If the value of a file is empty, there might be issues when retrieving, + // directly write the cell to the store file, and leave it to be handled by the + // next compaction. + writer.append(c); + } } } } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java new file mode 100644 index 0000000..ec956a2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java @@ -0,0 +1,74 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mob; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The MobCell will maintain a {@link Cell} and a {@link StoreFileScanner} inside. Now, the mob cell + * is backend by NIO ByteBuffers which are allocated from ByteBuffAllocator, so we cannot just read + * the cell and close the MOB file scanner because the MOB file scanner closing will deallocate the + * NIO ByteBuffers, which resulting memory leak. + * <p> + * Actually, the right solution is: <br> + * 1. Read the normal cell; <br> + * 2. Parse the value of normal cell and get MOB fileName,offset,length; <br> + * 3. Open scanner to read the mob value; <br> + * 4. Construct the response cell whose key is from the normal cell and value is from the mob cell. + * <br> + * 5. Ship the response cell to HBase client. <br> + * 6. Release both normal cell's block and mob cell's block. <br> + * <p> + * For mob cell, the block releasing just means closing the the mob scanner, so here we need to keep + * the {@link StoreFileScanner} inside and close only when we're ensure that the MobCell has been + * shipped to RPC client. + */ [email protected] +public class MobCell implements Closeable { + + private final Cell cell; + private final StoreFileScanner sfScanner; + + public MobCell(Cell cell) { + this.cell = cell; + this.sfScanner = null; + } + + public MobCell(Cell cell, StoreFileScanner sfScanner) { + this.cell = cell; + this.sfScanner = sfScanner; + } + + public Cell getCell() { + return cell; + } + + @Override + public void close() throws IOException { + if (this.sfScanner != null) { + this.sfScanner.close(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java index 1d0d5ff..43abd39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mob; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -70,7 +71,7 @@ public class MobFile { * @return The cell in the mob file. * @throws IOException */ - public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException { + public MobCell readCell(Cell search, boolean cacheMobBlocks) throws IOException { return readCell(search, cacheMobBlocks, sf.getMaxMemStoreTS()); } @@ -82,26 +83,26 @@ public class MobFile { * @return The cell in the mob file. * @throws IOException */ - public Cell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException { - Cell result = null; + public MobCell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException { StoreFileScanner scanner = null; - List<HStoreFile> sfs = new ArrayList<>(); - sfs.add(sf); + boolean succ = false; try { - List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, - cacheMobBlocks, true, false, false, readPt); + List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles( + Collections.singletonList(sf), cacheMobBlocks, true, false, false, readPt); if (!sfScanners.isEmpty()) { scanner = sfScanners.get(0); if (scanner.seek(search)) { - result = scanner.peek(); + MobCell mobCell = new MobCell(scanner.peek(), scanner); + succ = true; + return mobCell; } } + return null; } finally { - if (scanner != null) { + if (scanner != null && !succ) { scanner.close(); } } - return result; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 596aa3d..b8ea960 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; @@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; +import org.apache.hadoop.hbase.mob.MobCell; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFile; import org.apache.hadoop.hbase.mob.MobFileCache; @@ -298,14 +298,14 @@ public class HMobStore extends HStore { } /** - * Reads the cell from the mob file, and the read point does not count. - * This is used for DefaultMobStoreCompactor where we can read empty value for the missing cell. + * Reads the cell from the mob file, and the read point does not count. This is used for + * DefaultMobStoreCompactor where we can read empty value for the missing cell. * @param reference The cell found in the HBase, its value is a path to a mob file. * @param cacheBlocks Whether the scanner should cache blocks. * @return The cell found in the mob file. * @throws IOException */ - public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { + public MobCell resolve(Cell reference, boolean cacheBlocks) throws IOException { return resolve(reference, cacheBlocks, -1, true); } @@ -314,14 +314,14 @@ public class HMobStore extends HStore { * @param reference The cell found in the HBase, its value is a path to a mob file. * @param cacheBlocks Whether the scanner should cache blocks. * @param readPt the read point. - * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is - * missing or corrupt. + * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or + * corrupt. * @return The cell found in the mob file. * @throws IOException */ - public Cell resolve(Cell reference, boolean cacheBlocks, long readPt, - boolean readEmptyValueOnMobCellMiss) throws IOException { - Cell result = null; + public MobCell resolve(Cell reference, boolean cacheBlocks, long readPt, + boolean readEmptyValueOnMobCellMiss) throws IOException { + MobCell mobCell = null; if (MobUtils.hasValidMobRefCellValue(reference)) { String fileName = MobUtils.getMobFileName(reference); Tag tableNameTag = MobUtils.getTableNameTag(reference); @@ -336,35 +336,34 @@ public class HMobStore extends HStore { locations = new ArrayList<>(2); TableName tn = TableName.valueOf(tableNameString); locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString())); - locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils - .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString())); + locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, + MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString())); map.put(tableNameString, locations); } } finally { keyLock.releaseLockEntry(lockEntry); } } - result = readCell(locations, fileName, reference, cacheBlocks, readPt, + mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt, readEmptyValueOnMobCellMiss); } } - if (result == null) { + if (mobCell == null) { LOG.warn("The Cell result is null, assemble a new Cell with the same row,family," + "qualifier,timestamp,type and tags but with an empty value to return."); - result = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) - .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength()) - .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(), - reference.getFamilyLength()) - .setQualifier(reference.getQualifierArray(), - reference.getQualifierOffset(), reference.getQualifierLength()) - .setTimestamp(reference.getTimestamp()) - .setType(reference.getTypeByte()) - .setValue(HConstants.EMPTY_BYTE_ARRAY) - .setTags(reference.getTagsArray(), reference.getTagsOffset(), - reference.getTagsLength()) - .build(); + Cell cell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength()) + .setFamily(reference.getFamilyArray(), reference.getFamilyOffset(), + reference.getFamilyLength()) + .setQualifier(reference.getQualifierArray(), reference.getQualifierOffset(), + reference.getQualifierLength()) + .setTimestamp(reference.getTimestamp()).setType(reference.getTypeByte()) + .setValue(HConstants.EMPTY_BYTE_ARRAY) + .setTags(reference.getTagsArray(), reference.getTagsOffset(), reference.getTagsLength()) + .build(); + mobCell = new MobCell(cell); } - return result; + return mobCell; } /** @@ -383,8 +382,8 @@ public class HMobStore extends HStore { * @return The found cell. Null if there's no such a cell. * @throws IOException */ - private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks, - long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException { + private MobCell readCell(List<Path> locations, String fileName, Cell search, + boolean cacheMobBlocks, long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException { FileSystem fs = getFileSystem(); Throwable throwable = null; for (Path location : locations) { @@ -392,12 +391,8 @@ public class HMobStore extends HStore { Path path = new Path(location, fileName); try { file = mobFileCache.openFile(fs, path, cacheConf); - Cell cell = readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) + return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search, cacheMobBlocks); - // Now we will return blocks to allocator for mob cells before shipping to rpc client. - // it will be memory leak. so just copy cell as an on-heap KV here. will remove this in - // HBASE-22122 (TODO) - return KeyValueUtil.copyToNewKeyValue(cell); } catch (IOException e) { mobFileCache.evictFile(fileName); throwable = e; @@ -425,7 +420,7 @@ public class HMobStore extends HStore { } } LOG.error("The mob file " + fileName + " could not be found in the locations " + locations - + " or it is corrupt"); + + " or it is corrupt"); if (readEmptyValueOnMobCellMiss) { return null; } else if ((throwable instanceof FileNotFoundException) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java index b9f9af8..76144f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java @@ -19,13 +19,17 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobCell; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into @@ -34,10 +38,13 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class MobStoreScanner extends StoreScanner { + private static final Logger LOG = LoggerFactory.getLogger(MobStoreScanner.class); + private boolean cacheMobBlocks = false; private boolean rawMobScan = false; private boolean readEmptyValueOnMobCellMiss = false; private final HMobStore mobStore; + private final List<MobCell> referencedMobCells; public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns, long readPt) throws IOException { @@ -49,6 +56,7 @@ public class MobStoreScanner extends StoreScanner { throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); } mobStore = (HMobStore) store; + this.referencedMobCells = new ArrayList<>(); } /** @@ -69,11 +77,13 @@ public class MobStoreScanner extends StoreScanner { for (int i = 0; i < outResult.size(); i++) { Cell cell = outResult.get(i); if (MobUtils.isMobReferenceCell(cell)) { - Cell mobCell = mobStore - .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss); + MobCell mobCell = + mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss); mobKVCount++; - mobKVSize += mobCell.getValueLength(); - outResult.set(i, mobCell); + mobKVSize += mobCell.getCell().getValueLength(); + outResult.set(i, mobCell.getCell()); + // Keep the MobCell here unless we shipped the RPC or close the scanner. + referencedMobCells.add(mobCell); } } mobStore.updateMobScanCellsCount(mobKVCount); @@ -81,4 +91,27 @@ public class MobStoreScanner extends StoreScanner { } return result; } + + private void freeAllReferencedMobCells() throws IOException { + for (MobCell cell : referencedMobCells) { + cell.close(); + } + referencedMobCells.clear(); + } + + @Override + public void shipped() throws IOException { + super.shipped(); + this.freeAllReferencedMobCells(); + } + + @Override + public void close() { + super.close(); + try { + this.freeAllReferencedMobCells(); + } catch (IOException e) { + LOG.warn("Failed to free referenced mob cells: ", e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 1586f1c..2b8bba0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -291,7 +291,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; - protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled"; + public static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled"; // Request counter. (Includes requests that are not serviced by regions.) // Count only once for requests with multiple actions like multi/caching-scan/replayBatch diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java index d64c372..a3d779c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java @@ -19,26 +19,31 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.mob.MobCell; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support - * reversed scanning in both the memstore and the MOB store. - * + * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support reversed + * scanning in both the memstore and the MOB store. */ @InterfaceAudience.Private public class ReversedMobStoreScanner extends ReversedStoreScanner { + private static final Logger LOG = LoggerFactory.getLogger(ReversedMobStoreScanner.class); private boolean cacheMobBlocks = false; private boolean rawMobScan = false; private boolean readEmptyValueOnMobCellMiss = false; - protected final HMobStore mobStore; + private final HMobStore mobStore; + private final List<MobCell> referencedMobCells; ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, long readPt) throws IOException { @@ -50,6 +55,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner { throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); } mobStore = (HMobStore) store; + this.referencedMobCells = new ArrayList<>(); } /** @@ -70,11 +76,13 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner { for (int i = 0; i < outResult.size(); i++) { Cell cell = outResult.get(i); if (MobUtils.isMobReferenceCell(cell)) { - Cell mobCell = mobStore - .resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss); + MobCell mobCell = + mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss); mobKVCount++; - mobKVSize += mobCell.getValueLength(); - outResult.set(i, mobCell); + mobKVSize += mobCell.getCell().getValueLength(); + outResult.set(i, mobCell.getCell()); + // Keep the MobCell here unless we shipped the RPC or close the scanner. + referencedMobCells.add(mobCell); } } mobStore.updateMobScanCellsCount(mobKVCount); @@ -82,4 +90,27 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner { } return result; } + + private void freeAllReferencedMobCells() throws IOException { + for (MobCell mobCell : referencedMobCells) { + mobCell.close(); + } + referencedMobCells.clear(); + } + + @Override + public void shipped() throws IOException { + super.shipped(); + this.freeAllReferencedMobCells(); + } + + @Override + public void close() { + super.close(); + try { + this.freeAllReferencedMobCells(); + } catch (IOException e) { + LOG.warn("Failed to free referenced mob cells: ", e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index b5b853a..6e70c5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -97,18 +97,6 @@ public class StoreFileScanner implements KeyValueScanner { this.reader.incrementRefCount(); } - boolean isPrimaryReplica() { - return reader.isPrimaryReplicaReader(); - } - - /** - * Return an array of scanners corresponding to the given set of store files. - */ - public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, - boolean cacheBlocks, boolean usePread, long readPt) throws IOException { - return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt); - } - /** * Return an array of scanners corresponding to the given set of store files. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java index bb194b6..d274db3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java @@ -113,46 +113,45 @@ public class TestCachedMobFile { Path testDir = TEST_UTIL.getDataTestDir(); FileSystem fs = testDir.getFileSystem(conf); HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); - StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs) - .withOutputDir(testDir).withFileContext(meta).build(); + StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir) + .withFileContext(meta).build(); String caseName = testName.getMethodName(); MobTestUtil.writeStoreFile(writer, caseName); CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf); byte[] family = Bytes.toBytes(caseName); byte[] qualify = Bytes.toBytes(caseName); // Test the start key - byte[] startKey = Bytes.toBytes("aa"); // The start key bytes + byte[] startKey = Bytes.toBytes("aa"); // The start key bytes KeyValue expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); KeyValue seekKey = expectedKey.createKeyOnly(false); - Cell cell = cachedMobFile.readCell(seekKey, false); + Cell cell = cachedMobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the end key - byte[] endKey = Bytes.toBytes("zz"); // The end key bytes + byte[] endKey = Bytes.toBytes("zz"); // The end key bytes expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey); seekKey = expectedKey.createKeyOnly(false); - cell = cachedMobFile.readCell(seekKey, false); + cell = cachedMobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the random key byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2)); expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey); seekKey = expectedKey.createKeyOnly(false); - cell = cachedMobFile.readCell(seekKey, false); + cell = cachedMobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the key which is less than the start key byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa" expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey); - cell = cachedMobFile.readCell(seekKey, false); + cell = cachedMobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the key which is more than the end key byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz" seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey); - cell = cachedMobFile.readCell(seekKey, false); - Assert.assertNull(cell); + Assert.assertNull(cachedMobFile.readCell(seekKey, false)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java index c22ca98..297c19f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java @@ -43,8 +43,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Category(SmallTests.class) public class TestMobFile { @@ -53,7 +51,6 @@ public class TestMobFile { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMobFile.class); - static final Logger LOG = LoggerFactory.getLogger(TestMobFile.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private Configuration conf = TEST_UTIL.getConfiguration(); private CacheConfig cacheConf = new CacheConfig(conf); @@ -64,11 +61,9 @@ public class TestMobFile { public void testReadKeyValue() throws Exception { Path testDir = TEST_UTIL.getDataTestDir(); FileSystem fs = testDir.getFileSystem(conf); - HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build(); - StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs) - .withOutputDir(testDir) - .withFileContext(meta) - .build(); + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir) + .withFileContext(meta).build(); String caseName = testName.getMethodName(); MobTestUtil.writeStoreFile(writer, caseName); @@ -78,39 +73,38 @@ public class TestMobFile { byte[] qualify = Bytes.toBytes(caseName); // Test the start key - byte[] startKey = Bytes.toBytes("aa"); // The start key bytes + byte[] startKey = Bytes.toBytes("aa"); // The start key bytes KeyValue expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); KeyValue seekKey = expectedKey.createKeyOnly(false); - Cell cell = mobFile.readCell(seekKey, false); + Cell cell = mobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the end key - byte[] endKey = Bytes.toBytes("zz"); // The end key bytes + byte[] endKey = Bytes.toBytes("zz"); // The end key bytes expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey); seekKey = expectedKey.createKeyOnly(false); - cell = mobFile.readCell(seekKey, false); + cell = mobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the random key byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2)); expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey); seekKey = expectedKey.createKeyOnly(false); - cell = mobFile.readCell(seekKey, false); + cell = mobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the key which is less than the start key byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa" expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey); seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey); - cell = mobFile.readCell(seekKey, false); + cell = mobFile.readCell(seekKey, false).getCell(); MobTestUtil.assertCellEquals(expectedKey, cell); // Test the key which is more than the end key byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz" seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey); - cell = mobFile.readCell(seekKey, false); - assertNull(cell); + assertNull(mobFile.readCell(seekKey, false)); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java new file mode 100644 index 0000000..a527740 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mob; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the MOB feature when enable RPC ByteBuffAllocator (HBASE-22122) + */ +@Category({ MediumTests.class }) +public class TestMobWithByteBuffAllocator { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMobWithByteBuffAllocator.class); + + private static final String TABLE_NAME = "TestMobWithByteBuffAllocator"; + private static final Logger LOG = LoggerFactory.getLogger(TestMobWithByteBuffAllocator.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final Configuration CONF = UTIL.getConfiguration(); + private static final byte[] FAMILY = Bytes.toBytes("f"); + + @BeforeClass + public static void setUp() throws Exception { + // Must use the ByteBuffAllocator here + CONF.setBoolean(RSRpcServices.RESERVOIR_ENABLED_KEY, true); + // Must use OFF-HEAP BucketCache here. + CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.1f); + CONF.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + // 32MB for BucketCache. + CONF.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 32); + CONF.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0); + UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testReadingCellsFromHFile() throws Exception { + TableName tableName = TableName.valueOf(TABLE_NAME); + MobSnapshotTestingUtils.createMobTable(UTIL, tableName, 1, FAMILY); + LOG.info("Create an mob table {} successfully.", tableName); + + int expectedRows = 500; + SnapshotTestingUtils.loadData(UTIL, tableName, expectedRows, FAMILY); + LOG.info("Load 500 rows data into table {} successfully.", tableName); + + // Flush all the data into HFiles. + try (Admin admin = UTIL.getConnection().getAdmin()) { + admin.flush(tableName); + } + + // Scan the rows + MobSnapshotTestingUtils.verifyMobRowCount(UTIL, tableName, expectedRows); + + // Reversed scan the rows + int rows = 0; + try (Table table = UTIL.getConnection().getTable(tableName)) { + try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) { + for (Result res; (res = scanner.next()) != null;) { + rows++; + for (Cell cell : res.listCells()) { + Assert.assertTrue(CellUtil.cloneValue(cell).length > 0); + } + } + } + } + Assert.assertEquals(expectedRows, rows); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index bf1f18e..152ea87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -448,17 +448,14 @@ public class TestHMobStore { String targetPathName = MobUtils.formatDate(currentDate); Path targetPath = new Path(store.getPath(), targetPathName); store.commitFile(mobFilePath, targetPath); - //resolve - Cell resultCell1 = store.resolve(seekKey1, false); - Cell resultCell2 = store.resolve(seekKey2, false); - Cell resultCell3 = store.resolve(seekKey3, false); - //compare - Assert.assertEquals(Bytes.toString(value), - Bytes.toString(CellUtil.cloneValue(resultCell1))); - Assert.assertEquals(Bytes.toString(value), - Bytes.toString(CellUtil.cloneValue(resultCell2))); - Assert.assertEquals(Bytes.toString(value2), - Bytes.toString(CellUtil.cloneValue(resultCell3))); + // resolve + Cell resultCell1 = store.resolve(seekKey1, false).getCell(); + Cell resultCell2 = store.resolve(seekKey2, false).getCell(); + Cell resultCell3 = store.resolve(seekKey3, false).getCell(); + // compare + Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell1))); + Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell2))); + Assert.assertEquals(Bytes.toString(value2), Bytes.toString(CellUtil.cloneValue(resultCell3))); } /**
