Repository: hbase Updated Branches: refs/heads/master fcdf96a0e -> e2cef8aa8
HBASE-18752 Recalculate the TimeRange in flushing snapshot to store file Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2cef8aa Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2cef8aa Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2cef8aa Branch: refs/heads/master Commit: e2cef8aa805478feb7752fab738ee997e2bf374f Parents: fcdf96a Author: Chia-Ping Tsai <[email protected]> Authored: Thu Oct 5 22:17:16 2017 +0800 Committer: Chia-Ping Tsai <[email protected]> Committed: Mon Oct 9 17:22:29 2017 +0800 ---------------------------------------------------------------------- .../hbase/mob/DefaultMobStoreFlusher.java | 2 +- .../hbase/regionserver/DefaultStoreFlusher.java | 3 +- .../hadoop/hbase/regionserver/HStore.java | 20 +------ .../hbase/regionserver/StoreFileWriter.java | 59 ++------------------ .../hbase/regionserver/StripeStoreFlusher.java | 9 +-- .../hadoop/hbase/regionserver/TestHStore.java | 31 ++++++++++ 6 files changed, 42 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e2cef8aa/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index bef73f2..5b49862 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -119,7 +119,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(), - false, true, true, false, snapshot.getTimeRangeTracker()); + false, true, true, false); IOException e = null; try { // It's a mob store, flush the cells in a mob way. This is the difference of flushing http://git-wip-us.apache.org/repos/asf/hbase/blob/e2cef8aa/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index a5dd9f7..2e907e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -68,8 +68,7 @@ public class DefaultStoreFlusher extends StoreFlusher { /* isCompaction = */ false, /* includeMVCCReadpoint = */ true, /* includesTags = */ snapshot.isTagsPresent(), - /* shouldDropBehind = */ false, - snapshot.getTimeRangeTracker()); + /* shouldDropBehind = */ false); IOException e = null; try { performFlush(scanner, writer, smallestReadPoint, throughputController); http://git-wip-us.apache.org/repos/asf/hbase/blob/e2cef8aa/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 92171d3..d8e82bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1034,26 +1034,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat * @param includesTag - includesTag or not * @return Writer for a new StoreFile in the tmp dir. */ - public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, - boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind) throws IOException { - return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint, - includesTag, shouldDropBehind, null); - } - - /** - * @param maxKeyCount - * @param compression Compression algorithm to use - * @param isCompaction whether we are creating a new file in a compaction - * @param includeMVCCReadpoint - whether to include MVCC or not - * @param includesTag - includesTag or not - * @return Writer for a new StoreFile in the tmp dir. - */ // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of // compaction public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind, TimeRangeTracker trt) throws IOException { + boolean shouldDropBehind) throws IOException { final CacheConfig writerCacheConf; if (isCompaction) { // Don't cache data on write on compactions. @@ -1079,9 +1064,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat .withFavoredNodes(favoredNodes) .withFileContext(hFileContext) .withShouldDropCacheBehind(shouldDropBehind); - if (trt != null) { - builder.withTimeRangeTracker(trt); - } return builder.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e2cef8aa/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index b2da46f..6ff6997 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -71,40 +71,10 @@ public class StoreFileWriter implements CellSink, ShipperListener { private long deleteFamilyCnt = 0; private BloomContext bloomContext = null; private BloomContext deleteFamilyBloomContext = null; - - /** - * timeRangeTrackerSet is used to figure if we were passed a filled-out TimeRangeTracker or not. - * When flushing a memstore, we set the TimeRangeTracker that it accumulated during updates to - * memstore in here into this Writer and use this variable to indicate that we do not need to - * recalculate the timeRangeTracker bounds; it was done already as part of add-to-memstore. - * A completed TimeRangeTracker is not set in cases of compactions when it is recalculated. - */ - private final boolean timeRangeTrackerSet; - final TimeRangeTracker timeRangeTracker; + private final TimeRangeTracker timeRangeTracker; protected HFile.Writer writer; - /** - * Creates an HFile.Writer that also write helpful meta data. - * @param fs file system to write to - * @param path file name to create - * @param conf user configuration - * @param comparator key comparator - * @param bloomType bloom filter setting - * @param maxKeys the expected maximum number of keys to be added. Was used - * for Bloom filter size in {@link HFile} format version 1. - * @param fileContext - The HFile context - * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file. - * @throws IOException problem writing to FS - */ - StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf, - final CellComparator comparator, BloomType bloomType, long maxKeys, - InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind) - throws IOException { - this(fs, path, conf, cacheConf, comparator, bloomType, maxKeys, favoredNodes, fileContext, - shouldDropCacheBehind, null); - } - /** * Creates an HFile.Writer that also write helpful meta data. * @param fs file system to write to @@ -117,7 +87,6 @@ public class StoreFileWriter implements CellSink, ShipperListener { * @param favoredNodes * @param fileContext - The HFile context * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file. - * @param trt Ready-made timetracker to use. * @throws IOException problem writing to FS */ private StoreFileWriter(FileSystem fs, Path path, @@ -125,13 +94,9 @@ public class StoreFileWriter implements CellSink, ShipperListener { CacheConfig cacheConf, final CellComparator comparator, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext, - boolean shouldDropCacheBehind, final TimeRangeTracker trt) + boolean shouldDropCacheBehind) throws IOException { - // If passed a TimeRangeTracker, use it. Set timeRangeTrackerSet so we don't destroy it. - // TODO: put the state of the TRT on the TRT; i.e. make a read-only version (TimeRange) when - // it no longer writable. - this.timeRangeTrackerSet = trt != null; - this.timeRangeTracker = this.timeRangeTrackerSet? trt: TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); // TODO : Change all writers to be specifically created for compaction context writer = HFile.getWriterFactory(conf, cacheConf) .withPath(fs, path) @@ -232,9 +197,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); } - if (!timeRangeTrackerSet) { - timeRangeTracker.includeTimestamp(cell); - } + timeRangeTracker.includeTimestamp(cell); } private void appendGeneralBloomfilter(final Cell cell) throws IOException { @@ -389,7 +352,6 @@ public class StoreFileWriter implements CellSink, ShipperListener { private Path filePath; private InetSocketAddress[] favoredNodes; private HFileContext fileContext; - private TimeRangeTracker trt; private boolean shouldDropCacheBehind; public Builder(Configuration conf, CacheConfig cacheConf, @@ -409,17 +371,6 @@ public class StoreFileWriter implements CellSink, ShipperListener { } /** - * @param trt A premade TimeRangeTracker to use rather than build one per append (building one - * of these is expensive so good to pass one in if you have one). - * @return this (for chained invocation) - */ - public Builder withTimeRangeTracker(final TimeRangeTracker trt) { - Preconditions.checkNotNull(trt); - this.trt = trt; - return this; - } - - /** * Use either this method or {@link #withFilePath}, but not both. * @param dir Path to column family directory. The directory is created if * does not exist. The file is given a unique name within this @@ -523,7 +474,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { } return new StoreFileWriter(fs, filePath, conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext, - shouldDropCacheBehind, trt); + shouldDropCacheBehind); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e2cef8aa/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index bc4d624..c858f8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -75,8 +75,7 @@ public class StripeStoreFlusher extends StoreFlusher { StripeMultiFileWriter mw = null; try { mw = req.createWriter(); // Writer according to the policy. - StripeMultiFileWriter.WriterFactory factory = createWriterFactory( - snapshot.getTimeRangeTracker(), cellsCount); + StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount); StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; mw.init(storeScanner, factory); @@ -104,8 +103,7 @@ public class StripeStoreFlusher extends StoreFlusher { return result; } - private StripeMultiFileWriter.WriterFactory createWriterFactory( - final TimeRangeTracker tracker, final long kvCount) { + private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) { return new StripeMultiFileWriter.WriterFactory() { @Override public StoreFileWriter createWriter() throws IOException { @@ -114,8 +112,7 @@ public class StripeStoreFlusher extends StoreFlusher { /* isCompaction = */ false, /* includeMVCCReadpoint = */ true, /* includesTags = */ true, - /* shouldDropBehind = */ false, - tracker); + /* shouldDropBehind = */ false); return writer; } }; http://git-wip-us.apache.org/repos/asf/hbase/blob/e2cef8aa/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index d54543e..86beb6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -515,6 +515,37 @@ public class TestHStore { assertCheck(); } + @Test + public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException { + testTimeRangeIfSomeCellsAreDroppedInFlush(1); + testTimeRangeIfSomeCellsAreDroppedInFlush(3); + testTimeRangeIfSomeCellsAreDroppedInFlush(5); + } + + private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException { + init(this.name.getMethodName(), TEST_UTIL.getConfiguration(), + ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build()); + long currentTs = 100; + long minTs = currentTs; + // the extra cell won't be flushed to disk, + // so the min of timerange will be different between memStore and hfile. + for (int i = 0; i != (maxVersion + 1); ++i) { + this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null); + if (i == 1) { + minTs = currentTs; + } + } + flushStore(store, id++); + + Collection<HStoreFile> files = store.getStorefiles(); + assertEquals(1, files.size()); + HStoreFile f = files.iterator().next(); + f.initReader(); + StoreFileReader reader = f.getReader(); + assertEquals(minTs, reader.timeRange.getMin()); + assertEquals(currentTs, reader.timeRange.getMax()); + } + /** * Getting data from files only * @throws IOException
