This is an automated email from the ASF dual-hosted git repository. wchevreuil pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit b704348dac2269cc630ac854e7449a386273bb75 Author: Wellington Ramos Chevreuil <wchevre...@apache.org> AuthorDate: Tue Jul 15 11:48:35 2025 +0100 HBASE-29427 Merge all commits related to custom tiering into the feature branch (#7124) This is the whole custom tiering implementation and involves the following individual works: * HBASE-29412 Extend date tiered compaction to allow for tiering by values other than cell timestamp * HBASE-29413 Implement a custom qualifier tiered compaction * HBASE-29414 Refactor DataTieringManager to make priority logic pluggable * HBASE-29422 Implement selectMinorCompation in CustomCellDateTieredCompactionPolicy * HBASE-29424 Implement configuration validation for custom tiering compactions * HBASE-29425 Refine and polish code * HBASE-29426 Define a tiering value provider and refactor custom tiered compaction related classes * HBASE-28463 Rebase time based priority branch (HBASE-28463) with latest master (and fix conflicts) Co-authored-by: Janardhan Hungund <janardhan.hung...@cloudera.com> Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org> Change-Id: I5dc170e31e47b297e1d81f07d625f680c5e0d848 --- .../main/java/org/apache/hadoop/hbase/TagType.java | 2 + .../apache/hadoop/hbase/io/hfile/BlockCache.java | 9 +- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 3 +- .../hadoop/hbase/io/hfile/CombinedBlockCache.java | 7 +- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 7 + .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 4 +- .../hadoop/hbase/io/hfile/HFileWriterImpl.java | 20 +- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 10 +- .../master/procedure/CreateTableProcedure.java | 3 + .../master/procedure/ModifyTableProcedure.java | 2 + .../hadoop/hbase/regionserver/CellTSTiering.java | 57 +++++ .../regionserver/CustomTieredStoreEngine.java | 56 +++++ .../hadoop/hbase/regionserver/CustomTiering.java | 58 +++++ .../regionserver/CustomTieringMultiFileWriter.java | 85 +++++++ .../{DataTieringType.java => DataTiering.java} | 11 +- .../hbase/regionserver/DataTieringManager.java | 98 ++------ .../hadoop/hbase/regionserver/DataTieringType.java | 15 +- .../regionserver/DateTieredMultiFileWriter.java | 20 +- .../hbase/regionserver/DateTieredStoreEngine.java | 17 +- .../hadoop/hbase/regionserver/HRegionServer.java | 4 +- .../hadoop/hbase/regionserver/StoreFileWriter.java | 13 + .../hbase/regionserver/compactions/Compactor.java | 6 + .../compactions/CustomCellTieredUtils.java | 49 ++++ .../CustomCellTieringValueProvider.java | 86 +++++++ .../CustomDateTieredCompactionPolicy.java | 155 ++++++++++++ .../compactions/CustomTieredCompactor.java | 74 ++++++ .../compactions/DateTieredCompactionPolicy.java | 129 ++++++---- .../compactions/DateTieredCompactor.java | 12 +- .../hbase/client/TestIllegalTableDescriptor.java | 14 +- .../hadoop/hbase/io/hfile/TestBytesReadFromFs.java | 4 + ....java => TestCustomCellDataTieringManager.java} | 158 ++++++------ .../TestCustomCellTieredCompactionPolicy.java | 267 +++++++++++++++++++++ .../hbase/regionserver/TestDataTieringManager.java | 13 +- .../compactions/TestCustomCellTieredCompactor.java | 148 ++++++++++++ 34 files changed, 1371 insertions(+), 245 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java index eb9a7f3eccc..b0df4920e4e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java @@ -36,4 +36,6 @@ public final class TagType { // 
String based tag type used in replication public static final byte STRING_VIS_TAG_TYPE = (byte) 7; public static final byte TTL_TAG_TYPE = (byte) 8; + // tag with the custom cell tiering value for the row + public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 90fb7ce3491..313b4034fb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -23,7 +23,6 @@ import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -199,13 +198,13 @@ public interface BlockCache extends Iterable<CachedBlock>, ConfigurationObserver * not be overridden by all implementing classes. In such cases, the returned Optional will be * empty. For subclasses implementing this logic, the returned Optional would contain the boolean * value reflecting if the passed block should indeed be cached. - * @param key The key representing the block to check if it should be cached. - * @param timeRangeTracker the time range tracker containing the timestamps - * @param conf The configuration object to use for determining caching behavior. + * @param key The key representing the block to check if it should be cached. + * @param maxTimeStamp The maximum timestamp for the block to check if it should be cached. + * @param conf The configuration object to use for determining caching behavior. * @return An empty Optional if this method is not supported; otherwise, the returned Optional * contains the boolean value indicating if the block should be cached. 
*/ - default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp, Configuration conf) { return Optional.empty(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 4f57668a0c6..b6357ca94b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -269,7 +269,8 @@ public class CacheConfig implements PropagatingConfigurationObserver { public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInfo, Configuration conf) { Optional<Boolean> cacheFileBlock = Optional.of(true); - if (getBlockCache().isPresent()) { + // For DATA blocks only, if BuckeCache is in use, we don't need to cache block again + if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) { Optional<Boolean> result = getBlockCache().get().shouldCacheFile(hFileInfo, conf); if (result.isPresent()) { cacheFileBlock = result; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index cb3000dda19..856b5da6d11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -484,10 +483,10 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { } @Override - public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp, Configuration conf) { - return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, conf), - l2Cache.shouldCacheBlock(key, timeRangeTracker, conf)); + return combineCacheResults(l1Cache.shouldCacheBlock(key, maxTimeStamp, conf), + l2Cache.shouldCacheBlock(key, maxTimeStamp, conf)); } private Optional<Boolean> combineCacheResults(Optional<Boolean> result1, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 2c3908aa33f..1f8f8421548 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.ShipperListener; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -217,6 +218,12 @@ public final class HFile { */ void appendTrackedTimestampsToMetadata() throws IOException; + /** 
+ * Add Custom cell timestamp to Metadata + */ + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws IOException; + /** Returns the path to this {@link HFile} */ Path getPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 6aa16f9423c..e837e8f1bd6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1396,7 +1396,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { cacheConf.getBlockCache().ifPresent(cache -> { LOG.debug("Skipping decompression of block {} in prefetch", cacheKey); // Cache the block if necessary - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + if (cacheBlock && cacheOnRead) { cache.cacheBlock(cacheKey, blockNoChecksum, cacheConf.isInMemory(), cacheOnly); } }); @@ -1410,7 +1410,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked); // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + if (cacheBlock && cacheOnRead) { // Using the wait on cache during compaction and prefetching. cache.cacheBlock(cacheKey, cacheCompressed diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 44ec324686e..2bdfcfa45b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED; +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; @@ -29,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -126,6 +128,12 @@ public class HFileWriterImpl implements HFile.Writer { /** Cache configuration for caching data on write. */ protected final CacheConfig cacheConf; + public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) { + this.timeRangeTrackerForTiering = timeRangeTrackerForTiering; + } + + private Supplier<TimeRangeTracker> timeRangeTrackerForTiering; + /** * Name for this object used when logging or in toString. Is either the result of a toString on * stream or else name of passed file Path. @@ -185,7 +193,9 @@ public class HFileWriterImpl implements HFile.Writer { this.path = path; this.name = path != null ? 
path.getName() : outputStream.toString(); this.hFileContext = fileContext; + // TODO: Move this back to upper layer this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + this.timeRangeTrackerForTiering = () -> this.timeRangeTracker; DataBlockEncoding encoding = hFileContext.getDataBlockEncoding(); if (encoding != DataBlockEncoding.NONE) { this.blockEncoder = new HFileDataBlockEncoderImpl(encoding); @@ -588,7 +598,8 @@ public class HFileWriterImpl implements HFile.Writer { } private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) { - Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf); + Optional<Boolean> result = + cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf); return result.orElse(true); } @@ -899,6 +910,13 @@ public class HFileWriterImpl implements HFile.Writer { appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); } + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws IOException { + // TODO: The StoreFileReader always converts the byte[] to TimeRange + // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. + appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker)); + } + /** * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker * to include the timestamp of this key diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index f3903284067..8b333bce0b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.regionserver.DataTieringManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -1116,8 +1115,9 @@ public class BucketCache implements BlockCache, HeapSize { } } - if (bytesFreed < bytesToFreeWithExtra && - coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName()) + if ( + bytesFreed < bytesToFreeWithExtra && coldFiles != null + && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName()) ) { int freedBlockSize = bucketEntryWithKey.getValue().getLength(); if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) { @@ -2365,10 +2365,10 @@ public class BucketCache implements BlockCache, HeapSize { } @Override - public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp, Configuration conf) { DataTieringManager dataTieringManager = DataTieringManager.getInstance(); - if (dataTieringManager != null && !dataTieringManager.isHotData(timeRangeTracker, conf)) { + if (dataTieringManager != null && !dataTieringManager.isHotData(maxTimestamp, conf)) { LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", key.getHfileName()); return Optional.of(false); diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index 67e0236c7be..928b07ea561 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -296,6 +297,8 @@ public class CreateTableProcedure extends AbstractStateMachineTableProcedure<Cre StoreFileTrackerValidationUtils.checkForCreateTable(env.getMasterConfiguration(), tableDescriptor); + CustomCellTieredUtils.checkForModifyTable(tableDescriptor); + return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index 5b51a5662db..76f3c086bea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.fs.ErasureCodingUtils; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; @@ -449,6 +450,7 @@ public class ModifyTableProcedure extends AbstractStateMachineTableProcedure<Mod // check for store file tracker configurations StoreFileTrackerValidationUtils.checkForModifyTable(env.getMasterConfiguration(), unmodifiedTableDescriptor, modifiedTableDescriptor, !isTableEnabled(env)); + CustomCellTieredUtils.checkForModifyTable(modifiedTableDescriptor); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java new file mode 100644 index 00000000000..ed7dc01ba8d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; + +import java.io.IOException; +import java.util.OptionalLong; +import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class CellTSTiering implements DataTiering { + private static final Logger LOG = LoggerFactory.getLogger(CellTSTiering.class); + + public long getTimestamp(HStoreFile hStoreFile) { + OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp(); + if (!maxTimestamp.isPresent()) { + LOG.debug("Maximum timestamp not present for {}", hStoreFile.getPath()); + return Long.MAX_VALUE; + } + return maxTimestamp.getAsLong(); + } + + public long getTimestamp(HFileInfo hFileInfo) { + try { + byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY); + if (hFileTimeRange == null) { + LOG.debug("Timestamp information not found for file: {}", + hFileInfo.getHFileContext().getHFileName()); + return Long.MAX_VALUE; + } + return TimeRangeTracker.parseFrom(hFileTimeRange).getMax(); + } catch (IOException e) { + LOG.error("Error occurred while reading the timestamp metadata of file: {}", + hFileInfo.getHFileContext().getHFileName(), e); + return Long.MAX_VALUE; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java new file mode 100644 index 00000000000..518b31fb5be --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Extension of {@link DateTieredStoreEngine} that uses a pluggable value provider for extracting + * the value to be used for comparison in this tiered compaction. Differently from the existing Date + * Tiered Compaction, this doesn't yield multiple tiers or files, but rather provides two tiers + * based on a configurable “cut-off” age. All rows with the cell tiering value older than this + * “cut-off” age would be placed together in an “old” tier, whilst younger rows would go to a + * separate, “young” tier file. + */ +@InterfaceAudience.Private +public class CustomTieredStoreEngine extends DateTieredStoreEngine { + + @Override + protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator) + throws IOException { + CompoundConfiguration config = new CompoundConfiguration(); + config.add(conf); + config.add(store.conf); + config.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, + CustomDateTieredCompactionPolicy.class.getName()); + createCompactionPolicy(config, store); + this.storeFileManager = new DefaultStoreFileManager(kvComparator, + StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, config, compactionPolicy.getConf()); + this.storeFlusher = new DefaultStoreFlusher(config, store); + this.compactor = new CustomTieredCompactor(config, store); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java new file mode 100644 index 00000000000..7a9914c87d3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; + +import java.io.IOException; +import java.util.Date; +import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class CustomTiering implements DataTiering { + private static final Logger LOG = LoggerFactory.getLogger(CustomTiering.class); + + private long getMaxTSFromTimeRange(byte[] hFileTimeRange, String hFileName) { + try { + if (hFileTimeRange == null) { + LOG.debug("Custom cell-based timestamp information not found for file: {}", hFileName); + return Long.MAX_VALUE; + } + long parsedValue = TimeRangeTracker.parseFrom(hFileTimeRange).getMax(); + LOG.debug("Max TS for file {} is {}", hFileName, new Date(parsedValue)); + return parsedValue; + } catch (IOException e) { + LOG.error("Error occurred while reading the Custom cell-based timestamp metadata of file: {}", + hFileName, e); + return Long.MAX_VALUE; + } + } + + public long getTimestamp(HStoreFile hStoreFile) { + return getMaxTSFromTimeRange(hStoreFile.getMetadataValue(CUSTOM_TIERING_TIME_RANGE), + hStoreFile.getPath().getName()); + } + + public long getTimestamp(HFileInfo hFileInfo) { + return getMaxTSFromTimeRange(hFileInfo.get(CUSTOM_TIERING_TIME_RANGE), + hFileInfo.getHFileContext().getHFileName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java new file mode 100644 index 00000000000..f2062fbf27b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.function.Function; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class CustomTieringMultiFileWriter extends DateTieredMultiFileWriter { + + public static final byte[] CUSTOM_TIERING_TIME_RANGE = Bytes.toBytes("CUSTOM_TIERING_TIME_RANGE"); + + private NavigableMap<Long, TimeRangeTracker> lowerBoundary2TimeRanger = new TreeMap<>(); + + public CustomTieringMultiFileWriter(List<Long> lowerBoundaries, + Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile, + Function<Cell, Long> tieringFunction) { + super(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, tieringFunction); + for (Long lowerBoundary : lowerBoundaries) { + lowerBoundary2TimeRanger.put(lowerBoundary, null); + } + } + + @Override + public void append(Cell cell) throws IOException { + super.append(cell); + long tieringValue = tieringFunction.apply(cell); + Map.Entry<Long, TimeRangeTracker> entry = lowerBoundary2TimeRanger.floorEntry(tieringValue); + if (entry.getValue() == null) { + TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + timeRangeTracker.setMin(tieringValue); + timeRangeTracker.setMax(tieringValue); + lowerBoundary2TimeRanger.put(entry.getKey(), timeRangeTracker); + ((HFileWriterImpl) lowerBoundary2Writer.get(entry.getKey()).getLiveFileWriter()) + .setTimeRangeTrackerForTiering(() -> timeRangeTracker); + } else { + TimeRangeTracker timeRangeTracker = entry.getValue(); + if (timeRangeTracker.getMin() > tieringValue) { + timeRangeTracker.setMin(tieringValue); + } + if (timeRangeTracker.getMax() < tieringValue) { + timeRangeTracker.setMax(tieringValue); + } + } + } + + @Override + public List<Path> commitWriters(long maxSeqId, boolean majorCompaction, + Collection<HStoreFile> storeFiles) throws IOException { + for (Map.Entry<Long, StoreFileWriter> entry : this.lowerBoundary2Writer.entrySet()) { + StoreFileWriter writer = entry.getValue(); + if (writer != null) { + writer.appendFileInfo(CUSTOM_TIERING_TIME_RANGE, + TimeRangeTracker.toByteArray(lowerBoundary2TimeRanger.get(entry.getKey()))); + } + } + return super.commitWriters(maxSeqId, majorCompaction, storeFiles); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java similarity index 82% copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java copy to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java index ee54576a648..51e89b0b79d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.io.hfile.HFileInfo; import org.apache.yetus.audience.InterfaceAudience; -@InterfaceAudience.Public -public enum DataTieringType { - NONE, - TIME_RANGE +@InterfaceAudience.Private +public interface DataTiering { + long getTimestamp(HStoreFile hFile); + + 
long getTimestamp(HFileInfo hFileInfo); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java index f71bc5e43aa..8443827ccaa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java @@ -17,13 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; - import java.io.IOException; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.OptionalLong; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -136,17 +134,18 @@ public class DataTieringManager { * the data tiering type is set to {@link DataTieringType#TIME_RANGE}, it uses the maximum * timestamp from the time range tracker to determine if the data is hot. Otherwise, it considers * the data as hot by default. - * @param timeRangeTracker the time range tracker containing the timestamps - * @param conf The configuration object to use for determining hot data criteria. + * @param maxTimestamp the maximum timestamp associated with the data. + * @param conf The configuration object to use for determining hot data criteria. * @return {@code true} if the data is hot, {@code false} otherwise */ - public boolean isHotData(TimeRangeTracker timeRangeTracker, Configuration conf) { + public boolean isHotData(long maxTimestamp, Configuration conf) { DataTieringType dataTieringType = getDataTieringType(conf); + if ( - dataTieringType.equals(DataTieringType.TIME_RANGE) - && timeRangeTracker.getMax() != TimeRangeTracker.INITIAL_MAX_TIMESTAMP + !dataTieringType.equals(DataTieringType.NONE) + && maxTimestamp != TimeRangeTracker.INITIAL_MAX_TIMESTAMP ) { - return hotDataValidator(timeRangeTracker.getMax(), getDataTieringHotDataAge(conf)); + return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(conf)); } // DataTieringType.NONE or other types are considered hot by default return true; @@ -165,29 +164,14 @@ public class DataTieringManager { Configuration configuration = getConfiguration(hFilePath); DataTieringType dataTieringType = getDataTieringType(configuration); - if (dataTieringType.equals(DataTieringType.TIME_RANGE)) { - return hotDataValidator(getMaxTimestamp(hFilePath), getDataTieringHotDataAge(configuration)); - } - // DataTieringType.NONE or other types are considered hot by default - return true; - } - - /** - * Determines whether the data in the HFile at the given path is considered hot based on the - * configured data tiering type and hot data age. If the data tiering type is set to - * {@link DataTieringType#TIME_RANGE}, it validates the data against the provided maximum - * timestamp. 
- * @param hFilePath the path to the HFile - * @param maxTimestamp the maximum timestamp to validate against - * @return {@code true} if the data is hot, {@code false} otherwise - * @throws DataTieringException if there is an error retrieving data tiering information - */ - public boolean isHotData(Path hFilePath, long maxTimestamp) throws DataTieringException { - Configuration configuration = getConfiguration(hFilePath); - DataTieringType dataTieringType = getDataTieringType(configuration); - - if (dataTieringType.equals(DataTieringType.TIME_RANGE)) { - return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(configuration)); + if (!dataTieringType.equals(DataTieringType.NONE)) { + HStoreFile hStoreFile = getHStoreFile(hFilePath); + if (hStoreFile == null) { + throw new DataTieringException( + "Store file corresponding to " + hFilePath + " doesn't exist"); + } + return hotDataValidator(dataTieringType.getInstance().getTimestamp(getHStoreFile(hFilePath)), + getDataTieringHotDataAge(configuration)); } // DataTieringType.NONE or other types are considered hot by default return true; @@ -204,8 +188,9 @@ public class DataTieringManager { */ public boolean isHotData(HFileInfo hFileInfo, Configuration configuration) { DataTieringType dataTieringType = getDataTieringType(configuration); - if (dataTieringType.equals(DataTieringType.TIME_RANGE)) { - return hotDataValidator(getMaxTimestamp(hFileInfo), getDataTieringHotDataAge(configuration)); + if (hFileInfo != null && !dataTieringType.equals(DataTieringType.NONE)) { + return hotDataValidator(dataTieringType.getInstance().getTimestamp(hFileInfo), + getDataTieringHotDataAge(configuration)); } // DataTieringType.NONE or other types are considered hot by default return true; @@ -217,36 +202,6 @@ public class DataTieringManager { return diff <= hotDataAge; } - private long getMaxTimestamp(Path hFilePath) throws DataTieringException { - HStoreFile hStoreFile = getHStoreFile(hFilePath); - if (hStoreFile == null) { - LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath); - return Long.MAX_VALUE; - } - OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp(); - if (!maxTimestamp.isPresent()) { - LOG.error("Maximum timestamp not present for {}", hFilePath); - return Long.MAX_VALUE; - } - return maxTimestamp.getAsLong(); - } - - private long getMaxTimestamp(HFileInfo hFileInfo) { - try { - byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY); - if (hFileTimeRange == null) { - LOG.error("Timestamp information not found for file: {}", - hFileInfo.getHFileContext().getHFileName()); - return Long.MAX_VALUE; - } - return TimeRangeTracker.parseFrom(hFileTimeRange).getMax(); - } catch (IOException e) { - LOG.error("Error occurred while reading the timestamp metadata of file: {}", - hFileInfo.getHFileContext().getHFileName(), e); - return Long.MAX_VALUE; - } - } - private long getCurrentTimestamp() { return EnvironmentEdgeManager.getDelegate().currentTime(); } @@ -299,7 +254,7 @@ public class DataTieringManager { private HStoreFile getHStoreFile(Path hFilePath) throws DataTieringException { HStore hStore = getHStore(hFilePath); for (HStoreFile file : hStore.getStorefiles()) { - if (file.getPath().toUri().getPath().toString().equals(hFilePath.toString())) { + if (file.getPath().equals(hFilePath)) { return file; } } @@ -330,7 +285,8 @@ public class DataTieringManager { for (HRegion r : this.onlineRegions.values()) { for (HStore hStore : r.getStores()) { Configuration conf = hStore.getReadOnlyConfiguration(); - if (getDataTieringType(conf) != 
DataTieringType.TIME_RANGE) { + DataTieringType dataTieringType = getDataTieringType(conf); + if (dataTieringType == DataTieringType.NONE) { // Data-Tiering not enabled for the store. Just skip it. continue; } @@ -339,14 +295,10 @@ public class DataTieringManager { for (HStoreFile hStoreFile : hStore.getStorefiles()) { String hFileName = hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName(); - OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp(); - if (!maxTimestamp.isPresent()) { - LOG.warn("maxTimestamp missing for file: {}", - hStoreFile.getFileInfo().getActiveFileName()); - continue; - } + long maxTimeStamp = dataTieringType.getInstance().getTimestamp(hStoreFile); + LOG.debug("Max TS for file {} is {}", hFileName, new Date(maxTimeStamp)); long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime(); - long fileAge = currentTimestamp - maxTimestamp.getAsLong(); + long fileAge = currentTimestamp - maxTimeStamp; if (fileAge > hotDataAge) { // Values do not matter. coldFiles.put(hFileName, null); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java index ee54576a648..83da5f54e43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java @@ -21,6 +21,17 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Public public enum DataTieringType { - NONE, - TIME_RANGE + NONE(null), + TIME_RANGE(new CellTSTiering()), + CUSTOM(new CustomTiering()); + + private final DataTiering instance; + + DataTieringType(DataTiering instance) { + this.instance = instance; + } + + public DataTiering getInstance() { + return instance; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index e5ee8041c35..828d9ba0010 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.function.Function; import org.apache.hadoop.hbase.Cell; import org.apache.yetus.audience.InterfaceAudience; @@ -33,12 +34,14 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { - private final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>(); + protected final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>(); private final boolean needEmptyFile; private final Map<Long, String> lowerBoundariesPolicies; + protected Function<Cell, Long> tieringFunction; + /** * @param lowerBoundariesPolicies each window to storage policy map. 
* @param needEmptyFile whether need to create an empty store file if we haven't written @@ -46,16 +49,29 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { */ public DateTieredMultiFileWriter(List<Long> lowerBoundaries, Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) { + this(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, c -> c.getTimestamp()); + } + + /** + * @param lowerBoundariesPolicies each window to storage policy map. + * @param needEmptyFile whether need to create an empty store file if we haven't written + * out anything. + */ + public DateTieredMultiFileWriter(List<Long> lowerBoundaries, + Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile, + Function<Cell, Long> tieringFunction) { for (Long lowerBoundary : lowerBoundaries) { lowerBoundary2Writer.put(lowerBoundary, null); } this.needEmptyFile = needEmptyFile; this.lowerBoundariesPolicies = lowerBoundariesPolicies; + this.tieringFunction = tieringFunction; } @Override public void append(Cell cell) throws IOException { - Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp()); + Map.Entry<Long, StoreFileWriter> entry = + lowerBoundary2Writer.floorEntry(tieringFunction.apply(cell)); StoreFileWriter writer = entry.getValue(); if (writer == null) { String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 26437ab1124..dc13f190afa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; + import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -29,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequ import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; /** @@ -44,6 +47,18 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher, public static final String DATE_TIERED_STORE_ENGINE = DateTieredStoreEngine.class.getName(); + protected void createCompactionPolicy(Configuration conf, HStore store) throws IOException { + String className = + conf.get(DEFAULT_COMPACTION_POLICY_CLASS_KEY, DateTieredCompactionPolicy.class.getName()); + try { + compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class, StoreConfigInformation.class }, + new Object[] { conf, store }); + } catch (Exception e) { + throw new IOException("Unable to load configured compaction policy '" + className + "'", e); + } + } + @Override public boolean needsCompaction(List<HStoreFile> filesCompacting) { return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting); @@ -57,7 +72,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher, @Override protected void createComponents(Configuration 
conf, HStore store, CellComparator kvComparator) throws IOException { - this.compactionPolicy = new DateTieredCompactionPolicy(conf, store); + createCompactionPolicy(conf, store); this.storeFileManager = new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf, compactionPolicy.getConf()); this.storeFlusher = new DefaultStoreFlusher(conf, store); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8fa555cc088..4085315ea88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -701,8 +701,8 @@ public class HRegionServer extends Thread if (!isMasterNotCarryTable) { blockCache = BlockCacheFactory.createBlockCache(conf); // The call below, instantiates the DataTieringManager only when - // the configuration "hbase.regionserver.datatiering.enable" is set to true. - DataTieringManager.instantiate(conf,onlineRegions); + // the configuration "hbase.regionserver.datatiering.enable" is set to true. + DataTieringManager.instantiate(conf, onlineRegions); mobFileCache = new MobFileCache(conf); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 5f5fcf2001a..8a705c5ef14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -255,6 +255,14 @@ public class StoreFileWriter implements CellSink, ShipperListener { } } + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws IOException { + liveFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker); + if (historicalFileWriter != null) { + historicalFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker); + } + } + @Override public void beforeShipped() throws IOException { liveFileWriter.beforeShipped(); @@ -664,6 +672,11 @@ public class StoreFileWriter implements CellSink, ShipperListener { writer.appendTrackedTimestampsToMetadata(); } + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws IOException { + writer.appendCustomCellTimestampsToMetadata(timeRangeTracker); + } + private void appendGeneralBloomfilter(final Cell cell) throws IOException { if (this.generalBloomFilterWriter != null) { /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 3e9e85a4aba..f18abf59309 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -401,6 +401,11 @@ public abstract class Compactor<T extends CellSink> { protected abstract void abortWriter(T writer) throws IOException; + protected List<Cell> decorateCells(List<Cell> cells) { + // no op + return cells; + } + /** * Performs the compaction. 
* @param fd FileDetails of cell sink writer @@ -454,6 +459,7 @@ public abstract class Compactor<T extends CellSink> { // output to writer: Cell lastCleanCell = null; long lastCleanCellSeqId = 0; + cells = decorateCells(cells); for (Cell c : cells) { if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { lastCleanCell = c; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java new file mode 100644 index 00000000000..f908b31e4ae --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY; +import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER; + +import java.io.IOException; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class CustomCellTieredUtils { + private CustomCellTieredUtils() { + // Utility class, no instantiation + } + + public static void checkForModifyTable(TableDescriptor newTable) throws IOException { + for (ColumnFamilyDescriptor descriptor : newTable.getColumnFamilies()) { + String storeEngineClassName = descriptor.getConfigurationValue(STORE_ENGINE_CLASS_KEY); + if ( + storeEngineClassName != null && storeEngineClassName.contains("CustomCellTieredStoreEngine") + ) { + if (descriptor.getConfigurationValue(TIERING_CELL_QUALIFIER) == null) { + throw new DoNotRetryIOException("StoreEngine " + storeEngineClassName + + " is missing required " + TIERING_CELL_QUALIFIER + " parameter."); + } + } + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java new file mode 100644 index 00000000000..fcf3b203b10 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ArrayBackedTag; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * An extension of DateTieredCompactor, overriding the decorateCells method to allow for custom + * values to be used for the different file tiers during compaction. + */ +@InterfaceAudience.Private +public class CustomCellTieringValueProvider implements CustomTieredCompactor.TieringValueProvider { + public static final String TIERING_CELL_QUALIFIER = "TIERING_CELL_QUALIFIER"; + private byte[] tieringQualifier; + + @Override + public void init(Configuration conf) throws Exception { + tieringQualifier = Bytes.toBytes(conf.get(TIERING_CELL_QUALIFIER)); + } + + @Override + public List<Cell> decorateCells(List<Cell> cells) { + // if no tiering qualifier properly set, skips the whole flow + if (tieringQualifier != null) { + byte[] tieringValue = null; + // first iterates through the cells within a row, to find the tiering value for the row + for (Cell cell : cells) { + if (CellUtil.matchingQualifier(cell, tieringQualifier)) { + tieringValue = CellUtil.cloneValue(cell); + break; + } + } + if (tieringValue == null) { + tieringValue = Bytes.toBytes(Long.MAX_VALUE); + } + // now apply the tiering value as a tag to all cells within the row + Tag tieringValueTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE, tieringValue); + List<Cell> newCells = new ArrayList<>(cells.size()); + for (Cell cell : cells) { + List<Tag> tags = PrivateCellUtil.getTags(cell); + tags.add(tieringValueTag); + newCells.add(PrivateCellUtil.createCell(cell, tags)); + } + return newCells; + } else { + return cells; + } + } + + @Override + public long getTieringValue(Cell cell) { + Optional<Tag> tagOptional = PrivateCellUtil.getTag(cell, TagType.CELL_VALUE_TIERING_TAG_TYPE); + if (tagOptional.isPresent()) { + Tag tag = tagOptional.get(); + return Bytes.toLong(tag.getValueByteBuffer().array(), tag.getValueOffset(), + tag.getValueLength()); + } + return Long.MAX_VALUE; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java new file mode 100644 index 00000000000..dcc97c63d02 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based + * on the <b>hbase.hstore.compaction.date.tiered.custom.age.limit.millis</b> configuration property + * and the TIERING_CELL_MIN/TIERING_CELL_MAX stored on metadata of each store file. This policy + * would produce either one or two tiers: - One tier if either all files data age are older than the + * configured age limit or all files data age are younger than the configured age limit. - Two tiers + * if files have both younger and older data than the configured age limit. 
+ */ +@InterfaceAudience.Private +public class CustomDateTieredCompactionPolicy extends DateTieredCompactionPolicy { + + public static final String AGE_LIMIT_MILLIS = + "hbase.hstore.compaction.date.tiered.custom.age.limit.millis"; + + // Defaults to 10 years + public static final long DEFAULT_AGE_LIMIT_MILLIS = + (long) (10L * 365.25 * 24L * 60L * 60L * 1000L); + + private static final Logger LOG = LoggerFactory.getLogger(CustomDateTieredCompactionPolicy.class); + + private long cutOffTimestamp; + + public CustomDateTieredCompactionPolicy(Configuration conf, + StoreConfigInformation storeConfigInfo) throws IOException { + super(conf, storeConfigInfo); + cutOffTimestamp = EnvironmentEdgeManager.currentTime() + - conf.getLong(AGE_LIMIT_MILLIS, DEFAULT_AGE_LIMIT_MILLIS); + + } + + @Override + protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, + long now) { + MutableLong min = new MutableLong(Long.MAX_VALUE); + MutableLong max = new MutableLong(0); + filesToCompact.forEach(f -> { + byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); + long minCurrent = Long.MAX_VALUE; + long maxCurrent = 0; + if (timeRangeBytes != null) { + try { + TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes); + minCurrent = timeRangeTracker.getMin(); + maxCurrent = timeRangeTracker.getMax(); + } catch (IOException e) { + LOG.warn("Got CUSTOM_TIERING_TIME_RANGE info from file, but failed to parse it:", e); + } + } + if (minCurrent < min.getValue()) { + min.setValue(minCurrent); + } + if (maxCurrent > max.getValue()) { + max.setValue(maxCurrent); + } + }); + + List<Long> boundaries = new ArrayList<>(); + boundaries.add(Long.MIN_VALUE); + if (min.getValue() < cutOffTimestamp) { + boundaries.add(min.getValue()); + if (max.getValue() > cutOffTimestamp) { + boundaries.add(cutOffTimestamp); + } + } + return boundaries; + } + + @Override + public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + ArrayList<HStoreFile> filteredByPolicy = this.compactionPolicyPerWindow + .applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); + return selectMajorCompaction(filteredByPolicy); + } + + @Override + public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact) + throws IOException { + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = EnvironmentEdgeManager.currentTime(); + if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) { + long cfTTL = this.storeConfigInfo.getStoreFileTtl(); + int countLower = 0; + int countHigher = 0; + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + for (HStoreFile f : filesToCompact) { + if (checkForTtl(cfTTL, f)) { + return true; + } + if (isMajorOrBulkloadResult(f, now - lowTimestamp)) { + return true; + } + byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); + TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes); + if (timeRangeTracker.getMin() < cutOffTimestamp) { + if (timeRangeTracker.getMax() > cutOffTimestamp) { + // Found at least one file crossing the cutOffTimestamp + return true; + } else { + countLower++; + } + } else { + countHigher++; + } + hdfsBlocksDistribution.add(f.getHDFSBlockDistribution()); + } + // If we haven't found any files crossing the cutOffTimestamp, we have to check + // if there is more than one file on 
each tier and if so, perform compaction + if (countLower > 1 || countHigher > 1) { + return true; + } + return checkBlockLocality(hdfsBlocksDistribution); + } + return false; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java new file mode 100644 index 00000000000..905284a19c4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class CustomTieredCompactor extends DateTieredCompactor { + + public static final String TIERING_VALUE_PROVIDER = + "hbase.hstore.custom-tiering-value.provider.class"; + private TieringValueProvider tieringValueProvider; + + public CustomTieredCompactor(Configuration conf, HStore store) throws IOException { + super(conf, store); + String className = + conf.get(TIERING_VALUE_PROVIDER, CustomCellTieringValueProvider.class.getName()); + try { + tieringValueProvider = + (TieringValueProvider) Class.forName(className).getConstructor().newInstance(); + tieringValueProvider.init(conf); + } catch (Exception e) { + throw new IOException("Unable to load configured tiering value provider '" + className + "'", + e); + } + } + + @Override + protected List<Cell> decorateCells(List<Cell> cells) { + return tieringValueProvider.decorateCells(cells); + } + + @Override + protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request, + final List<Long> lowerBoundaries, final Map<Long, String> lowerBoundariesPolicies) { + return new CustomTieringMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies, + needEmptyFile(request), CustomTieredCompactor.this.tieringValueProvider::getTieringValue); + } + + public interface TieringValueProvider { + + void init(Configuration configuration) throws Exception; + + default List<Cell> decorateCells(List<Cell> cells) { + return cells; + } + + long getTieringValue(Cell cell); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index c13bcf36af9..81cfd6a0c69 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -66,7 +67,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class); - private final RatioBasedCompactionPolicy compactionPolicyPerWindow; + protected final RatioBasedCompactionPolicy compactionPolicyPerWindow; private final CompactionWindowFactory windowFactory; @@ -108,9 +109,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } } - @Override - public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact) - throws IOException { + protected boolean isMajorCompactionTime(Collection<HStoreFile> filesToCompact, long now, + long lowestModificationTime) throws IOException { long mcTime = getNextMajorCompactTime(filesToCompact); if (filesToCompact == null || mcTime == 0) { if (LOG.isDebugEnabled()) { @@ -118,58 +118,40 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { } return false; } - // TODO: Use better method for determining stamp of last major (HBASE-2990) - long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); - long now = EnvironmentEdgeManager.currentTime(); - if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) { + if (lowestModificationTime <= 0L || lowestModificationTime >= (now - mcTime)) { if (LOG.isDebugEnabled()) { - LOG.debug("lowTimestamp: " + lowTimestamp + " lowTimestamp: " + lowTimestamp + " now: " - + now + " mcTime: " + mcTime); + LOG.debug("lowTimestamp: " + lowestModificationTime + " lowTimestamp: " + + lowestModificationTime + " now: " + now + " mcTime: " + mcTime); } return false; } + return true; + } - long cfTTL = this.storeConfigInfo.getStoreFileTtl(); - HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); - List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now); - boolean[] filesInWindow = new boolean[boundaries.size()]; - - for (HStoreFile file : filesToCompact) { - OptionalLong minTimestamp = file.getMinimumTimestamp(); - long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE; - if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) { - LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance"); - return true; - } - if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) { - LOG.debug("Major compaction triggered on store " + this - + ", because there are new files and time since last major compaction " - + (now - lowTimestamp) + "ms"); - return true; - } + protected boolean checkForTtl(long ttl, HStoreFile file) { + OptionalLong minTimestamp = file.getMinimumTimestamp(); + long oldest = minTimestamp.isPresent() + ? 
EnvironmentEdgeManager.currentTime() - minTimestamp.getAsLong() + : Long.MIN_VALUE; + if (ttl != Long.MAX_VALUE && oldest >= ttl) { + LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance"); + return true; + } + return false; + } - int lowerWindowIndex = - Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE)); - int upperWindowIndex = - Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE)); - // Handle boundary conditions and negative values of binarySearch - lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex; - upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex; - if (lowerWindowIndex != upperWindowIndex) { - LOG.debug("Major compaction triggered on store " + this + "; because file " + file.getPath() - + " has data with timestamps cross window boundaries"); - return true; - } else if (filesInWindow[upperWindowIndex]) { - LOG.debug("Major compaction triggered on store " + this - + "; because there are more than one file in some windows"); - return true; - } else { - filesInWindow[upperWindowIndex] = true; - } - hdfsBlocksDistribution.add(file.getHDFSBlockDistribution()); + protected boolean isMajorOrBulkloadResult(HStoreFile file, long timeDiff) { + if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) { + LOG.debug("Major compaction triggered on store " + this + + ", because there are new files and time since last major compaction " + timeDiff + "ms"); + return true; } + return false; + } + protected boolean checkBlockLocality(HDFSBlocksDistribution hdfsBlocksDistribution) + throws UnknownHostException { float blockLocalityIndex = hdfsBlocksDistribution .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER)); if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { @@ -178,9 +160,55 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { + " (min " + comConf.getMinLocalityToForceCompact() + ")"); return true; } + return false; + } - LOG.debug( - "Skipping major compaction of " + this + ", because the files are already major compacted"); + @Override + public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact) + throws IOException { + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = EnvironmentEdgeManager.currentTime(); + if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) { + long cfTTL = this.storeConfigInfo.getStoreFileTtl(); + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now); + boolean[] filesInWindow = new boolean[boundaries.size()]; + for (HStoreFile file : filesToCompact) { + OptionalLong minTimestamp = file.getMinimumTimestamp(); + if (checkForTtl(cfTTL, file)) { + return true; + } + if (isMajorOrBulkloadResult(file, now - lowTimestamp)) { + return true; + } + int lowerWindowIndex = + Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE)); + int upperWindowIndex = + Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE)); + // Handle boundary conditions and negative values of binarySearch + lowerWindowIndex = + (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex; + upperWindowIndex = + (upperWindowIndex < 0) ? 
Math.abs(upperWindowIndex + 2) : upperWindowIndex; + if (lowerWindowIndex != upperWindowIndex) { + LOG.debug("Major compaction triggered on store " + this + "; because file " + + file.getPath() + " has data with timestamps cross window boundaries"); + return true; + } else if (filesInWindow[upperWindowIndex]) { + LOG.debug("Major compaction triggered on store " + this + + "; because there are more than one file in some windows"); + return true; + } else { + filesInWindow[upperWindowIndex] = true; + } + hdfsBlocksDistribution.add(file.getHDFSBlockDistribution()); + } + if (checkBlockLocality(hdfsBlocksDistribution)) { + return true; + } + LOG.debug( + "Skipping major compaction of " + this + ", because the files are already major compacted"); + } return false; } @@ -296,7 +324,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { /** * Return a list of boundaries for multiple compaction output in ascending order. */ - private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) { + protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, + long now) { long minTimestamp = filesToCompact.stream() .mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index b5911b0cec4..9cef2ebc314 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -46,7 +46,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered super(conf, store); } - private boolean needEmptyFile(CompactionRequestImpl request) { + protected boolean needEmptyFile(CompactionRequestImpl request) { // if we are going to compact the last N files, then we need to emit an empty file to retain the // maxSeqId if we haven't written out anything. 
OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles()); @@ -70,14 +70,20 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker) throws IOException { - DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries, - lowerBoundariesPolicies, needEmptyFile(request)); + DateTieredMultiFileWriter writer = + createMultiWriter(request, lowerBoundaries, lowerBoundariesPolicies); initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker); return writer; } }, throughputController, user); } + protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request, + final List<Long> lowerBoundaries, final Map<Long, String> lowerBoundariesPolicies) { + return new DateTieredMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies, + needEmptyFile(request), c -> c.getTimestamp()); + } + @Override protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java index 2d45c05324b..d7c743fbf59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java @@ -194,9 +194,10 @@ public class TestIllegalTableDescriptor { @Test public void testIllegalTableDescriptorWithDataTiering() throws IOException { - // table level configuration changes HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); + // table level configuration changes + htd.addFamily(hcd); // First scenario: DataTieringType set to TIME_RANGE without DateTieredStoreEngine htd.setValue(DataTieringManager.DATATIERING_KEY, DataTieringType.TIME_RANGE.name()); @@ -212,20 +213,23 @@ public class TestIllegalTableDescriptor { "org.apache.hadoop.hbase.regionserver.DefaultStoreEngine"); checkTableIsIllegal(htd); + // column family level configuration changes + htd = new HTableDescriptor(TableName.valueOf(name.getMethodName())); + hcd = new HColumnDescriptor(FAMILY); + // First scenario: DataTieringType set to TIME_RANGE without DateTieredStoreEngine - hcd.setConfiguration(DataTieringManager.DATATIERING_KEY, - DataTieringType.TIME_RANGE.name()); + hcd.setConfiguration(DataTieringManager.DATATIERING_KEY, DataTieringType.TIME_RANGE.name()); checkTableIsIllegal(htd.addFamily(hcd)); // Second scenario: DataTieringType set to TIME_RANGE with DateTieredStoreEngine hcd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine"); - checkTableIsLegal(htd.addFamily(hcd)); + checkTableIsLegal(htd.modifyFamily(hcd)); // Third scenario: Disabling DateTieredStoreEngine while Time Range DataTiering is active hcd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, "org.apache.hadoop.hbase.regionserver.DefaultStoreEngine"); - checkTableIsIllegal(htd.addFamily(hcd)); + checkTableIsIllegal(htd.modifyFamily(hcd)); } private void checkTableIsLegal(HTableDescriptor htd) throws IOException { diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java index c85a162ad96..7e7b4cb5c37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java @@ -268,6 +268,10 @@ public class TestBytesReadFromFs { CacheConfig cacheConfig = new CacheConfig(conf); HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf); + // Since HBASE-28466, we call fileInfo.initMetaAndIndex inside HFilePreadReader, + // which reads some blocks and increment the counters, so we need to reset it here. + ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset(); + ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset(); HFileBlock.FSReader blockReader = reader.getUncachedBlockReader(); // Create iterator for reading root index block diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java similarity index 89% copy from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java copy to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java index b24c1f2ed84..0771d41bb43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -76,35 +77,37 @@ import org.slf4j.LoggerFactory; /** * This class is used to test the functionality of the DataTieringManager. * - * The mock online regions are stored in {@link TestDataTieringManager#testOnlineRegions}. - * For all tests, the setup of {@link TestDataTieringManager#testOnlineRegions} occurs only once. - * Please refer to {@link TestDataTieringManager#setupOnlineRegions()} for the structure. - * Additionally, a list of all store files is maintained in {@link TestDataTieringManager#hStoreFiles}. + * The mock online regions are stored in {@link TestCustomCellDataTieringManager#testOnlineRegions}. + * For all tests, the setup of + * {@link TestCustomCellDataTieringManager#testOnlineRegions} occurs only once. + * Please refer to {@link TestCustomCellDataTieringManager#setupOnlineRegions()} for the structure. + * Additionally, a list of all store files is + * maintained in {@link TestCustomCellDataTieringManager#hStoreFiles}. 
* The characteristics of these store files are listed below: - * @formatter:off ## HStoreFile Information - * + * @formatter:off + * ## HStoreFile Information * | HStoreFile | Region | Store | DataTiering | isHot | * |------------------|--------------------|---------------------|-----------------------|-------| - * | hStoreFile0 | region1 | hStore11 | TIME_RANGE | true | + * | hStoreFile0 | region1 | hStore11 | CUSTOM_CELL_VALUE | true | * | hStoreFile1 | region1 | hStore12 | NONE | true | - * | hStoreFile2 | region2 | hStore21 | TIME_RANGE | true | - * | hStoreFile3 | region2 | hStore22 | TIME_RANGE | false | + * | hStoreFile2 | region2 | hStore21 | CUSTOM_CELL_VALUE | true | + * | hStoreFile3 | region2 | hStore22 | CUSTOM_CELL_VALUE | false | * @formatter:on */ @Category({ RegionServerTests.class, SmallTests.class }) -public class TestDataTieringManager { +public class TestCustomCellDataTieringManager { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestDataTieringManager.class); + HBaseClassTestRule.forClass(TestCustomCellDataTieringManager.class); + private static final Logger LOG = LoggerFactory.getLogger(TestCustomCellDataTieringManager.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final Logger LOG = LoggerFactory.getLogger(TestDataTieringManager.class); private static final long DAY = 24 * 60 * 60 * 1000; private static Configuration defaultConf; private static FileSystem fs; - private static BlockCache blockCache; + private BlockCache blockCache; private static CacheConfig cacheConf; private static Path testDir; private static final Map<String, HRegion> testOnlineRegions = new HashMap<>(); @@ -121,10 +124,10 @@ public class TestDataTieringManager { @BeforeClass public static void setupBeforeClass() throws Exception { - testDir = TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName()); + testDir = TEST_UTIL.getDataTestDir(TestCustomCellDataTieringManager.class.getSimpleName()); defaultConf = TEST_UTIL.getConfiguration(); updateCommonConfigurations(); - assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions)); + DataTieringManager.instantiate(defaultConf, testOnlineRegions); dataTieringManager = DataTieringManager.getInstance(); rowKeyString = ""; } @@ -224,13 +227,14 @@ public class TestDataTieringManager { // Test with a filename where corresponding HStoreFile in not present hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), "incorrectFileName"); - testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true); + testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath, + new DataTieringException("Store file corresponding to " + hFilePath + " doesn't exist")); } @Test public void testPrefetchWhenDataTieringEnabled() throws IOException { setPrefetchBlocksOnOpen(); - initializeTestEnvironment(); + this.blockCache = initializeTestEnvironment(); // Evict blocks from cache by closing the files and passing evict on close. // Then initialize the reader again. Since Prefetch on open is set to true, it should prefetch // those blocks. 
@@ -280,27 +284,26 @@ public class TestDataTieringManager { @Test public void testCacheCompactedBlocksOnWriteDataTieringDisabled() throws IOException { setCacheCompactBlocksOnWrite(); - initializeTestEnvironment(); - - HRegion region = createHRegion("table3"); + this.blockCache = initializeTestEnvironment(); + HRegion region = createHRegion("table3", this.blockCache); testCacheCompactedBlocksOnWrite(region, true); } @Test public void testCacheCompactedBlocksOnWriteWithHotData() throws IOException { setCacheCompactBlocksOnWrite(); - initializeTestEnvironment(); - - HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(5 * DAY)); + this.blockCache = initializeTestEnvironment(); + HRegion region = + createHRegion("table3", getConfWithCustomCellDataTieringEnabled(5 * DAY), this.blockCache); testCacheCompactedBlocksOnWrite(region, true); } @Test public void testCacheCompactedBlocksOnWriteWithColdData() throws IOException { setCacheCompactBlocksOnWrite(); - initializeTestEnvironment(); - - HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(DAY)); + this.blockCache = initializeTestEnvironment(); + HRegion region = + createHRegion("table3", getConfWithCustomCellDataTieringEnabled(DAY), this.blockCache); testCacheCompactedBlocksOnWrite(region, false); } @@ -335,12 +338,11 @@ public class TestDataTieringManager { Path storeDir = hStore.getStoreContext().getFamilyStoreDirectoryPath(); Configuration configuration = hStore.getReadOnlyConfiguration(); - createHStoreFile(storeDir, configuration, currentTime - 2 * DAY, - hStore.getHRegion().getRegionFileSystem()); - createHStoreFile(storeDir, configuration, currentTime - 3 * DAY, - hStore.getHRegion().getRegionFileSystem()); - createHStoreFile(storeDir, configuration, currentTime - 4 * DAY, - hStore.getHRegion().getRegionFileSystem()); + HRegionFileSystem regionFS = hStore.getHRegion().getRegionFileSystem(); + + createHStoreFile(storeDir, configuration, currentTime - 2 * DAY, regionFS); + createHStoreFile(storeDir, configuration, currentTime - 3 * DAY, regionFS); + createHStoreFile(storeDir, configuration, currentTime - 4 * DAY, regionFS); } @Test @@ -568,48 +570,37 @@ public class TestDataTieringManager { @Test public void testCacheConfigShouldCacheFile() throws Exception { - // Evict the files from cache. - for (HStoreFile file : hStoreFiles) { - file.closeStoreFile(true); - } + initializeTestEnvironment(); // Verify that the API shouldCacheFileBlock returns the result correctly. // hStoreFiles[0], hStoreFiles[1], hStoreFiles[2] are hot files. // hStoreFiles[3] is a cold file. 
- try { - assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, - hStoreFiles.get(0).getFileInfo().getHFileInfo(), - hStoreFiles.get(0).getFileInfo().getConf())); - assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, - hStoreFiles.get(1).getFileInfo().getHFileInfo(), - hStoreFiles.get(1).getFileInfo().getConf())); - assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, - hStoreFiles.get(2).getFileInfo().getHFileInfo(), - hStoreFiles.get(2).getFileInfo().getConf())); - assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, - hStoreFiles.get(3).getFileInfo().getHFileInfo(), - hStoreFiles.get(3).getFileInfo().getConf())); - } finally { - for (HStoreFile file : hStoreFiles) { - file.initReader(); - } - } + assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(0).getFileInfo().getHFileInfo(), hStoreFiles.get(0).getFileInfo().getConf())); + assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(1).getFileInfo().getHFileInfo(), hStoreFiles.get(1).getFileInfo().getConf())); + assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(2).getFileInfo().getHFileInfo(), hStoreFiles.get(2).getFileInfo().getConf())); + assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(3).getFileInfo().getHFileInfo(), hStoreFiles.get(3).getFileInfo().getConf())); } @Test public void testCacheOnReadColdFile() throws Exception { + this.blockCache = initializeTestEnvironment(); // hStoreFiles[3] is a cold file. the blocks should not get loaded after a readBlock call. HStoreFile hStoreFile = hStoreFiles.get(3); BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA); - testCacheOnRead(hStoreFile, cacheKey, 23025, false); + testCacheOnRead(hStoreFile, cacheKey, -1, false); } @Test public void testCacheOnReadHotFile() throws Exception { + this.blockCache = initializeTestEnvironment(); // hStoreFiles[0] is a hot file. the blocks should get loaded after a readBlock call. HStoreFile hStoreFile = hStoreFiles.get(0); BlockCacheKey cacheKey = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA); - testCacheOnRead(hStoreFile, cacheKey, 23025, true); + testCacheOnRead(hStoreFile, cacheKey, -1, true); } private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long onDiskBlockSize, @@ -618,7 +609,7 @@ public class TestDataTieringManager { hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), onDiskBlockSize, true, false, false, false, key.getBlockType(), DataBlockEncoding.NONE); // Validate that the hot block gets cached and cold block is not cached. 
- HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false, BlockType.DATA); + HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false); if (expectedCached) { assertNotNull(block); } else { @@ -640,7 +631,8 @@ public class TestDataTieringManager { numColdBlocks++; } } catch (Exception e) { - fail("Unexpected exception!"); + LOG.debug("Error validating priority for key {}", key, e); + fail(e.getMessage()); } } assertEquals(expectedHotBlocks, numHotBlocks); @@ -699,26 +691,28 @@ public class TestDataTieringManager { testDataTieringMethodWithKey(caller, key, expectedResult, null); } - private static void initializeTestEnvironment() throws IOException { - setupFileSystemAndCache(); - setupOnlineRegions(); + private static BlockCache initializeTestEnvironment() throws IOException { + BlockCache blockCache = setupFileSystemAndCache(); + setupOnlineRegions(blockCache); + return blockCache; } - private static void setupFileSystemAndCache() throws IOException { + private static BlockCache setupFileSystemAndCache() throws IOException { fs = HFileSystem.get(defaultConf); - blockCache = BlockCacheFactory.createBlockCache(defaultConf); + BlockCache blockCache = BlockCacheFactory.createBlockCache(defaultConf); cacheConf = new CacheConfig(defaultConf, blockCache); + return blockCache; } - private static void setupOnlineRegions() throws IOException { + private static void setupOnlineRegions(BlockCache blockCache) throws IOException { testOnlineRegions.clear(); hStoreFiles.clear(); long day = 24 * 60 * 60 * 1000; long currentTime = System.currentTimeMillis(); - HRegion region1 = createHRegion("table1"); + HRegion region1 = createHRegion("table1", blockCache); - HStore hStore11 = createHStore(region1, "cf1", getConfWithTimeRangeDataTieringEnabled(day)); + HStore hStore11 = createHStore(region1, "cf1", getConfWithCustomCellDataTieringEnabled(day)); hStoreFiles.add(createHStoreFile(hStore11.getStoreContext().getFamilyStoreDirectoryPath(), hStore11.getReadOnlyConfiguration(), currentTime, region1.getRegionFileSystem())); hStore11.refreshStoreFiles(); @@ -730,8 +724,8 @@ public class TestDataTieringManager { region1.stores.put(Bytes.toBytes("cf1"), hStore11); region1.stores.put(Bytes.toBytes("cf2"), hStore12); - HRegion region2 = - createHRegion("table2", getConfWithTimeRangeDataTieringEnabled((long) (2.5 * day))); + HRegion region2 = createHRegion("table2", + getConfWithCustomCellDataTieringEnabled((long) (2.5 * day)), blockCache); HStore hStore21 = createHStore(region2, "cf1"); hStoreFiles.add(createHStoreFile(hStore21.getStoreContext().getFamilyStoreDirectoryPath(), @@ -753,11 +747,12 @@ public class TestDataTieringManager { testOnlineRegions.put(region2.getRegionInfo().getEncodedName(), region2); } - private static HRegion createHRegion(String table) throws IOException { - return createHRegion(table, defaultConf); + private static HRegion createHRegion(String table, BlockCache blockCache) throws IOException { + return createHRegion(table, defaultConf, blockCache); } - private static HRegion createHRegion(String table, Configuration conf) throws IOException { + private static HRegion createHRegion(String table, Configuration conf, BlockCache blockCache) + throws IOException { TableName tableName = TableName.valueOf(table); TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) @@ -796,15 +791,7 @@ public class TestDataTieringManager { return new HStore(region, columnFamilyDescriptor, conf, false); } - private static Configuration 
getConfWithTimeRangeDataTieringEnabled(long hotDataAge) { - Configuration conf = new Configuration(defaultConf); - conf.set(DataTieringManager.DATATIERING_KEY, DataTieringType.TIME_RANGE.name()); - conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(hotDataAge)); - return conf; - } - - - static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp, + private static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp, HRegionFileSystem regionFs) throws IOException { String columnFamily = storeDir.getName(); @@ -816,6 +803,13 @@ public class TestDataTieringManager { return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, BloomType.NONE, true); } + private static Configuration getConfWithCustomCellDataTieringEnabled(long hotDataAge) { + Configuration conf = new Configuration(defaultConf); + conf.set(DataTieringManager.DATATIERING_KEY, DataTieringType.CUSTOM.name()); + conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(hotDataAge)); + return conf; + } + /** * Writes random data to a store file with rows arranged in lexicographically increasing order. * Each row is generated using the {@link #nextString()} method, ensuring that each subsequent row @@ -833,6 +827,10 @@ public class TestDataTieringManager { } } finally { writer.appendTrackedTimestampsToMetadata(); + TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + timeRangeTracker.setMin(timestamp); + timeRangeTracker.setMax(timestamp); + writer.appendCustomCellTimestampsToMetadata(timeRangeTracker); writer.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java new file mode 100644 index 00000000000..b2c363bf53a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestCustomCellTieredCompactionPolicy { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCustomCellTieredCompactionPolicy.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + public static final byte[] FAMILY = Bytes.toBytes("cf"); + + private HStoreFile createFile(Path file, long minValue, long maxValue, long size, int seqId) + throws IOException { + return createFile(mockRegionInfo(), file, minValue, maxValue, size, seqId, 0); + } + + private HStoreFile createFile(RegionInfo regionInfo, Path file, long minValue, long maxValue, + long size, int seqId, long ageInDisk) throws IOException { + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + HRegionFileSystem regionFileSystem = + new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, file, regionInfo); + MockHStoreFile msf = new MockHStoreFile(TEST_UTIL, file, size, ageInDisk, false, (long) seqId); + TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + timeRangeTracker.setMin(minValue); + timeRangeTracker.setMax(maxValue); + msf.setMetadataValue(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker)); + return msf; + } + + private CustomDateTieredCompactionPolicy mockAndCreatePolicy() throws Exception { + RegionInfo mockedRegionInfo = mockRegionInfo(); + return mockAndCreatePolicy(mockedRegionInfo); + } + + private CustomDateTieredCompactionPolicy mockAndCreatePolicy(RegionInfo regionInfo) + throws Exception { + StoreConfigInformation mockedStoreConfig = mock(StoreConfigInformation.class); + when(mockedStoreConfig.getRegionInfo()).thenReturn(regionInfo); + CustomDateTieredCompactionPolicy policy = + new CustomDateTieredCompactionPolicy(TEST_UTIL.getConfiguration(), mockedStoreConfig); + return policy; + } + + private RegionInfo mockRegionInfo() { + RegionInfo mockedRegionInfo = mock(RegionInfo.class); + when(mockedRegionInfo.getEncodedName()).thenReturn("1234567890987654321"); + return mockedRegionInfo; + } + + private Path preparePath() throws Exception { + FileSystem fs = 
FileSystem.get(TEST_UTIL.getConfiguration()); + Path file = + new Path(TEST_UTIL.getDataTestDir(), UUID.randomUUID().toString().replaceAll("-", "")); + fs.create(file); + return file; + } + + @Test + public void testGetCompactBoundariesForMajorNoOld() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 0)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 1)); + assertEquals(1, + ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size()); + } + + @Test + public void testGetCompactBoundariesForMajorAllOld() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + // The default cut off age is 10 years, so any of the min/max value there should get in the old + // tier + files.add(createFile(file, 0, 1, 1024, 0)); + files.add(createFile(file, 2, 3, 1024, 1)); + assertEquals(2, + ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size()); + } + + @Test + public void testGetCompactBoundariesForMajorOneOnEachSide() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, 0, 1, 1024, 0)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 1)); + assertEquals(3, + ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size()); + } + + @Test + public void testGetCompactBoundariesForMajorOneCrossing() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, 0, EnvironmentEdgeManager.currentTime(), 1024, 0)); + assertEquals(3, + ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size()); + } + + @FunctionalInterface + interface PolicyValidator<T, U> { + void accept(T t, U u) throws Exception; + } + + private void testShouldPerformMajorCompaction(long min, long max, int numFiles, + PolicyValidator<CustomDateTieredCompactionPolicy, ArrayList<HStoreFile>> validation) + throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + RegionInfo mockedRegionInfo = mockRegionInfo(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(timeMachine); + for (int i = 0; i < numFiles; i++) { + MockHStoreFile mockedSFile = (MockHStoreFile) createFile(mockedRegionInfo, file, min, max, + 1024, 0, HConstants.DEFAULT_MAJOR_COMPACTION_PERIOD); + mockedSFile.setIsMajor(true); + files.add(mockedSFile); + } + EnvironmentEdgeManager.reset(); + validation.accept(policy, files); + } + + @Test + public void testShouldPerformMajorCompactionOneFileCrossing() throws Exception { + long max = EnvironmentEdgeManager.currentTime(); + testShouldPerformMajorCompaction(0, max, 1, + (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f))); + } + + @Test + public void testShouldPerformMajorCompactionOneFileMinMaxLow() throws Exception 
{ + testShouldPerformMajorCompaction(0, 1, 1, + (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f))); + } + + @Test + public void testShouldPerformMajorCompactionOneFileMinMaxHigh() throws Exception { + long currentTime = EnvironmentEdgeManager.currentTime(); + testShouldPerformMajorCompaction(currentTime, currentTime, 1, + (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f))); + } + + @Test + public void testShouldPerformMajorCompactionTwoFilesMinMaxHigh() throws Exception { + long currentTime = EnvironmentEdgeManager.currentTime(); + testShouldPerformMajorCompaction(currentTime, currentTime, 2, + (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f))); + } + + @Test + public void testSelectMinorCompactionTwoFilesNoOld() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 0)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 1)); + // Shouldn't do minor compaction, as minimum number of files + // for minor compactions is 3 + assertEquals(0, policy.selectMinorCompaction(files, true, true).getFiles().size()); + } + + @Test + public void testSelectMinorCompactionThreeFilesNoOld() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 0)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 1)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 2)); + assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size()); + } + + @Test + public void testSelectMinorCompactionThreeFilesAllOld() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, 0, 1, 1024, 0)); + files.add(createFile(file, 1, 2, 1024, 1)); + files.add(createFile(file, 3, 4, 1024, 2)); + assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size()); + } + + @Test + public void testSelectMinorCompactionThreeFilesOneOldTwoNew() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, 0, 1, 1024, 0)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 1)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 2)); + assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size()); + } + + @Test + public void testSelectMinorCompactionThreeFilesTwoOldOneNew() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList<HStoreFile> files = new ArrayList<>(); + files.add(createFile(file, 0, 1, 1024, 0)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 1)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + 
EnvironmentEdgeManager.currentTime(), 1024, 2)); + assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java index b24c1f2ed84..585482c9409 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -224,7 +225,8 @@ public class TestDataTieringManager { // Test with a filename where corresponding HStoreFile in not present hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), "incorrectFileName"); - testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true); + testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath, + new DataTieringException("Store file corresponding to " + hFilePath + " doesn't exist")); } @Test @@ -597,19 +599,21 @@ public class TestDataTieringManager { @Test public void testCacheOnReadColdFile() throws Exception { + initializeTestEnvironment(); // hStoreFiles[3] is a cold file. the blocks should not get loaded after a readBlock call. HStoreFile hStoreFile = hStoreFiles.get(3); BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA); - testCacheOnRead(hStoreFile, cacheKey, 23025, false); + testCacheOnRead(hStoreFile, cacheKey, -1, false); } @Test public void testCacheOnReadHotFile() throws Exception { + initializeTestEnvironment(); // hStoreFiles[0] is a hot file. the blocks should get loaded after a readBlock call. HStoreFile hStoreFile = hStoreFiles.get(0); BlockCacheKey cacheKey = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA); - testCacheOnRead(hStoreFile, cacheKey, 23025, true); + testCacheOnRead(hStoreFile, cacheKey, -1, true); } private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long onDiskBlockSize, @@ -618,7 +622,7 @@ public class TestDataTieringManager { hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), onDiskBlockSize, true, false, false, false, key.getBlockType(), DataBlockEncoding.NONE); // Validate that the hot block gets cached and cold block is not cached. 
- HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false, BlockType.DATA); + HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false); if (expectedCached) { assertNotNull(block); } else { @@ -803,7 +807,6 @@ public class TestDataTieringManager { return conf; } - static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp, HRegionFileSystem regionFs) throws IOException { String columnFamily = storeDir.getName(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java new file mode 100644 index 00000000000..331dd41e4f1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; +import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestCustomCellTieredCompactor { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCustomCellTieredCompactor.class); + + public static final byte[] FAMILY = Bytes.toBytes("cf"); + + protected HBaseTestingUtility 
utility; + + protected Admin admin; + + @Before + public void setUp() throws Exception { + utility = new HBaseTestingUtility(); + utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10); + utility.startMiniCluster(); + } + + @After + public void tearDown() throws Exception { + utility.shutdownMiniCluster(); + } + + @Test + public void testCustomCellTieredCompactor() throws Exception { + ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY); + clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName()); + clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date"); + TableName tableName = TableName.valueOf("testCustomCellTieredCompactor"); + TableDescriptorBuilder tblBuilder = TableDescriptorBuilder.newBuilder(tableName); + tblBuilder.setColumnFamily(clmBuilder.build()); + utility.getAdmin().createTable(tblBuilder.build()); + utility.waitTableAvailable(tableName); + Connection connection = utility.getConnection(); + Table table = connection.getTable(tableName); + long recordTime = System.currentTimeMillis(); + // write data and flush multiple store files: + for (int i = 0; i < 6; i++) { + List<Put> puts = new ArrayList<>(2); + Put put = new Put(Bytes.toBytes(i)); + put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i)); + put.addColumn(FAMILY, Bytes.toBytes("date"), + Bytes.toBytes(recordTime - (11L * 366L * 24L * 60L * 60L * 1000L))); + puts.add(put); + put = new Put(Bytes.toBytes(i + 1000)); + put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000))); + put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(recordTime)); + puts.add(put); + table.put(puts); + utility.flush(tableName); + } + table.close(); + long firstCompactionTime = System.currentTimeMillis(); + utility.getAdmin().majorCompact(tableName); + Waiter.waitFor(utility.getConfiguration(), 5000, + () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName) + > firstCompactionTime); + long numHFiles = utility.getNumHFiles(tableName, FAMILY); + // The first major compaction has no means to detect more than one tier: without the min/max + // values available in the file info portion of the selected files for compaction, + // CustomDateTieredCompactionPolicy cannot calculate the proper boundaries. 
+ assertEquals(1, numHFiles); + utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles() + .forEach(file -> { + byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); + assertNotNull(rangeBytes); + try { + TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes); + assertEquals((recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)), + timeRangeTracker.getMin()); + assertEquals(recordTime, timeRangeTracker.getMax()); + } catch (IOException e) { + fail(e.getMessage()); + } + }); + // now do major compaction again, to make sure we write two separate files + long secondCompactionTime = System.currentTimeMillis(); + utility.getAdmin().majorCompact(tableName); + Waiter.waitFor(utility.getConfiguration(), 5000, + () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName) + > secondCompactionTime); + numHFiles = utility.getNumHFiles(tableName, FAMILY); + assertEquals(2, numHFiles); + utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles() + .forEach(file -> { + byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); + assertNotNull(rangeBytes); + try { + TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes); + assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax()); + } catch (IOException e) { + fail(e.getMessage()); + } + }); + } +}
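
A minimal usage sketch based on the configuration keys exercised in the code above: the store engine class, the TIERING_CELL_QUALIFIER key and the age-limit property come from this patch, while the table name, the "event_date" qualifier and the one-year age limit are illustrative assumptions only. As in TestCustomCellTieredCompactor, the tiering qualifier is expected to hold a long value written with Bytes.toBytes(long).

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CustomTieringExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin()) {
      ColumnFamilyDescriptorBuilder cf =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"));
      // Use the custom tiered store engine introduced by this change.
      cf.setValue("hbase.hstore.engine.class",
        "org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine");
      // Qualifier whose long value drives tiering, read by CustomCellTieringValueProvider.
      cf.setValue("TIERING_CELL_QUALIFIER", "event_date");
      // Optional: override the cut-off age (defaults to roughly 10 years). One year, for example:
      cf.setValue("hbase.hstore.compaction.date.tiered.custom.age.limit.millis",
        String.valueOf(365L * 24 * 60 * 60 * 1000));
      TableDescriptorBuilder table =
        TableDescriptorBuilder.newBuilder(TableName.valueOf("example_table"))
          .setColumnFamily(cf.build());
      admin.createTable(table.build());
    }
  }
}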