http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java new file mode 100644 index 0000000..b685115 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -0,0 +1,1743 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import 
org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MemoryCompactionPolicy; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.util.Progressable; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +import 
org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + +/** + * Test class for the Store + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestHStore { + private static final Log LOG = LogFactory.getLog(TestHStore.class); + @Rule + public TestName name = new TestName(); + + HStore store; + byte [] table = Bytes.toBytes("table"); + byte [] family = Bytes.toBytes("family"); + + byte [] row = Bytes.toBytes("row"); + byte [] row2 = Bytes.toBytes("row2"); + byte [] qf1 = Bytes.toBytes("qf1"); + byte [] qf2 = Bytes.toBytes("qf2"); + byte [] qf3 = Bytes.toBytes("qf3"); + byte [] qf4 = Bytes.toBytes("qf4"); + byte [] qf5 = Bytes.toBytes("qf5"); + byte [] qf6 = Bytes.toBytes("qf6"); + + NavigableSet<byte[]> qualifiers = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); + + List<Cell> expected = new ArrayList<>(); + List<Cell> result = new ArrayList<>(); + + long id = System.currentTimeMillis(); + Get get = new Get(row); + + private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString(); + + + /** + * Setup + * @throws IOException + */ + @Before + public void setUp() throws IOException { + qualifiers.add(qf1); + qualifiers.add(qf3); + qualifiers.add(qf5); + + Iterator<byte[]> iter = qualifiers.iterator(); + while(iter.hasNext()){ + byte [] next = iter.next(); + expected.add(new KeyValue(row, family, next, 1, (byte[])null)); + get.addColumn(family, next); + } + } + + private void init(String methodName) throws IOException { + init(methodName, TEST_UTIL.getConfiguration()); + } + + private Store init(String methodName, Configuration conf) throws IOException { + HColumnDescriptor hcd = new HColumnDescriptor(family); + // some of the tests write 4 versions and then flush + // (with HBASE-4241, lower versions are collected on flush) + hcd.setMaxVersions(4); + return init(methodName, conf, hcd); + } + + private HStore init(String methodName, Configuration 
conf, HColumnDescriptor hcd) + throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); + return init(methodName, conf, htd, hcd); + } + + private HStore init(String methodName, Configuration conf, HTableDescriptor htd, + HColumnDescriptor hcd) throws IOException { + return init(methodName, conf, htd, hcd, null); + } + + @SuppressWarnings("deprecation") + private HStore init(String methodName, Configuration conf, HTableDescriptor htd, + HColumnDescriptor hcd, MyStoreHook hook) throws IOException { + return init(methodName, conf, htd, hcd, hook, false); + } + @SuppressWarnings("deprecation") + private HStore init(String methodName, Configuration conf, HTableDescriptor htd, + HColumnDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException { + //Setting up a Store + Path basedir = new Path(DIR+methodName); + Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); + final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName)); + + FileSystem fs = FileSystem.get(conf); + + fs.delete(logdir, true); + + if (htd.hasFamily(hcd.getName())) { + htd.modifyFamily(hcd); + } else { + htd.addFamily(hcd); + } + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null); + HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); + final Configuration walConf = new Configuration(conf); + FSUtils.setRootDir(walConf, basedir); + final WALFactory wals = new WALFactory(walConf, null, methodName); + HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(), + info.getTable().getNamespace()), fs, conf, info, htd, null); + if (hook == null) { + store = new HStore(region, hcd, conf); + } else { + store = new MyStore(region, hcd, conf, hook, switchToPread); + } + return store; + } + + /** + * Test we do not lose data if we fail a flush and then close. 
+ * Part of HBase-10466 + * @throws Exception + */ + @Test + public void testFlushSizeAccounting() throws Exception { + LOG.info("Setting up a faulty file system that cannot write in " + + this.name.getMethodName()); + final Configuration conf = HBaseConfiguration.create(); + // Only retry once. + conf.setInt("hbase.hstore.flush.retries.number", 1); + User user = User.createUserForTesting(conf, this.name.getMethodName(), + new String[]{"foo"}); + // Inject our faulty LocalFileSystem + conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); + user.runAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + // Make sure it worked (above is sensitive to caching details in hadoop core) + FileSystem fs = FileSystem.get(conf); + assertEquals(FaultyFileSystem.class, fs.getClass()); + FaultyFileSystem ffs = (FaultyFileSystem)fs; + + // Initialize region + init(name.getMethodName(), conf); + + MemstoreSize size = store.memstore.getFlushableSize(); + assertEquals(0, size.getDataSize()); + LOG.info("Adding some data"); + MemstoreSize kvSize = new MemstoreSize(); + store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); + // add the heap size of active (mutable) segment + kvSize.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD); + size = store.memstore.getFlushableSize(); + assertEquals(kvSize, size); + // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. 
+ try { + LOG.info("Flushing"); + flushStore(store, id++); + fail("Didn't bubble up IOE!"); + } catch (IOException ioe) { + assertTrue(ioe.getMessage().contains("Fault injected")); + } + // due to snapshot, change mutable to immutable segment + kvSize.incMemstoreSize(0, + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD); + size = store.memstore.getFlushableSize(); + assertEquals(kvSize, size); + MemstoreSize kvSize2 = new MemstoreSize(); + store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2); + kvSize2.incMemstoreSize(0, MutableSegment.DEEP_OVERHEAD); + // Even though we add a new kv, we expect the flushable size to be 'same' since we have + // not yet cleared the snapshot -- the above flush failed. + assertEquals(kvSize, size); + ffs.fault.set(false); + flushStore(store, id++); + size = store.memstore.getFlushableSize(); + // Size should be the foreground kv size. + assertEquals(kvSize2, size); + flushStore(store, id++); + size = store.memstore.getFlushableSize(); + assertEquals(0, size.getDataSize()); + assertEquals(MutableSegment.DEEP_OVERHEAD, size.getHeapSize()); + return null; + } + }); + } + + /** + * Verify that compression and data block encoding are respected by the + * Store.createWriterInTmp() method, used on store flush. 
+ */ + @Test + public void testCreateWriter() throws Exception { + Configuration conf = HBaseConfiguration.create(); + FileSystem fs = FileSystem.get(conf); + + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setCompressionType(Compression.Algorithm.GZ); + hcd.setDataBlockEncoding(DataBlockEncoding.DIFF); + init(name.getMethodName(), conf, hcd); + + // Test createWriterInTmp() + StoreFileWriter writer = store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false); + Path path = writer.getPath(); + writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); + writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); + writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3))); + writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4))); + writer.close(); + + // Verify that compression and encoding settings are respected + HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), true, conf); + assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm()); + assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding()); + reader.close(); + } + + @Test + public void testDeleteExpiredStoreFiles() throws Exception { + testDeleteExpiredStoreFiles(0); + testDeleteExpiredStoreFiles(1); + } + + /* + * @param minVersions the MIN_VERSIONS for the column family + */ + public void testDeleteExpiredStoreFiles(int minVersions) throws Exception { + int storeFileNum = 4; + int ttl = 4; + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManagerTestHelper.injectEdge(edge); + + Configuration conf = HBaseConfiguration.create(); + // Enable the expired store file deletion + conf.setBoolean("hbase.store.delete.expired.storefile", true); + // Set the compaction threshold higher to avoid normal compactions. 
+ conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5); + + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMinVersions(minVersions); + hcd.setTimeToLive(ttl); + init(name.getMethodName() + "-" + minVersions, conf, hcd); + + long storeTtl = this.store.getScanInfo().getTtl(); + long sleepTime = storeTtl / storeFileNum; + long timeStamp; + // There are 4 store files and the max time stamp difference among these + // store files will be (this.store.ttl / storeFileNum) + for (int i = 1; i <= storeFileNum; i++) { + LOG.info("Adding some data for the store file #" + i); + timeStamp = EnvironmentEdgeManager.currentTime(); + this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), null); + this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), null); + this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), null); + flush(i); + edge.incrementTime(sleepTime); + } + + // Verify the total number of store files + assertEquals(storeFileNum, this.store.getStorefiles().size()); + + // Each call will find one expired store file and delete it before compaction happens. + // There will be no compaction due to threshold above. Last file will not be replaced. + for (int i = 1; i <= storeFileNum - 1; i++) { + // verify the expired store file. + assertFalse(this.store.requestCompaction().isPresent()); + Collection<HStoreFile> sfs = this.store.getStorefiles(); + // Ensure i files are gone. + if (minVersions == 0) { + assertEquals(storeFileNum - i, sfs.size()); + // Ensure only non-expired files remain. + for (HStoreFile sf : sfs) { + assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl)); + } + } else { + assertEquals(storeFileNum, sfs.size()); + } + // Let the next store file expired. 
+ edge.incrementTime(sleepTime); + } + assertFalse(this.store.requestCompaction().isPresent()); + + Collection<HStoreFile> sfs = this.store.getStorefiles(); + // Assert the last expired file is not removed. + if (minVersions == 0) { + assertEquals(1, sfs.size()); + } + long ts = sfs.iterator().next().getReader().getMaxTimestamp(); + assertTrue(ts < (edge.currentTime() - storeTtl)); + + for (HStoreFile sf : sfs) { + sf.closeStoreFile(true); + } + } + + @Test + public void testLowestModificationTime() throws Exception { + Configuration conf = HBaseConfiguration.create(); + FileSystem fs = FileSystem.get(conf); + // Initialize region + init(name.getMethodName(), conf); + + int storeFileNum = 4; + for (int i = 1; i <= storeFileNum; i++) { + LOG.info("Adding some data for the store file #"+i); + this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), null); + flush(i); + } + // after flush; check the lowest time stamp + long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); + long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); + assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); + + // after compact; check the lowest time stamp + store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null); + lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); + lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); + assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); + } + + private static long getLowestTimeStampFromFS(FileSystem fs, + final Collection<HStoreFile> candidates) throws IOException { + long minTs = Long.MAX_VALUE; + if (candidates.isEmpty()) { + return minTs; + } + Path[] p = new Path[candidates.size()]; + int i = 0; + for (HStoreFile sf : candidates) 
{ + p[i] = sf.getPath(); + ++i; + } + + FileStatus[] stats = fs.listStatus(p); + if (stats == null || stats.length == 0) { + return minTs; + } + for (FileStatus s : stats) { + minTs = Math.min(minTs, s.getModificationTime()); + } + return minTs; + } + + ////////////////////////////////////////////////////////////////////////////// + // Get tests + ////////////////////////////////////////////////////////////////////////////// + + private static final int BLOCKSIZE_SMALL = 8192; + /** + * Test for hbase-1686. + * @throws IOException + */ + @Test + public void testEmptyStoreFile() throws IOException { + init(this.name.getMethodName()); + // Write a store file. + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); + flush(1); + // Now put in place an empty store file. Its a little tricky. Have to + // do manually with hacked in sequence id. + HStoreFile f = this.store.getStorefiles().iterator().next(); + Path storedir = f.getPath().getParent(); + long seqid = f.getMaxSequenceId(); + Configuration c = HBaseConfiguration.create(); + FileSystem fs = FileSystem.get(c); + HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); + StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), + fs) + .withOutputDir(storedir) + .withFileContext(meta) + .build(); + w.appendMetadata(seqid + 1, false); + w.close(); + this.store.close(); + // Reopen it... 
should pick up two files + this.store = new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c); + assertEquals(2, this.store.getStorefilesCount()); + + result = HBaseTestingUtility.getFromStoreFile(store, + get.getRow(), + qualifiers); + assertEquals(1, result.size()); + } + + /** + * Getting data from memstore only + * @throws IOException + */ + @Test + public void testGet_FromMemStoreOnly() throws IOException { + init(this.name.getMethodName()); + + //Put data in memstore + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); + + //Get + result = HBaseTestingUtility.getFromStoreFile(store, + get.getRow(), qualifiers); + + //Compare + assertCheck(); + } + + /** + * Getting data from files only + * @throws IOException + */ + @Test + public void testGet_FromFilesOnly() throws IOException { + init(this.name.getMethodName()); + + //Put data in memstore + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); + //flush + flush(1); + + //Add more data + this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); + //flush + flush(2); + + //Add more data + this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); + //flush + flush(3); + + //Get + result = HBaseTestingUtility.getFromStoreFile(store, + get.getRow(), + qualifiers); + //this.store.get(get, qualifiers, result); + + //Need to sort the result since multiple 
files + Collections.sort(result, CellComparator.COMPARATOR); + + //Compare + assertCheck(); + } + + /** + * Getting data from memstore and files + * @throws IOException + */ + @Test + public void testGet_FromMemStoreAndFiles() throws IOException { + init(this.name.getMethodName()); + + //Put data in memstore + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); + //flush + flush(1); + + //Add more data + this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), null); + //flush + flush(2); + + //Add more data + this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), null); + this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), null); + + //Get + result = HBaseTestingUtility.getFromStoreFile(store, + get.getRow(), qualifiers); + + //Need to sort the result since multiple files + Collections.sort(result, CellComparator.COMPARATOR); + + //Compare + assertCheck(); + } + + private void flush(int storeFilessize) throws IOException{ + this.store.snapshot(); + flushStore(store, id++); + assertEquals(storeFilessize, this.store.getStorefiles().size()); + assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); + } + + private void assertCheck() { + assertEquals(expected.size(), result.size()); + for(int i=0; i<expected.size(); i++) { + assertEquals(expected.get(i), result.get(i)); + } + } + + @After + public void tearDown() throws Exception { + EnvironmentEdgeManagerTestHelper.reset(); + } + + @Test + public void testHandleErrorsInFlush() throws Exception { + LOG.info("Setting up a faulty file system that cannot write"); + + final Configuration conf = HBaseConfiguration.create(); + User user = User.createUserForTesting(conf, + "testhandleerrorsinflush", new String[]{"foo"}); + // Inject our faulty LocalFileSystem + conf.setClass("fs.file.impl", 
FaultyFileSystem.class, + FileSystem.class); + user.runAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + // Make sure it worked (above is sensitive to caching details in hadoop core) + FileSystem fs = FileSystem.get(conf); + assertEquals(FaultyFileSystem.class, fs.getClass()); + + // Initialize region + init(name.getMethodName(), conf); + + LOG.info("Adding some data"); + store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + store.add(new KeyValue(row, family, qf2, 1, (byte[])null), null); + store.add(new KeyValue(row, family, qf3, 1, (byte[])null), null); + + LOG.info("Before flush, we should have no files"); + + Collection<StoreFileInfo> files = + store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); + assertEquals(0, files != null ? files.size() : 0); + + //flush + try { + LOG.info("Flushing"); + flush(1); + fail("Didn't bubble up IOE!"); + } catch (IOException ioe) { + assertTrue(ioe.getMessage().contains("Fault injected")); + } + + LOG.info("After failed flush, we should still have no files!"); + files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName()); + assertEquals(0, files != null ? files.size() : 0); + store.getHRegion().getWAL().close(); + return null; + } + }); + FileSystem.closeAllForUGI(user.getUGI()); + } + + /** + * Faulty file system that will fail if you write past its fault position the FIRST TIME + * only; thereafter it will succeed. Used by {@link TestHRegion} too. 
+ */ + static class FaultyFileSystem extends FilterFileSystem { + List<SoftReference<FaultyOutputStream>> outStreams = new ArrayList<>(); + private long faultPos = 200; + AtomicBoolean fault = new AtomicBoolean(true); + + public FaultyFileSystem() { + super(new LocalFileSystem()); + System.err.println("Creating faulty!"); + } + + @Override + public FSDataOutputStream create(Path p) throws IOException { + return new FaultyOutputStream(super.create(p), faultPos, fault); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return new FaultyOutputStream(super.create(f, permission, + overwrite, bufferSize, replication, blockSize, progress), faultPos, fault); + } + + @Override + public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, + int bufferSize, short replication, long blockSize, Progressable progress) + throws IOException { + // Fake it. Call create instead. The default implementation throws an IOE + // that this is not supported. 
+ return create(f, overwrite, bufferSize, replication, blockSize, progress); + } + } + + static class FaultyOutputStream extends FSDataOutputStream { + volatile long faultPos = Long.MAX_VALUE; + private final AtomicBoolean fault; + + public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault) + throws IOException { + super(out, null); + this.faultPos = faultPos; + this.fault = fault; + } + + @Override + public void write(byte[] buf, int offset, int length) throws IOException { + System.err.println("faulty stream write at pos " + getPos()); + injectFault(); + super.write(buf, offset, length); + } + + private void injectFault() throws IOException { + if (this.fault.get() && getPos() >= faultPos) { + throw new IOException("Fault injected"); + } + } + } + + private static void flushStore(HStore store, long id) throws IOException { + StoreFlushContext storeFlushCtx = store.createFlushContext(id); + storeFlushCtx.prepare(); + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + } + + /** + * Generate a list of KeyValues for testing based on given parameters + * @param timestamps + * @param numRows + * @param qualifier + * @param family + * @return + */ + List<Cell> getKeyValueSet(long[] timestamps, int numRows, + byte[] qualifier, byte[] family) { + List<Cell> kvList = new ArrayList<>(); + for (int i=1;i<=numRows;i++) { + byte[] b = Bytes.toBytes(i); + for (long timestamp: timestamps) { + kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); + } + } + return kvList; + } + + /** + * Test to ensure correctness when using Stores with multiple timestamps + * @throws IOException + */ + @Test + public void testMultipleTimestamps() throws IOException { + int numRows = 1; + long[] timestamps1 = new long[] {1,5,10,20}; + long[] timestamps2 = new long[] {30,80}; + + init(this.name.getMethodName()); + + List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); + 
for (Cell kv : kvList1) { + this.store.add(kv, null); + } + + this.store.snapshot(); + flushStore(store, id++); + + List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); + for(Cell kv : kvList2) { + this.store.add(kv, null); + } + + List<Cell> result; + Get get = new Get(Bytes.toBytes(1)); + get.addColumn(family,qf1); + + get.setTimeRange(0,15); + result = HBaseTestingUtility.getFromStoreFile(store, get); + assertTrue(result.size()>0); + + get.setTimeRange(40,90); + result = HBaseTestingUtility.getFromStoreFile(store, get); + assertTrue(result.size()>0); + + get.setTimeRange(10,45); + result = HBaseTestingUtility.getFromStoreFile(store, get); + assertTrue(result.size()>0); + + get.setTimeRange(80,145); + result = HBaseTestingUtility.getFromStoreFile(store, get); + assertTrue(result.size()>0); + + get.setTimeRange(1,2); + result = HBaseTestingUtility.getFromStoreFile(store, get); + assertTrue(result.size()>0); + + get.setTimeRange(90,200); + result = HBaseTestingUtility.getFromStoreFile(store, get); + assertTrue(result.size()==0); + } + + /** + * Test for HBASE-3492 - Test split on empty colfam (no store files). + * + * @throws IOException When the IO operations fail. + */ + @Test + public void testSplitWithEmptyColFam() throws IOException { + init(this.name.getMethodName()); + assertFalse(store.getSplitPoint().isPresent()); + store.getHRegion().forceSplit(null); + assertFalse(store.getSplitPoint().isPresent()); + store.getHRegion().clearSplit(); + } + + @Test + public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception { + final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle"; + long anyValue = 10; + + // We'll check that it uses correct config and propagates it appropriately by going thru + // the simplest "real" path I can find - "throttleCompaction", which just checks whether + // a number we pass in is higher than some config value, inside compactionPolicy. 
+ Configuration conf = HBaseConfiguration.create(); + conf.setLong(CONFIG_KEY, anyValue); + init(name.getMethodName() + "-xml", conf); + assertTrue(store.throttleCompaction(anyValue + 1)); + assertFalse(store.throttleCompaction(anyValue)); + + // HTD overrides XML. + --anyValue; + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); + HColumnDescriptor hcd = new HColumnDescriptor(family); + htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue)); + init(name.getMethodName() + "-htd", conf, htd, hcd); + assertTrue(store.throttleCompaction(anyValue + 1)); + assertFalse(store.throttleCompaction(anyValue)); + + // HCD overrides them both. + --anyValue; + hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue)); + init(name.getMethodName() + "-hcd", conf, htd, hcd); + assertTrue(store.throttleCompaction(anyValue + 1)); + assertFalse(store.throttleCompaction(anyValue)); + } + + public static class DummyStoreEngine extends DefaultStoreEngine { + public static DefaultCompactor lastCreatedCompactor = null; + + @Override + protected void createComponents(Configuration conf, HStore store, CellComparator comparator) + throws IOException { + super.createComponents(conf, store, comparator); + lastCreatedCompactor = this.compactor; + } + } + + @Test + public void testStoreUsesSearchEngineOverride() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName()); + init(this.name.getMethodName(), conf); + assertEquals(DummyStoreEngine.lastCreatedCompactor, + this.store.storeEngine.getCompactor()); + } + + private void addStoreFile() throws IOException { + HStoreFile f = this.store.getStorefiles().iterator().next(); + Path storedir = f.getPath().getParent(); + long seqid = this.store.getMaxSequenceId(); + Configuration c = TEST_UTIL.getConfiguration(); + FileSystem fs = FileSystem.get(c); + HFileContext fileContext = new 
HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build(); + StoreFileWriter w = new StoreFileWriter.Builder(c, new CacheConfig(c), + fs) + .withOutputDir(storedir) + .withFileContext(fileContext) + .build(); + w.appendMetadata(seqid + 1, false); + w.close(); + LOG.info("Added store file:" + w.getPath()); + } + + private void archiveStoreFile(int index) throws IOException { + Collection<HStoreFile> files = this.store.getStorefiles(); + HStoreFile sf = null; + Iterator<HStoreFile> it = files.iterator(); + for (int i = 0; i <= index; i++) { + sf = it.next(); + } + store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); + } + + private void closeCompactedFile(int index) throws IOException { + Collection<HStoreFile> files = + this.store.getStoreEngine().getStoreFileManager().getCompactedfiles(); + HStoreFile sf = null; + Iterator<HStoreFile> it = files.iterator(); + for (int i = 0; i <= index; i++) { + sf = it.next(); + } + sf.closeStoreFile(true); + store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf)); + } + + @Test + public void testRefreshStoreFiles() throws Exception { + init(name.getMethodName()); + + assertEquals(0, this.store.getStorefilesCount()); + + // Test refreshing store files when no store files are there + store.refreshStoreFiles(); + assertEquals(0, this.store.getStorefilesCount()); + + // add some data, flush + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + flush(1); + assertEquals(1, this.store.getStorefilesCount()); + + // add one more file + addStoreFile(); + + assertEquals(1, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(2, this.store.getStorefilesCount()); + + // add three more files + addStoreFile(); + addStoreFile(); + addStoreFile(); + + assertEquals(2, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(5, this.store.getStorefilesCount()); + + closeCompactedFile(0); + 
archiveStoreFile(0); + + assertEquals(5, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(4, this.store.getStorefilesCount()); + + archiveStoreFile(0); + archiveStoreFile(1); + archiveStoreFile(2); + + assertEquals(4, this.store.getStorefilesCount()); + store.refreshStoreFiles(); + assertEquals(1, this.store.getStorefilesCount()); + + archiveStoreFile(0); + store.refreshStoreFiles(); + assertEquals(0, this.store.getStorefilesCount()); + } + + /** + * Checks that refreshStoreFiles() calls replaceStoreFiles() once after a new store file was + * added, and does not call it a second time when nothing changed between two refreshes. + */ + @SuppressWarnings("unchecked") + @Test + public void testRefreshStoreFilesNotChanged() throws IOException { + init(name.getMethodName()); + + assertEquals(0, this.store.getStorefilesCount()); + + // add some data, flush + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null); + flush(1); + // add one more file + addStoreFile(); + + HStore spiedStore = spy(store); + + // call first time after files changed + spiedStore.refreshStoreFiles(); + assertEquals(2, this.store.getStorefilesCount()); + verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class)); + + // call second time + spiedStore.refreshStoreFiles(); + + // ensure that replaceStoreFiles is not called again if files are not refreshed: the spy must + // still have recorded exactly one call in total. Verifying times(0) against (null, null) + // only matched a call with two null arguments, so it could pass even on a second replace. + verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class)); + } + + /** + * Counts the scanners in scanner.currentScanners that are not file scanners. + * @return the count, or 0 when currentScanners is null + */ + private long countMemStoreScanner(StoreScanner scanner) { + if (scanner.currentScanners == null) { + return 0; + } + return scanner.currentScanners.stream() + .filter(s -> !s.isFileScanner()) + .count(); + } + + @Test + public void testNumberOfMemStoreScannersAfterFlush() throws IOException { + long seqId = 100; + long timestamp = System.currentTimeMillis(); + Cell cell0 = CellUtil.createCell(row, family, qf1, timestamp, + KeyValue.Type.Put.getCode(), qf1); + CellUtil.setSequenceId(cell0, seqId); + testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.EMPTY_LIST); + + Cell cell1 = CellUtil.createCell(row, family, qf2, timestamp, + KeyValue.Type.Put.getCode(), qf1); + 
CellUtil.setSequenceId(cell1, seqId); + testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1)); + + seqId = 101; + timestamp = System.currentTimeMillis(); + Cell cell2 = CellUtil.createCell(row2, family, qf2, timestamp, + KeyValue.Type.Put.getCode(), qf1); + CellUtil.setSequenceId(cell2, seqId); + testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2)); + } + + /** + * Adds the first list of cells, prepares a flush, adds the second list, and then scans while + * the flush is completed, checking how many non-file scanners the StoreScanner holds. + * @param inputCellsBeforeSnapshot cells added before the flush is prepared + * @param inputCellsAfterSnapshot cells added after the flush is prepared + */ + private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot, + List<Cell> inputCellsAfterSnapshot) throws IOException { + init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size()); + TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + long seqId = Long.MIN_VALUE; + for (Cell c : inputCellsBeforeSnapshot) { + quals.add(CellUtil.cloneQualifier(c)); + seqId = Math.max(seqId, c.getSequenceId()); + } + for (Cell c : inputCellsAfterSnapshot) { + quals.add(CellUtil.cloneQualifier(c)); + seqId = Math.max(seqId, c.getSequenceId()); + } + inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + storeFlushCtx.prepare(); + inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); + int numberOfMemScannersWhenScaning = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; + try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) { + // snapshot + active (if it isn't empty) + assertEquals(numberOfMemScannersWhenScaning, countMemStoreScanner(s)); + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + boolean more; + int cellCount = 0; + do { + List<Cell> cells = new ArrayList<>(); + more = s.next(cells); + cellCount += cells.size(); + assertEquals(more ? 
numberOfMemScannersWhenScaning : 0, countMemStoreScanner(s)); + } while (more); + assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size() + + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(), + inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount); + // the current scanners is cleared + assertEquals(0, countMemStoreScanner(s)); + } + } + + private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException { + Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value); + CellUtil.setSequenceId(c, sequenceId); + return c; + } + + private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) + throws IOException { + Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value); + CellUtil.setSequenceId(c, sequenceId); + return c; + } + + @Test + public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { + final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + final int expectedSize = 3; + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == expectedSize - 1) { + try { + flushStore(store, id++); + timeToGoNextRow.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + return ReturnCode.INCLUDE; + } + }, expectedSize); + } + + @Test + public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { + final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + final int expectedSize = 2; + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == expectedSize - 1) { + try { + flushStore(store, id++); + 
timeToGoNextRow.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + if (timeToGoNextRow.get()) { + timeToGoNextRow.set(false); + return ReturnCode.NEXT_ROW; + } else { + return ReturnCode.INCLUDE; + } + } + }, expectedSize); + } + + @Test + public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException { + final AtomicBoolean timeToGetHint = new AtomicBoolean(false); + final int expectedSize = 2; + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == expectedSize - 1) { + try { + flushStore(store, id++); + timeToGetHint.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + if (timeToGetHint.get()) { + timeToGetHint.set(false); + return Filter.ReturnCode.SEEK_NEXT_USING_HINT; + } else { + return Filter.ReturnCode.INCLUDE; + } + } + @Override + public Cell getNextCellHint(Cell currentCell) throws IOException { + return currentCell; + } + }, expectedSize); + } + + private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize) + throws IOException, InterruptedException { + Configuration conf = HBaseConfiguration.create(); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMaxVersions(1); + byte[] r0 = Bytes.toBytes("row0"); + byte[] r1 = Bytes.toBytes("row1"); + byte[] r2 = Bytes.toBytes("row2"); + byte[] value0 = Bytes.toBytes("value0"); + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() { + 
@Override + public long getSmallestReadPoint(HStore store) { + return seqId + 3; + } + }); + // The cells having the value0 won't be flushed to disk because the value of max version is 1 + store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize); + store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize); + store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize); + store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize); + store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize); + store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize); + store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize); + store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize); + store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize); + store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize); + store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize); + store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize); + List<Cell> myList = new MyList<>(hook); + Scan scan = new Scan() + .withStartRow(r1) + .setFilter(filter); + try (InternalScanner scanner = (InternalScanner) store.getScanner( + scan, null, seqId + 3)){ + // r1 + scanner.next(myList); + assertEquals(expectedSize, myList.size()); + for (Cell c : myList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value1) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value1)); + } + List<Cell> normalList = new ArrayList<>(3); + // r2 + scanner.next(normalList); + assertEquals(3, normalList.size()); + for (Cell c : normalList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value2) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value2)); + } + } + } + + @Test + public void testCreateScannerAndSnapshotConcurrently() 
throws IOException, InterruptedException { + Configuration conf = HBaseConfiguration.create(); + conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName()); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); + init(name.getMethodName(), conf, hcd); + byte[] value = Bytes.toBytes("value"); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + // older data which shouldn't be "seen" by client + store.add(createCell(qf1, ts, seqId, value), memStoreSize); + store.add(createCell(qf2, ts, seqId, value), memStoreSize); + store.add(createCell(qf3, ts, seqId, value), memStoreSize); + TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + quals.add(qf1); + quals.add(qf2); + quals.add(qf3); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + MyCompactingMemStore.START_TEST.set(true); + Runnable flush = () -> { + // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) + // recreate the active memstore -- phase (4/5) + storeFlushCtx.prepare(); + }; + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(flush); + // we get scanner from pipeline and snapshot but they are empty. 
-- phase (2/5) + // this is blocked until we recreate the active memstore -- phase (3/5) + // we get scanner from active memstore but it is empty -- phase (5/5) + InternalScanner scanner = (InternalScanner) store.getScanner( + new Scan(new Get(row)), quals, seqId + 1); + service.shutdown(); + service.awaitTermination(20, TimeUnit.SECONDS); + try { + try { + List<Cell> results = new ArrayList<>(); + scanner.next(results); + assertEquals(3, results.size()); + for (Cell c : results) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value)); + } + } finally { + scanner.close(); + } + } finally { + MyCompactingMemStore.START_TEST.set(false); + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + } + } + + @Test + public void testScanWithDoubleFlush() throws IOException { + Configuration conf = HBaseConfiguration.create(); + // Initialize region + MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook(){ + @Override + public void getScanners(MyStore store) throws IOException { + final long tmpId = id++; + ExecutorService s = Executors.newSingleThreadExecutor(); + s.submit(() -> { + try { + // flush the store before storescanner updates the scanners from store. + // The current data will be flushed into files, and the memstore will + // be clear. + // -- phase (4/4) + flushStore(store, tmpId); + }catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + s.shutdown(); + try { + // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. 
+ s.awaitTermination(3, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + // restore the interrupt status so the caller can still observe it, instead of + // silently dropping the interruption + Thread.currentThread().interrupt(); + } + } + }); + byte[] oldValue = Bytes.toBytes("oldValue"); + byte[] currentValue = Bytes.toBytes("currentValue"); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + // older data which shouldn't be "seen" by client + myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize); + myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize); + myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize); + long snapshotId = id++; + // push older data into snapshot -- phase (1/4) + StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId); + storeFlushCtx.prepare(); + + // insert current data into active -- phase (2/4) + myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize); + myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize); + myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize); + TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + quals.add(qf1); + quals.add(qf2); + quals.add(qf3); + try (InternalScanner scanner = (InternalScanner) myStore.getScanner( + new Scan(new Get(row)), quals, seqId + 1)) { + // complete the flush -- phase (3/4) + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + + // only the current values (written at seqId + 1) are expected in the result + List<Cell> results = new ArrayList<>(); + scanner.next(results); + assertEquals(3, results.size()); + for (Cell c : results) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(currentValue) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, currentValue)); + } + } + } + + @Test + public void testReclaimChunkWhenScaning() throws IOException { + init("testReclaimChunkWhenScaning"); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + byte[] value = 
Bytes.toBytes("value"); + // data written at seqId; the scan below reads at seqId and is expected to return it + store.add(createCell(qf1, ts, seqId, value), null); + store.add(createCell(qf2, ts, seqId, value), null); + store.add(createCell(qf3, ts, seqId, value), null); + TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + quals.add(qf1); + quals.add(qf2); + quals.add(qf3); + try (InternalScanner scanner = (InternalScanner) store.getScanner( + new Scan(new Get(row)), quals, seqId)) { + List<Cell> results = new MyList<>(size -> { + switch (size) { + // 1) we get the first cell (qf1) + // 2) flush the data to have StoreScanner update inner scanners + // 3) the chunk will be reclaimed after updating + case 1: + try { + flushStore(store, id++); + } catch (IOException e) { + throw new RuntimeException(e); + } + break; + // 1) we get the second cell (qf2) + // 2) add some cell to fill some byte into the chunk (we have only one chunk) + case 2: + try { + byte[] newValue = Bytes.toBytes("newValue"); + // newer data (seqId + 1) which shouldn't be "seen" by client + store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); + store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); + store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); + } catch (IOException e) { + throw new RuntimeException(e); + } + break; + default: + break; + } + }); + scanner.next(results); + assertEquals(3, results.size()); + for (Cell c : results) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value)); + } + } + } + + /** + * If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable + * may change the versionedList. And the first InMemoryFlushRunnable will use the changed + * versionedList to remove the corresponding segments. + * In short, some segments which are not part of the merge would be removed. 
+ * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=30000) + public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException { + int flushSize = 500; + Configuration conf = HBaseConfiguration.create(); + conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName()); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize)); + // Set the lower threshold to invoke the "MERGE" policy + conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0)); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); + init(name.getMethodName(), conf, hcd); + byte[] value = Bytes.toBytes("thisisavarylargevalue"); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + // older data which shouldn't be "seen" by client + store.add(createCell(qf1, ts, seqId, value), memStoreSize); + store.add(createCell(qf2, ts, seqId, value), memStoreSize); + store.add(createCell(qf3, ts, seqId, value), memStoreSize); + assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + storeFlushCtx.prepare(); + // This shouldn't invoke another in-memory flush because the first compactor thread + // hasn't finished the in-memory compaction yet. + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); + store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); + assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); + //okay. 
Let the compaction be completed + MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); + CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore; + while (mem.isMemStoreFlushingInMemory()) { + TimeUnit.SECONDS.sleep(1); + } + // This should invoke another in-memory flush. + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); + store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); + assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + } + + private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) + throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMaxVersions(5); + return (MyStore) init(methodName, conf, htd, hcd, hook); + } + + class MyStore extends HStore { + private final MyStoreHook hook; + + MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam, + MyStoreHook hook, boolean switchToPread) throws IOException { + super(region, family, confParam); + this.hook = hook; + } + + @Override + public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, + boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, + boolean includeMemstoreScanner) throws IOException { + hook.getScanners(this); + return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, + stopRow, false, readPt, includeMemstoreScanner); + } + + @Override + public long getSmallestReadPoint() { 
+ return hook.getSmallestReadPoint(this); + } + } + + private abstract class MyStoreHook { + void getScanners(MyStore store) throws IOException { + } + long getSmallestReadPoint(HStore store) { + return store.getHRegion().getSmallestReadPoint(); + } + } + + @Test + public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { + int flushSize = 500; + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); + conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); + // Set the lower threshold to invoke the "MERGE" policy + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); + MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = System.currentTimeMillis(); + long seqID = 1l; + // Add some data to the region and do some flushes + for (int i = 1; i < 10; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + for (int i = 11; i < 20; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + for (int i = 21; i < 30; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + + assertEquals(3, store.getStorefilesCount()); + ScanInfo scanInfo = store.getScanInfo(); + Scan scan = new Scan(); + scan.addFamily(family); + Collection<HStoreFile> storefiles2 = store.getStorefiles(); + ArrayList<HStoreFile> actualStorefiles = Lists.newArrayList(storefiles2); + StoreScanner storeScanner = + (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); + // get the current heap + KeyValueHeap heap = 
storeScanner.heap; + // create more store files + for (int i = 31; i < 40; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + + for (int i = 41; i < 50; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + storefiles2 = store.getStorefiles(); + ArrayList<HStoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); + actualStorefiles1.removeAll(actualStorefiles); + // Do compaction + MyThread thread = new MyThread(storeScanner); + thread.start(); + store.replaceStoreFiles(actualStorefiles, actualStorefiles1); + thread.join(); + KeyValueHeap heap2 = thread.getHeap(); + assertFalse(heap.equals(heap2)); + } + + /** + * Thread that asks the scanner to switch to stream read and then records the scanner's heap. + * Read the result with getHeap() after join(). + */ + private static class MyThread extends Thread { + private final StoreScanner scanner; + private KeyValueHeap heap; + + public MyThread(StoreScanner scanner) { + this.scanner = scanner; + } + + public KeyValueHeap getHeap() { + return this.heap; + } + + @Override + public void run() { + scanner.trySwitchToStreamRead(); + heap = scanner.heap; + } + } + + /** + * MemStoreCompactor whose first start() call blocks on START_COMPACTOR_LATCH after + * super.start() has returned; later calls do not wait. + */ + private static class MyMemStoreCompactor extends MemStoreCompactor { + private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); + private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1); + public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) { + super(compactingMemStore, compactionPolicy); + } + + @Override + public boolean start() throws IOException { + boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; + boolean rval = super.start(); + if (isFirst) { + try { + START_COMPACTOR_LATCH.await(); + } catch (InterruptedException ex) { + // keep the interrupt status set before converting to an unchecked exception + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); + } + } + return rval; + } + } + + public static class MyCompactingMemStoreWithCustomCompactor 
extends CompactingMemStore { + private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); + public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparator c, + HStore store, RegionServicesForStores regionServices, + MemoryCompactionPolicy compactionPolicy) throws IOException { + super(conf, c, store, regionServices, compactionPolicy); + } + + @Override + protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) { + return new MyMemStoreCompactor(this, compactionPolicy); + } + + @Override + protected boolean shouldFlushInMemory() { + boolean rval = super.shouldFlushInMemory(); + if (rval) { + RUNNER_COUNT.incrementAndGet(); + } + return rval; + } + } + + public static class MyCompactingMemStore extends CompactingMemStore { + private static final AtomicBoolean START_TEST = new AtomicBoolean(false); + private final CountDownLatch getScannerLatch = new CountDownLatch(1); + private final CountDownLatch snapshotLatch = new CountDownLatch(1); + public MyCompactingMemStore(Configuration conf, CellComparator c, + HStore store, RegionServicesForStores regionServices, + MemoryCompactionPolicy compactionPolicy) throws IOException { + super(conf, c, store, regionServices, compactionPolicy); + } + + @Override + protected List<KeyValueScanner> createList(int capacity) { + if (START_TEST.get()) { + try { + getScannerLatch.countDown(); + snapshotLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return new ArrayList<>(capacity); + } + @Override + protected void pushActiveToPipeline(MutableSegment active) { + if (START_TEST.get()) { + try { + getScannerLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + super.pushActiveToPipeline(active); + if (START_TEST.get()) { + snapshotLatch.countDown(); + } + } + } + + interface MyListHook { + void hook(int currentSize); + } + + private static class MyList<T> implements List<T> { + private 
final List<T> delegatee = new ArrayList<>(); + private final MyListHook hookAtAdd; + MyList(final MyListHook hookAtAdd) { + this.hookAtAdd = hookAtAdd; + } + @Override + public int size() {return delegatee.size();} + + @Override + public boolean isEmpty() {return delegatee.isEmpty();} + + @Override + public boolean contains(Object o) {return delegatee.contains(o);} + + @Override + public Iterator<T> iterator() {return delegatee.iterator();} + + @Override + public Object[] toArray() {return delegatee.toArray();} + + /** Uses its own type parameter E so that it does not shadow the element type T of the list. */ + @Override + public <E> E[] toArray(E[] a) {return delegatee.toArray(a);} + + /** + * Calls the hook with the current size (before the element is appended), then appends the + * element to the backing list. + */ + @Override + public boolean add(T e) { + hookAtAdd.hook(size()); + return delegatee.add(e); + } + + @Override + public boolean remove(Object o) {return delegatee.remove(o);} + + @Override + public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);} + + @Override + public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);} + + @Override + public boolean addAll(int index, Collection<? 
extends T> c) {return delegatee.addAll(index, c);} + + @Override + public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);} + + @Override + public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);} + + @Override + public void clear() {delegatee.clear();} + + @Override + public T get(int index) {return delegatee.get(index);} + + @Override + public T set(int index, T element) {return delegatee.set(index, element);} + + @Override + public void add(int index, T element) {delegatee.add(index, element);} + + @Override + public T remove(int index) {return delegatee.remove(index);} + + @Override + public int indexOf(Object o) {return delegatee.indexOf(o);} + + @Override + public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} + + @Override + public ListIterator<T> listIterator() {return delegatee.listIterator();} + + @Override + public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);} + + @Override + public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index e74e939..b20cae8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -176,11 +176,9 @@ public class TestHStoreFile extends HBaseTestCase { // Split on a row, not in middle of row. Midkey returned by reader // may be in middle of row. Create new one with empty column and // timestamp. - Cell kv = reader.midkey(); - byte [] midRow = CellUtil.cloneRow(kv); - kv = reader.getLastKey(); - byte [] finalRow = CellUtil.cloneRow(kv); - hsf.closeReader(true); + byte [] midRow = CellUtil.cloneRow(reader.midKey().get()); + byte [] finalRow = CellUtil.cloneRow(reader.getLastKey().get()); + hsf.closeStoreFile(true); // Make a reference HRegionInfo splitHri = new HRegionInfo(hri.getTable(), null, midRow); @@ -190,7 +188,8 @@ public class TestHStoreFile extends HBaseTestCase { // Now confirm that I can read from the reference and that it only gets // keys from top half of the file. 
HFileScanner s = refHsf.getReader().getScanner(false, false); - for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) { + Cell kv = null; + for (boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) { ByteBuffer bb = ByteBuffer.wrap(((KeyValue) s.getKey()).getKey()); kv = KeyValueUtil.createKeyValueFromKey(bb); if (first) { @@ -301,7 +300,7 @@ public class TestHStoreFile extends HBaseTestCase { f.initReader(); Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom - f.closeReader(true); + f.closeStoreFile(true); // OK test the thing FSUtils.logFileSystemState(fs, testDir, LOG); @@ -342,7 +341,7 @@ public class TestHStoreFile extends HBaseTestCase { private void checkHalfHFile(final HRegionFileSystem regionFs, final HStoreFile f) throws IOException { f.initReader(); - Cell midkey = f.getReader().midkey(); + Cell midkey = f.getReader().midKey().get(); KeyValue midKV = (KeyValue)midkey; byte [] midRow = CellUtil.cloneRow(midKV); // Create top split. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java index 707540a..0c33bdb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java @@ -84,7 +84,7 @@ public class TestMajorCompaction { private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU(); protected Configuration conf = UTIL.getConfiguration(); - private Region r = null; + private HRegion r = null; private HTableDescriptor htd = null; private static final byte [] COLUMN_FAMILY = fam1; private final byte [] STARTROW = Bytes.toBytes(START_KEY); @@ -328,7 +328,7 @@ public class TestMajorCompaction { // ensure that major compaction time is deterministic RatioBasedCompactionPolicy c = (RatioBasedCompactionPolicy)s.storeEngine.getCompactionPolicy(); - Collection<StoreFile> storeFiles = s.getStorefiles(); + Collection<HStoreFile> storeFiles = s.getStorefiles(); long mcTime = c.getNextMajorCompactTime(storeFiles); for (int i = 0; i < 10; ++i) { assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles)); @@ -358,7 +358,7 @@ public class TestMajorCompaction { private void verifyCounts(int countRow1, int countRow2) throws Exception { int count1 = 0; int count2 = 0; - for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) { + for (HStoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) { HFileScanner scanner = f.getReader().getScanner(false, false); scanner.seekTo(); do { @@ -377,7 +377,7 @@ public class TestMajorCompaction { private int count() throws IOException { int count = 0; - for 
(StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) { + for (HStoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) { HFileScanner scanner = f.getReader().getScanner(false, false); if (!scanner.seekTo()) { continue; http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java index 9ab1440..c08bd71 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -318,10 +320,10 @@ public class TestMobStoreCompaction { if (fs.exists(mobDirPath)) { FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath); for (FileStatus file : files) { - StoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true); + HStoreFile sf = new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true); sf.initReader(); Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo(); - byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT); + byte[] count = fileInfo.get(MOB_CELLS_COUNT); assertTrue(count != null); mobCellsCount += Bytes.toLong(count); } @@ -349,7 +351,7 @@ public class 
TestMobStoreCompaction { Bytes.toBytes("colX"), now, dummyData); writer.append(kv); } finally { - writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); + writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); writer.close(); } } @@ -428,20 +430,20 @@ public class TestMobStoreCompaction { copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); CacheConfig cacheConfig = new CacheConfig(copyOfConf); Path mobDirPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(), hcd.getNameAsString()); - List<StoreFile> sfs = new ArrayList<>(); + List<HStoreFile> sfs = new ArrayList<>(); int numDelfiles = 0; int size = 0; if (fs.exists(mobDirPath)) { for (FileStatus f : fs.listStatus(mobDirPath)) { - StoreFile sf = new HStoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE, true); + HStoreFile sf = new HStoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE, true); sfs.add(sf); if (StoreFileInfo.isDelFile(sf.getPath())) { numDelfiles++; } } - List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false, - HConstants.LATEST_TIMESTAMP); + List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, + false, false, HConstants.LATEST_TIMESTAMP); long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); long ttl = HStore.determineTTLFromFamily(hcd); ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes, http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index 72a968c..86fe5af 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -45,9 +45,6 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -55,9 +52,17 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.util.StringUtils; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole * cluster. See {@link TestRegionServerNoMaster}. @@ -472,7 +477,7 @@ public class TestRegionReplicas { // should be able to deal with it giving us all the result we expect. int keys = 0; int sum = 0; - for (StoreFile sf: secondaryRegion.getStore(f).getStorefiles()) { + for (HStoreFile sf : ((HStore) secondaryRegion.getStore(f)).getStorefiles()) { // Our file does not exist anymore. was moved by the compaction above. 
LOG.debug(getRS().getFileSystem().exists(sf.getPath())); Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath())); http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java index cad060e..06c0bfd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionSplitPolicy.java @@ -25,13 +25,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -246,7 +247,7 @@ public class TestRegionSplitPolicy { HStore mockStore = Mockito.mock(HStore.class); Mockito.doReturn(2000L).when(mockStore).getSize(); Mockito.doReturn(true).when(mockStore).canSplit(); - Mockito.doReturn(Bytes.toBytes("abcd")).when(mockStore).getSplitPoint(); + Mockito.doReturn(Optional.of(Bytes.toBytes("abcd"))).when(mockStore).getSplitPoint(); stores.add(mockStore); KeyPrefixRegionSplitPolicy policy = (KeyPrefixRegionSplitPolicy) RegionSplitPolicy @@ -322,8 +323,7 @@ public class TestRegionSplitPolicy { HStore mockStore = 
Mockito.mock(HStore.class); Mockito.doReturn(2000L).when(mockStore).getSize(); Mockito.doReturn(true).when(mockStore).canSplit(); - Mockito.doReturn(Bytes.toBytes("store 1 split")) - .when(mockStore).getSplitPoint(); + Mockito.doReturn(Optional.of(Bytes.toBytes("store 1 split"))).when(mockStore).getSplitPoint(); stores.add(mockStore); assertEquals("store 1 split", @@ -333,8 +333,7 @@ public class TestRegionSplitPolicy { HStore mockStore2 = Mockito.mock(HStore.class); Mockito.doReturn(4000L).when(mockStore2).getSize(); Mockito.doReturn(true).when(mockStore2).canSplit(); - Mockito.doReturn(Bytes.toBytes("store 2 split")) - .when(mockStore2).getSplitPoint(); + Mockito.doReturn(Optional.of(Bytes.toBytes("store 2 split"))).when(mockStore2).getSplitPoint(); stores.add(mockStore2); assertEquals("store 2 split", @@ -355,7 +354,7 @@ public class TestRegionSplitPolicy { HStore mockStore = Mockito.mock(HStore.class); Mockito.doReturn(2000L).when(mockStore).getSize(); Mockito.doReturn(true).when(mockStore).canSplit(); - Mockito.doReturn(Bytes.toBytes("ab,cd")).when(mockStore).getSplitPoint(); + Mockito.doReturn(Optional.of(Bytes.toBytes("ab,cd"))).when(mockStore).getSplitPoint(); stores.add(mockStore); DelimitedKeyPrefixRegionSplitPolicy policy = (DelimitedKeyPrefixRegionSplitPolicy) RegionSplitPolicy http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index 8b34a2f..dbf3be0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -113,7 +113,7 @@ public class 
TestReversibleScanners { .withFileContext(hFileContext).build(); writeStoreFile(writer); - StoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf, + HStoreFile sf = new HStoreFile(fs, writer.getPath(), TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, true); List<StoreFileScanner> scanners = StoreFileScanner @@ -167,10 +167,10 @@ public class TestReversibleScanners { writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 }); - StoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, + HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, true); - StoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, + HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, true); /** * Test without MVCC @@ -257,10 +257,10 @@ public class TestReversibleScanners { writeMemstoreAndStoreFiles(memstore, new StoreFileWriter[] { writer1, writer2 }); - StoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, + HStoreFile sf1 = new HStoreFile(fs, writer1.getPath(), TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, true); - StoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, + HStoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, true); ScanInfo scanInfo = @@ -418,19 +418,15 @@ public class TestReversibleScanners { verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); } - private StoreScanner getReversibleStoreScanner(MemStore memstore, - StoreFile sf1, StoreFile sf2, Scan scan, - ScanInfo scanInfo, int readPoint) throws IOException { - List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, - false, readPoint); + private StoreScanner 
getReversibleStoreScanner(MemStore memstore, HStoreFile sf1, HStoreFile sf2, + Scan scan, ScanInfo scanInfo, int readPoint) throws IOException { + List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint); NavigableSet<byte[]> columns = null; - for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap() - .entrySet()) { + for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { // Should only one family columns = entry.getValue(); } - StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, - columns, scanners); + StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners); return storeScanner; } @@ -487,22 +483,17 @@ public class TestReversibleScanners { assertEquals(null, kvHeap.peek()); } - private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, - StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint) - throws IOException { - List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, - true, readPoint); - ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, - CellComparator.COMPARATOR); + private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, HStoreFile sf1, + HStoreFile sf2, byte[] startRow, int readPoint) throws IOException { + List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow, true, readPoint); + ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, CellComparator.COMPARATOR); return kvHeap; } - private List<KeyValueScanner> getScanners(MemStore memstore, StoreFile sf1, - StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint) - throws IOException { - List<StoreFileScanner> fileScanners = StoreFileScanner - .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, - false, false, readPoint); + private List<KeyValueScanner> getScanners(MemStore memstore, HStoreFile sf1, HStoreFile sf2, + byte[] startRow, boolean doSeek, int 
readPoint) throws IOException { + List<StoreFileScanner> fileScanners = StoreFileScanner.getScannersForStoreFiles( + Lists.newArrayList(sf1, sf2), false, true, false, false, readPoint); List<KeyValueScanner> memScanners = memstore.getScanners(readPoint); List<KeyValueScanner> scanners = new ArrayList<>(fileScanners.size() + 1); scanners.addAll(fileScanners); http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 5dce4ad..0c014fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY; + import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -171,10 +174,10 @@ public class TestScannerWithBulkload { // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file. // Scan should only look at the seq id appended at the bulk load time, and not skip its // kv. 
- writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999))); + writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999))); } else { - writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); + writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); } writer.close(); return hfilePath;