This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit b704348dac2269cc630ac854e7449a386273bb75
Author: Wellington Ramos Chevreuil <wchevre...@apache.org>
AuthorDate: Tue Jul 15 11:48:35 2025 +0100

    HBASE-29427 Merge all commits related to custom tiering into the feature 
branch (#7124)
    
    This is the whole custom tiering implementation and comprises the following 
individual work items:
    
    * HBASE-29412 Extend date tiered compaction to allow for tiering by values 
other than cell timestamp
    * HBASE-29413 Implement a custom qualifier tiered compaction
    * HBASE-29414 Refactor DataTieringManager to make priority logic pluggable
    * HBASE-29422 Implement selectMinorCompaction in 
CustomCellDateTieredCompactionPolicy
    * HBASE-29424 Implement configuration validation for custom tiering 
compactions
    * HBASE-29425 Refine and polish code
    * HBASE-29426 Define a tiering value provider and refactor custom tiered 
compaction related classes
    * HBASE-28463 Rebase time based priority branch (HBASE-28463) with latest 
master (and fix conflicts)
    
    Co-authored-by: Janardhan Hungund <janardhan.hung...@cloudera.com>
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Change-Id: I5dc170e31e47b297e1d81f07d625f680c5e0d848
---
 .../main/java/org/apache/hadoop/hbase/TagType.java |   2 +
 .../apache/hadoop/hbase/io/hfile/BlockCache.java   |   9 +-
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  |   3 +-
 .../hadoop/hbase/io/hfile/CombinedBlockCache.java  |   7 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java    |   7 +
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |   4 +-
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  20 +-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  10 +-
 .../master/procedure/CreateTableProcedure.java     |   3 +
 .../master/procedure/ModifyTableProcedure.java     |   2 +
 .../hadoop/hbase/regionserver/CellTSTiering.java   |  57 +++++
 .../regionserver/CustomTieredStoreEngine.java      |  56 +++++
 .../hadoop/hbase/regionserver/CustomTiering.java   |  58 +++++
 .../regionserver/CustomTieringMultiFileWriter.java |  85 +++++++
 .../{DataTieringType.java => DataTiering.java}     |  11 +-
 .../hbase/regionserver/DataTieringManager.java     |  98 ++------
 .../hadoop/hbase/regionserver/DataTieringType.java |  15 +-
 .../regionserver/DateTieredMultiFileWriter.java    |  20 +-
 .../hbase/regionserver/DateTieredStoreEngine.java  |  17 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   4 +-
 .../hadoop/hbase/regionserver/StoreFileWriter.java |  13 +
 .../hbase/regionserver/compactions/Compactor.java  |   6 +
 .../compactions/CustomCellTieredUtils.java         |  49 ++++
 .../CustomCellTieringValueProvider.java            |  86 +++++++
 .../CustomDateTieredCompactionPolicy.java          | 155 ++++++++++++
 .../compactions/CustomTieredCompactor.java         |  74 ++++++
 .../compactions/DateTieredCompactionPolicy.java    | 129 ++++++----
 .../compactions/DateTieredCompactor.java           |  12 +-
 .../hbase/client/TestIllegalTableDescriptor.java   |  14 +-
 .../hadoop/hbase/io/hfile/TestBytesReadFromFs.java |   4 +
 ....java => TestCustomCellDataTieringManager.java} | 158 ++++++------
 .../TestCustomCellTieredCompactionPolicy.java      | 267 +++++++++++++++++++++
 .../hbase/regionserver/TestDataTieringManager.java |  13 +-
 .../compactions/TestCustomCellTieredCompactor.java | 148 ++++++++++++
 34 files changed, 1371 insertions(+), 245 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index eb9a7f3eccc..b0df4920e4e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -36,4 +36,6 @@ public final class TagType {
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
   public static final byte TTL_TAG_TYPE = (byte) 8;
+  // tag with the custom cell tiering value for the row
+  public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9;
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 90fb7ce3491..313b4034fb8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -23,7 +23,6 @@ import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -199,13 +198,13 @@ public interface BlockCache extends 
Iterable<CachedBlock>, ConfigurationObserver
    * not be overridden by all implementing classes. In such cases, the 
returned Optional will be
    * empty. For subclasses implementing this logic, the returned Optional 
would contain the boolean
    * value reflecting if the passed block should indeed be cached.
-   * @param key              The key representing the block to check if it 
should be cached.
-   * @param timeRangeTracker the time range tracker containing the timestamps
-   * @param conf             The configuration object to use for determining 
caching behavior.
+   * @param key          The key representing the block to check if it should 
be cached.
+   * @param maxTimeStamp The maximum timestamp for the block to check if it 
should be cached.
+   * @param conf         The configuration object to use for determining 
caching behavior.
    * @return An empty Optional if this method is not supported; otherwise, the 
returned Optional
    *         contains the boolean value indicating if the block should be 
cached.
    */
-  default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, 
TimeRangeTracker timeRangeTracker,
+  default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long 
maxTimeStamp,
     Configuration conf) {
     return Optional.empty();
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 4f57668a0c6..b6357ca94b8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -269,7 +269,8 @@ public class CacheConfig implements 
PropagatingConfigurationObserver {
   public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo 
hFileInfo,
     Configuration conf) {
     Optional<Boolean> cacheFileBlock = Optional.of(true);
-    if (getBlockCache().isPresent()) {
+    // For DATA blocks only, if BucketCache is in use, we don't need to cache 
block again
+    if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) {
       Optional<Boolean> result = 
getBlockCache().get().shouldCacheFile(hFileInfo, conf);
       if (result.isPresent()) {
         cacheFileBlock = result;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index cb3000dda19..856b5da6d11 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -484,10 +483,10 @@ public class CombinedBlockCache implements 
ResizableBlockCache, HeapSize {
   }
 
   @Override
-  public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, 
TimeRangeTracker timeRangeTracker,
+  public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long 
maxTimeStamp,
     Configuration conf) {
-    return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, 
conf),
-      l2Cache.shouldCacheBlock(key, timeRangeTracker, conf));
+    return combineCacheResults(l1Cache.shouldCacheBlock(key, maxTimeStamp, 
conf),
+      l2Cache.shouldCacheBlock(key, maxTimeStamp, conf));
   }
 
   private Optional<Boolean> combineCacheResults(Optional<Boolean> result1,
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 2c3908aa33f..1f8f8421548 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -43,6 +43,7 @@ import 
org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -217,6 +218,12 @@ public final class HFile {
      */
     void appendTrackedTimestampsToMetadata() throws IOException;
 
+    /**
+     * Add the custom cell tiering time range to the file metadata.
+     */
+    public void appendCustomCellTimestampsToMetadata(TimeRangeTracker 
timeRangeTracker)
+      throws IOException;
+
     /** Returns the path to this {@link HFile} */
     Path getPath();
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 6aa16f9423c..e837e8f1bd6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1396,7 +1396,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
           cacheConf.getBlockCache().ifPresent(cache -> {
             LOG.debug("Skipping decompression of block {} in prefetch", 
cacheKey);
             // Cache the block if necessary
-            if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+            if (cacheBlock && cacheOnRead) {
               cache.cacheBlock(cacheKey, blockNoChecksum, 
cacheConf.isInMemory(), cacheOnly);
             }
           });
@@ -1410,7 +1410,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
         HFileBlock unpackedNoChecksum = 
BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
-          if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+          if (cacheBlock && cacheOnRead) {
             // Using the wait on cache during compaction and prefetching.
             cache.cacheBlock(cacheKey,
               cacheCompressed
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 44ec324686e..2bdfcfa45b3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static 
org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
+import static 
org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
 
@@ -29,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -126,6 +128,12 @@ public class HFileWriterImpl implements HFile.Writer {
   /** Cache configuration for caching data on write. */
   protected final CacheConfig cacheConf;
 
+  public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> 
timeRangeTrackerForTiering) {
+    this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
+  }
+
+  private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;
+
   /**
    * Name for this object used when logging or in toString. Is either the 
result of a toString on
    * stream or else name of passed file Path.
@@ -185,7 +193,9 @@ public class HFileWriterImpl implements HFile.Writer {
     this.path = path;
     this.name = path != null ? path.getName() : outputStream.toString();
     this.hFileContext = fileContext;
+    // TODO: Move this back to upper layer
     this.timeRangeTracker = 
TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+    this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
     if (encoding != DataBlockEncoding.NONE) {
       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -588,7 +598,8 @@ public class HFileWriterImpl implements HFile.Writer {
   }
 
   private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
-    Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, 
conf);
+    Optional<Boolean> result =
+      cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), 
conf);
     return result.orElse(true);
   }
 
@@ -899,6 +910,13 @@ public class HFileWriterImpl implements HFile.Writer {
     appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
   }
 
+  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker 
timeRangeTracker)
+    throws IOException {
+    // TODO: The StoreFileReader always converts the byte[] to TimeRange
+    // via TimeRangeTracker, so we should write the serialization data of 
TimeRange directly.
+    appendFileInfo(CUSTOM_TIERING_TIME_RANGE, 
TimeRangeTracker.toByteArray(timeRangeTracker));
+  }
+
   /**
    * Record the earliest Put timestamp. If the timeRangeTracker is not set, 
update TimeRangeTracker
    * to include the timestamp of this key
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index f3903284067..8b333bce0b5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.regionserver.DataTieringManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -1116,8 +1115,9 @@ public class BucketCache implements BlockCache, HeapSize {
           }
         }
 
-        if (bytesFreed < bytesToFreeWithExtra &&
-          coldFiles != null && 
coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
+        if (
+          bytesFreed < bytesToFreeWithExtra && coldFiles != null
+            && 
coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
         ) {
           int freedBlockSize = bucketEntryWithKey.getValue().getLength();
           if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
@@ -2365,10 +2365,10 @@ public class BucketCache implements BlockCache, 
HeapSize {
   }
 
   @Override
-  public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, 
TimeRangeTracker timeRangeTracker,
+  public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long 
maxTimestamp,
     Configuration conf) {
     DataTieringManager dataTieringManager = DataTieringManager.getInstance();
-    if (dataTieringManager != null && 
!dataTieringManager.isHotData(timeRangeTracker, conf)) {
+    if (dataTieringManager != null && 
!dataTieringManager.isHotData(maxTimestamp, conf)) {
       LOG.debug("Data tiering is enabled for file: '{}' and it is not hot 
data",
         key.getHfileName());
       return Optional.of(false);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 67e0236c7be..928b07ea561 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
 import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -296,6 +297,8 @@ public class CreateTableProcedure extends 
AbstractStateMachineTableProcedure<Cre
     
StoreFileTrackerValidationUtils.checkForCreateTable(env.getMasterConfiguration(),
       tableDescriptor);
 
+    CustomCellTieredUtils.checkForModifyTable(tableDescriptor);
+
     return true;
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 5b51a5662db..76f3c086bea 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.fs.ErasureCodingUtils;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
 import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
@@ -449,6 +450,7 @@ public class ModifyTableProcedure extends 
AbstractStateMachineTableProcedure<Mod
     // check for store file tracker configurations
     
StoreFileTrackerValidationUtils.checkForModifyTable(env.getMasterConfiguration(),
       unmodifiedTableDescriptor, modifiedTableDescriptor, 
!isTableEnabled(env));
+    CustomCellTieredUtils.checkForModifyTable(modifiedTableDescriptor);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java
new file mode 100644
index 00000000000..ed7dc01ba8d
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
+
+import java.io.IOException;
+import java.util.OptionalLong;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class CellTSTiering implements DataTiering {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CellTSTiering.class);
+
+  public long getTimestamp(HStoreFile hStoreFile) {
+    OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
+    if (!maxTimestamp.isPresent()) {
+      LOG.debug("Maximum timestamp not present for {}", hStoreFile.getPath());
+      return Long.MAX_VALUE;
+    }
+    return maxTimestamp.getAsLong();
+  }
+
+  public long getTimestamp(HFileInfo hFileInfo) {
+    try {
+      byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY);
+      if (hFileTimeRange == null) {
+        LOG.debug("Timestamp information not found for file: {}",
+          hFileInfo.getHFileContext().getHFileName());
+        return Long.MAX_VALUE;
+      }
+      return TimeRangeTracker.parseFrom(hFileTimeRange).getMax();
+    } catch (IOException e) {
+      LOG.error("Error occurred while reading the timestamp metadata of file: 
{}",
+        hFileInfo.getHFileContext().getHFileName(), e);
+      return Long.MAX_VALUE;
+    }
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java
new file mode 100644
index 00000000000..518b31fb5be
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static 
org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Extension of {@link DateTieredStoreEngine} that uses a pluggable value 
provider for extracting
+ * the value to be used for comparison in this tiered compaction. Unlike the 
existing Date
+ * Tiered Compaction, this doesn't yield multiple tiers or files, but rather 
provides two tiers
+ * based on a configurable “cut-off” age. All rows with the cell tiering value 
older than this
+ * “cut-off” age would be placed together in an “old” tier, whilst younger 
rows would go to a
+ * separate, “young” tier file.
+ */
+@InterfaceAudience.Private
+public class CustomTieredStoreEngine extends DateTieredStoreEngine {
+
+  @Override
+  protected void createComponents(Configuration conf, HStore store, 
CellComparator kvComparator)
+    throws IOException {
+    CompoundConfiguration config = new CompoundConfiguration();
+    config.add(conf);
+    config.add(store.conf);
+    config.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      CustomDateTieredCompactionPolicy.class.getName());
+    createCompactionPolicy(config, store);
+    this.storeFileManager = new DefaultStoreFileManager(kvComparator,
+      StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, config, 
compactionPolicy.getConf());
+    this.storeFlusher = new DefaultStoreFlusher(config, store);
+    this.compactor = new CustomTieredCompactor(config, store);
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java
new file mode 100644
index 00000000000..7a9914c87d3
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static 
org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+
+import java.io.IOException;
+import java.util.Date;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class CustomTiering implements DataTiering {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CustomTiering.class);
+
+  private long getMaxTSFromTimeRange(byte[] hFileTimeRange, String hFileName) {
+    try {
+      if (hFileTimeRange == null) {
+        LOG.debug("Custom cell-based timestamp information not found for file: 
{}", hFileName);
+        return Long.MAX_VALUE;
+      }
+      long parsedValue = TimeRangeTracker.parseFrom(hFileTimeRange).getMax();
+      LOG.debug("Max TS for file {} is {}", hFileName, new Date(parsedValue));
+      return parsedValue;
+    } catch (IOException e) {
+      LOG.error("Error occurred while reading the Custom cell-based timestamp 
metadata of file: {}",
+        hFileName, e);
+      return Long.MAX_VALUE;
+    }
+  }
+
+  public long getTimestamp(HStoreFile hStoreFile) {
+    return 
getMaxTSFromTimeRange(hStoreFile.getMetadataValue(CUSTOM_TIERING_TIME_RANGE),
+      hStoreFile.getPath().getName());
+  }
+
+  public long getTimestamp(HFileInfo hFileInfo) {
+    return getMaxTSFromTimeRange(hFileInfo.get(CUSTOM_TIERING_TIME_RANGE),
+      hFileInfo.getHFileContext().getHFileName());
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
new file mode 100644
index 00000000000..f2062fbf27b
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.function.Function;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CustomTieringMultiFileWriter extends DateTieredMultiFileWriter {
+
+  public static final byte[] CUSTOM_TIERING_TIME_RANGE = 
Bytes.toBytes("CUSTOM_TIERING_TIME_RANGE");
+
+  private NavigableMap<Long, TimeRangeTracker> lowerBoundary2TimeRanger = new 
TreeMap<>();
+
+  public CustomTieringMultiFileWriter(List<Long> lowerBoundaries,
+    Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+    Function<Cell, Long> tieringFunction) {
+    super(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, 
tieringFunction);
+    for (Long lowerBoundary : lowerBoundaries) {
+      lowerBoundary2TimeRanger.put(lowerBoundary, null);
+    }
+  }
+
+  @Override
+  public void append(Cell cell) throws IOException {
+    super.append(cell);
+    long tieringValue = tieringFunction.apply(cell);
+    Map.Entry<Long, TimeRangeTracker> entry = 
lowerBoundary2TimeRanger.floorEntry(tieringValue);
+    if (entry.getValue() == null) {
+      TimeRangeTracker timeRangeTracker = 
TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+      timeRangeTracker.setMin(tieringValue);
+      timeRangeTracker.setMax(tieringValue);
+      lowerBoundary2TimeRanger.put(entry.getKey(), timeRangeTracker);
+      ((HFileWriterImpl) 
lowerBoundary2Writer.get(entry.getKey()).getLiveFileWriter())
+        .setTimeRangeTrackerForTiering(() -> timeRangeTracker);
+    } else {
+      TimeRangeTracker timeRangeTracker = entry.getValue();
+      if (timeRangeTracker.getMin() > tieringValue) {
+        timeRangeTracker.setMin(tieringValue);
+      }
+      if (timeRangeTracker.getMax() < tieringValue) {
+        timeRangeTracker.setMax(tieringValue);
+      }
+    }
+  }
+
+  @Override
+  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
+    Collection<HStoreFile> storeFiles) throws IOException {
+    for (Map.Entry<Long, StoreFileWriter> entry : 
this.lowerBoundary2Writer.entrySet()) {
+      StoreFileWriter writer = entry.getValue();
+      if (writer != null) {
+        writer.appendFileInfo(CUSTOM_TIERING_TIME_RANGE,
+          
TimeRangeTracker.toByteArray(lowerBoundary2TimeRanger.get(entry.getKey())));
+      }
+    }
+    return super.commitWriters(maxSeqId, majorCompaction, storeFiles);
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java
similarity index 82%
copy from 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
copy to 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java
index ee54576a648..51e89b0b79d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 
-@InterfaceAudience.Public
-public enum DataTieringType {
-  NONE,
-  TIME_RANGE
+/**
+ * Strategy for obtaining the timestamp that data tiering compares against the configured hot data
+ * age, either from a store file or directly from HFile metadata.
+ */
+@InterfaceAudience.Private
+public interface DataTiering {
+  /** Returns the timestamp to use for tiering decisions on the given store file. */
+  long getTimestamp(HStoreFile hFile);
+
+  /** Returns the timestamp to use for tiering decisions, derived from the given HFile info. */
+  long getTimestamp(HFileInfo hFileInfo);
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
index f71bc5e43aa..8443827ccaa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
@@ -17,13 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
-
 import java.io.IOException;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.OptionalLong;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -136,17 +134,18 @@ public class DataTieringManager {
    * the data tiering type is set to {@link DataTieringType#TIME_RANGE}, it 
uses the maximum
    * timestamp from the time range tracker to determine if the data is hot. 
Otherwise, it considers
    * the data as hot by default.
-   * @param timeRangeTracker the time range tracker containing the timestamps
-   * @param conf             The configuration object to use for determining 
hot data criteria.
+   * @param maxTimestamp the maximum timestamp associated with the data.
+   * @param conf         The configuration object to use for determining hot 
data criteria.
    * @return {@code true} if the data is hot, {@code false} otherwise
    */
-  public boolean isHotData(TimeRangeTracker timeRangeTracker, Configuration 
conf) {
+  public boolean isHotData(long maxTimestamp, Configuration conf) {
     DataTieringType dataTieringType = getDataTieringType(conf);
+
     if (
-      dataTieringType.equals(DataTieringType.TIME_RANGE)
-        && timeRangeTracker.getMax() != TimeRangeTracker.INITIAL_MAX_TIMESTAMP
+      !dataTieringType.equals(DataTieringType.NONE)
+        && maxTimestamp != TimeRangeTracker.INITIAL_MAX_TIMESTAMP
     ) {
-      return hotDataValidator(timeRangeTracker.getMax(), 
getDataTieringHotDataAge(conf));
+      return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(conf));
     }
     // DataTieringType.NONE or other types are considered hot by default
     return true;
@@ -165,29 +164,14 @@ public class DataTieringManager {
     Configuration configuration = getConfiguration(hFilePath);
     DataTieringType dataTieringType = getDataTieringType(configuration);
 
-    if (dataTieringType.equals(DataTieringType.TIME_RANGE)) {
-      return hotDataValidator(getMaxTimestamp(hFilePath), 
getDataTieringHotDataAge(configuration));
-    }
-    // DataTieringType.NONE or other types are considered hot by default
-    return true;
-  }
-
-  /**
-   * Determines whether the data in the HFile at the given path is considered 
hot based on the
-   * configured data tiering type and hot data age. If the data tiering type 
is set to
-   * {@link DataTieringType#TIME_RANGE}, it validates the data against the 
provided maximum
-   * timestamp.
-   * @param hFilePath    the path to the HFile
-   * @param maxTimestamp the maximum timestamp to validate against
-   * @return {@code true} if the data is hot, {@code false} otherwise
-   * @throws DataTieringException if there is an error retrieving data tiering 
information
-   */
-  public boolean isHotData(Path hFilePath, long maxTimestamp) throws 
DataTieringException {
-    Configuration configuration = getConfiguration(hFilePath);
-    DataTieringType dataTieringType = getDataTieringType(configuration);
-
-    if (dataTieringType.equals(DataTieringType.TIME_RANGE)) {
-      return hotDataValidator(maxTimestamp, 
getDataTieringHotDataAge(configuration));
+    if (!dataTieringType.equals(DataTieringType.NONE)) {
+      HStoreFile hStoreFile = getHStoreFile(hFilePath);
+      if (hStoreFile == null) {
+        throw new DataTieringException(
+          "Store file corresponding to " + hFilePath + " doesn't exist");
+      }
+      // Reuse the file resolved and null-checked above: a second lookup repeats the scan over the
+      // store files and its result would be used without any null check.
+      return hotDataValidator(dataTieringType.getInstance().getTimestamp(hStoreFile),
+        getDataTieringHotDataAge(configuration));
     }
     // DataTieringType.NONE or other types are considered hot by default
     return true;
@@ -204,8 +188,9 @@ public class DataTieringManager {
    */
   public boolean isHotData(HFileInfo hFileInfo, Configuration configuration) {
     DataTieringType dataTieringType = getDataTieringType(configuration);
-    if (dataTieringType.equals(DataTieringType.TIME_RANGE)) {
-      return hotDataValidator(getMaxTimestamp(hFileInfo), 
getDataTieringHotDataAge(configuration));
+    if (hFileInfo != null && !dataTieringType.equals(DataTieringType.NONE)) {
+      return 
hotDataValidator(dataTieringType.getInstance().getTimestamp(hFileInfo),
+        getDataTieringHotDataAge(configuration));
     }
     // DataTieringType.NONE or other types are considered hot by default
     return true;
@@ -217,36 +202,6 @@ public class DataTieringManager {
     return diff <= hotDataAge;
   }
 
-  private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
-    HStoreFile hStoreFile = getHStoreFile(hFilePath);
-    if (hStoreFile == null) {
-      LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
-      return Long.MAX_VALUE;
-    }
-    OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
-    if (!maxTimestamp.isPresent()) {
-      LOG.error("Maximum timestamp not present for {}", hFilePath);
-      return Long.MAX_VALUE;
-    }
-    return maxTimestamp.getAsLong();
-  }
-
-  private long getMaxTimestamp(HFileInfo hFileInfo) {
-    try {
-      byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY);
-      if (hFileTimeRange == null) {
-        LOG.error("Timestamp information not found for file: {}",
-          hFileInfo.getHFileContext().getHFileName());
-        return Long.MAX_VALUE;
-      }
-      return TimeRangeTracker.parseFrom(hFileTimeRange).getMax();
-    } catch (IOException e) {
-      LOG.error("Error occurred while reading the timestamp metadata of file: 
{}",
-        hFileInfo.getHFileContext().getHFileName(), e);
-      return Long.MAX_VALUE;
-    }
-  }
-
   private long getCurrentTimestamp() {
     return EnvironmentEdgeManager.getDelegate().currentTime();
   }
@@ -299,7 +254,7 @@ public class DataTieringManager {
   private HStoreFile getHStoreFile(Path hFilePath) throws DataTieringException 
{
     HStore hStore = getHStore(hFilePath);
     for (HStoreFile file : hStore.getStorefiles()) {
-      if 
(file.getPath().toUri().getPath().toString().equals(hFilePath.toString())) {
+      if (file.getPath().equals(hFilePath)) {
         return file;
       }
     }
@@ -330,7 +285,8 @@ public class DataTieringManager {
     for (HRegion r : this.onlineRegions.values()) {
       for (HStore hStore : r.getStores()) {
         Configuration conf = hStore.getReadOnlyConfiguration();
-        if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
+        DataTieringType dataTieringType = getDataTieringType(conf);
+        if (dataTieringType == DataTieringType.NONE) {
           // Data-Tiering not enabled for the store. Just skip it.
           continue;
         }
@@ -339,14 +295,10 @@ public class DataTieringManager {
         for (HStoreFile hStoreFile : hStore.getStorefiles()) {
           String hFileName =
             
hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
-          OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
-          if (!maxTimestamp.isPresent()) {
-            LOG.warn("maxTimestamp missing for file: {}",
-              hStoreFile.getFileInfo().getActiveFileName());
-            continue;
-          }
+          long maxTimeStamp = 
dataTieringType.getInstance().getTimestamp(hStoreFile);
+          LOG.debug("Max TS for file {} is {}", hFileName, new 
Date(maxTimeStamp));
           long currentTimestamp = 
EnvironmentEdgeManager.getDelegate().currentTime();
-          long fileAge = currentTimestamp - maxTimestamp.getAsLong();
+          long fileAge = currentTimestamp - maxTimeStamp;
           if (fileAge > hotDataAge) {
             // Values do not matter.
             coldFiles.put(hFileName, null);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
index ee54576a648..83da5f54e43 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
@@ -21,6 +21,17 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Public
 public enum DataTieringType {
-  NONE,
-  TIME_RANGE
+  /** Tiering disabled: no {@link DataTiering} is attached, so {@link #getInstance()} is null. */
+  NONE(null),
+  /** Tiering timestamps are resolved through a {@code CellTSTiering} instance. */
+  TIME_RANGE(new CellTSTiering()),
+  /** Tiering timestamps are resolved through a {@code CustomTiering} instance. */
+  CUSTOM(new CustomTiering());
+
+  /** Implementation used to obtain tiering timestamps for this type; null for {@link #NONE}. */
+  private final DataTiering instance;
+
+  DataTieringType(DataTiering instance) {
+    this.instance = instance;
+  }
+
+  /**
+   * @return the {@link DataTiering} implementation bound to this type, or {@code null} for
+   *         {@link #NONE}; callers must check for {@link #NONE} before dereferencing the result
+   */
+  public DataTiering getInstance() {
+    return instance;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index e5ee8041c35..828d9ba0010 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.function.Function;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -33,12 +34,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
 
-  private final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new 
TreeMap<>();
+  protected final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = 
new TreeMap<>();
 
   private final boolean needEmptyFile;
 
   private final Map<Long, String> lowerBoundariesPolicies;
 
+  protected Function<Cell, Long> tieringFunction;
+
   /**
    * @param lowerBoundariesPolicies each window to storage policy map.
    * @param needEmptyFile           whether need to create an empty store file 
if we haven't written
@@ -46,16 +49,29 @@ public class DateTieredMultiFileWriter extends 
AbstractMultiFileWriter {
    */
   public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
     Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
+    this(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, c -> 
c.getTimestamp());
+  }
+
+  /**
+   * @param lowerBoundaries         lower boundary of each window; one (initially empty) writer slot
+   *                                is registered per boundary.
+   * @param lowerBoundariesPolicies each window to storage policy map.
+   * @param needEmptyFile           whether need to create an empty store file if we haven't written
+   *                                out anything.
+   * @param tieringFunction         maps a cell to the value used to pick its window: the cell goes
+   *                                to the window with the greatest lower boundary that does not
+   *                                exceed that value.
+   */
+  public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
+    Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+    Function<Cell, Long> tieringFunction) {
     for (Long lowerBoundary : lowerBoundaries) {
       lowerBoundary2Writer.put(lowerBoundary, null);
     }
     this.needEmptyFile = needEmptyFile;
     this.lowerBoundariesPolicies = lowerBoundariesPolicies;
+    this.tieringFunction = tieringFunction;
   }
 
   @Override
   public void append(Cell cell) throws IOException {
-    Map.Entry<Long, StoreFileWriter> entry = 
lowerBoundary2Writer.floorEntry(cell.getTimestamp());
+    Map.Entry<Long, StoreFileWriter> entry =
+      lowerBoundary2Writer.floorEntry(tieringFunction.apply(cell));
     StoreFileWriter writer = entry.getValue();
     if (writer == null) {
       String lowerBoundaryStoragePolicy = 
lowerBoundariesPolicies.get(entry.getKey());
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 26437ab1124..dc13f190afa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static 
org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
+
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +31,7 @@ import 
org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequ
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -44,6 +47,18 @@ public class DateTieredStoreEngine extends 
StoreEngine<DefaultStoreFlusher,
 
   public static final String DATE_TIERED_STORE_ENGINE = 
DateTieredStoreEngine.class.getName();
 
+  /**
+   * Instantiates the compaction policy for this engine. The class name is read from
+   * {@code DEFAULT_COMPACTION_POLICY_CLASS_KEY} (falling back to {@link DateTieredCompactionPolicy})
+   * and must expose a {@code (Configuration, StoreConfigInformation)} constructor.
+   * @throws IOException if the configured class cannot be loaded or instantiated
+   */
+  protected void createCompactionPolicy(Configuration conf, HStore store) throws IOException {
+    final String policyClass =
+      conf.get(DEFAULT_COMPACTION_POLICY_CLASS_KEY, DateTieredCompactionPolicy.class.getName());
+    final Class<?>[] ctorArgTypes = { Configuration.class, StoreConfigInformation.class };
+    final Object[] ctorArgs = { conf, store };
+    try {
+      compactionPolicy =
+        ReflectionUtils.instantiateWithCustomCtor(policyClass, ctorArgTypes, ctorArgs);
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured compaction policy '" + policyClass + "'", e);
+    }
+  }
+
   @Override
   public boolean needsCompaction(List<HStoreFile> filesCompacting) {
     return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), 
filesCompacting);
@@ -57,7 +72,7 @@ public class DateTieredStoreEngine extends 
StoreEngine<DefaultStoreFlusher,
   @Override
   protected void createComponents(Configuration conf, HStore store, 
CellComparator kvComparator)
     throws IOException {
-    this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
+    createCompactionPolicy(conf, store);
     this.storeFileManager = new DefaultStoreFileManager(kvComparator,
       StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf, 
compactionPolicy.getConf());
     this.storeFlusher = new DefaultStoreFlusher(conf, store);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8fa555cc088..4085315ea88 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -701,8 +701,8 @@ public class HRegionServer extends Thread
       if (!isMasterNotCarryTable) {
         blockCache = BlockCacheFactory.createBlockCache(conf);
         // The call below, instantiates the DataTieringManager only when
-      // the configuration "hbase.regionserver.datatiering.enable" is set to 
true.
-      DataTieringManager.instantiate(conf,onlineRegions);
+        // the configuration "hbase.regionserver.datatiering.enable" is set to 
true.
+        DataTieringManager.instantiate(conf, onlineRegions);
         mobFileCache = new MobFileCache(conf);
       }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index 5f5fcf2001a..8a705c5ef14 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -255,6 +255,14 @@ public class StoreFileWriter implements CellSink, 
ShipperListener {
     }
   }
 
+  /**
+   * Forwards the given custom tiering time range to the live file writer and, when a historical
+   * file writer exists, to that writer as well.
+   * @param timeRangeTracker the range of custom tiering values to record in the file metadata
+   * @throws IOException if an underlying writer fails to append the metadata
+   */
+  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
+    throws IOException {
+    liveFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
+    if (historicalFileWriter == null) {
+      return;
+    }
+    historicalFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
+  }
+
   @Override
   public void beforeShipped() throws IOException {
     liveFileWriter.beforeShipped();
@@ -664,6 +672,11 @@ public class StoreFileWriter implements CellSink, 
ShipperListener {
       writer.appendTrackedTimestampsToMetadata();
     }
 
+    /**
+     * Delegates to the wrapped HFile writer so the given custom tiering time range ends up in the
+     * metadata of the file being written.
+     */
+    public void appendCustomCellTimestampsToMetadata(TimeRangeTracker 
timeRangeTracker)
+      throws IOException {
+      writer.appendCustomCellTimestampsToMetadata(timeRangeTracker);
+    }
+
     private void appendGeneralBloomfilter(final Cell cell) throws IOException {
       if (this.generalBloomFilterWriter != null) {
         /*
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 3e9e85a4aba..f18abf59309 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -401,6 +401,11 @@ public abstract class Compactor<T extends CellSink> {
 
   protected abstract void abortWriter(T writer) throws IOException;
 
+  /**
+   * Hook that lets subclasses transform each batch of cells read by the compaction scanner before
+   * the batch is written out. This default implementation returns the batch unchanged.
+   * @param cells the batch of cells about to be written
+   * @return the cells to write in their place
+   */
+  protected List<Cell> decorateCells(List<Cell> cells) {
+    // no op
+    return cells;
+  }
+
   /**
    * Performs the compaction.
    * @param fd                FileDetails of cell sink writer
@@ -454,6 +459,7 @@ public abstract class Compactor<T extends CellSink> {
         // output to writer:
         Cell lastCleanCell = null;
         long lastCleanCellSeqId = 0;
+        cells = decorateCells(cells);
         for (Cell c : cells) {
           if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
             lastCleanCell = c;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java
new file mode 100644
index 00000000000..f908b31e4ae
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static 
org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;
+import static 
org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Static validation helpers for tables that use the custom cell tiered store engine.
+ */
+@InterfaceAudience.Private
+public class CustomCellTieredUtils {
+  private CustomCellTieredUtils() {
+    // Static helpers only; never instantiated.
+  }
+
+  /**
+   * Verifies that every column family configured with a store engine whose class name contains
+   * {@code CustomCellTieredStoreEngine} also defines the {@code TIERING_CELL_QUALIFIER} setting.
+   * @param newTable the table descriptor about to be applied
+   * @throws IOException a {@link DoNotRetryIOException} when a family is missing the qualifier
+   */
+  public static void checkForModifyTable(TableDescriptor newTable) throws IOException {
+    for (ColumnFamilyDescriptor family : newTable.getColumnFamilies()) {
+      String engine = family.getConfigurationValue(STORE_ENGINE_CLASS_KEY);
+      boolean customTiered = engine != null && engine.contains("CustomCellTieredStoreEngine");
+      if (!customTiered) {
+        continue;
+      }
+      if (family.getConfigurationValue(TIERING_CELL_QUALIFIER) == null) {
+        throw new DoNotRetryIOException("StoreEngine " + engine + " is missing required "
+          + TIERING_CELL_QUALIFIER + " parameter.");
+      }
+    }
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java
new file mode 100644
index 00000000000..fcf3b203b10
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Default {@link CustomTieredCompactor.TieringValueProvider}. It takes the tiering value from the
+ * value of the first cell in a batch whose qualifier matches the one configured under
+ * {@link #TIERING_CELL_QUALIFIER}, and attaches that value to every cell of the batch as a
+ * {@link TagType#CELL_VALUE_TIERING_TAG_TYPE} tag so it can later be read back from any single cell.
+ */
+@InterfaceAudience.Private
+public class CustomCellTieringValueProvider implements CustomTieredCompactor.TieringValueProvider {
+  public static final String TIERING_CELL_QUALIFIER = "TIERING_CELL_QUALIFIER";
+  /** Qualifier of the cell holding the tiering value; null when none is configured. */
+  private byte[] tieringQualifier;
+
+  /**
+   * Reads the tiering qualifier from the configuration. A missing setting leaves the qualifier
+   * unset, in which case {@link #decorateCells(List)} passes cells through untouched.
+   */
+  @Override
+  public void init(Configuration conf) throws Exception {
+    String qualifier = conf.get(TIERING_CELL_QUALIFIER);
+    // Bytes.toBytes(String) dereferences its argument, so an absent setting must be handled here
+    // for the null guard in decorateCells to be reachable.
+    tieringQualifier = qualifier == null ? null : Bytes.toBytes(qualifier);
+  }
+
+  /**
+   * Tags every cell of the batch with the tiering value found in the batch.
+   * @param cells the cells to decorate (the cells within a row)
+   * @return new cells carrying the tiering tag, or {@code cells} itself when no qualifier is set
+   */
+  @Override
+  public List<Cell> decorateCells(List<Cell> cells) {
+    // if no tiering qualifier properly set, skips the whole flow
+    if (tieringQualifier == null) {
+      return cells;
+    }
+    byte[] tieringValue = null;
+    // first iterates through the cells within a row, to find the tiering value for the row
+    for (Cell cell : cells) {
+      if (CellUtil.matchingQualifier(cell, tieringQualifier)) {
+        tieringValue = CellUtil.cloneValue(cell);
+        break;
+      }
+    }
+    if (tieringValue == null) {
+      // No tiering cell in this batch: fall back to the largest possible value.
+      tieringValue = Bytes.toBytes(Long.MAX_VALUE);
+    }
+    // NOTE(review): a tiering cell whose value is not an 8 byte long is stored as is and will
+    // presumably make getTieringValue fail when it is decoded -- confirm whether to validate here.
+    // now apply the tiering value as a tag to all cells within the row
+    Tag tieringValueTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE, tieringValue);
+    List<Cell> newCells = new ArrayList<>(cells.size());
+    for (Cell cell : cells) {
+      List<Tag> tags = PrivateCellUtil.getTags(cell);
+      // Cells rewritten by an earlier compaction already carry a tiering tag. Drop it so tags do
+      // not pile up on every compaction and a stale value cannot shadow the current one.
+      tags.removeIf(tag -> tag.getType() == TagType.CELL_VALUE_TIERING_TAG_TYPE);
+      tags.add(tieringValueTag);
+      newCells.add(PrivateCellUtil.createCell(cell, tags));
+    }
+    return newCells;
+  }
+
+  /**
+   * @return the long stored in the cell's tiering tag, or {@link Long#MAX_VALUE} when the cell
+   *         carries no such tag
+   */
+  @Override
+  public long getTieringValue(Cell cell) {
+    Optional<Tag> tagOptional = PrivateCellUtil.getTag(cell, TagType.CELL_VALUE_TIERING_TAG_TYPE);
+    if (tagOptional.isPresent()) {
+      // Tag.getValueAsLong copes with both array backed and ByteBuffer backed tags, whereas
+      // ByteBuffer.array() throws for buffers without an accessible backing array.
+      return Tag.getValueAsLong(tagOptional.get());
+    }
+    return Long.MAX_VALUE;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java
new file mode 100644
index 00000000000..dcc97c63d02
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static 
org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based
+ * on the <b>hbase.hstore.compaction.date.tiered.custom.age.limit.millis</b> configuration property
+ * and the custom tiering time range stored (under CUSTOM_TIERING_TIME_RANGE) in the metadata of
+ * each store file. This policy would produce either one or two tiers:
+ * <ul>
+ * <li>One tier if either all files data age are older than the configured age limit or all files
+ * data age are younger than the configured age limit.</li>
+ * <li>Two tiers if files have both younger and older data than the configured age limit.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+public class CustomDateTieredCompactionPolicy extends DateTieredCompactionPolicy {
+
+  public static final String AGE_LIMIT_MILLIS =
+    "hbase.hstore.compaction.date.tiered.custom.age.limit.millis";
+
+  // Defaults to 10 years
+  public static final long DEFAULT_AGE_LIMIT_MILLIS =
+    (long) (10L * 365.25 * 24L * 60L * 60L * 1000L);
+
+  private static final Logger LOG = LoggerFactory.getLogger(CustomDateTieredCompactionPolicy.class);
+
+  // Tiering values below this point are "older" than the age limit. Computed once, when the
+  // policy is constructed, as (current time - configured age limit).
+  private long cutOffTimestamp;
+
+  public CustomDateTieredCompactionPolicy(Configuration conf,
+    StoreConfigInformation storeConfigInfo) throws IOException {
+    super(conf, storeConfigInfo);
+    cutOffTimestamp = EnvironmentEdgeManager.currentTime()
+      - conf.getLong(AGE_LIMIT_MILLIS, DEFAULT_AGE_LIMIT_MILLIS);
+  }
+
+  /**
+   * Builds the window boundaries from the overall custom tiering range of the given files. The
+   * result always starts with {@link Long#MIN_VALUE}; the smallest tiering value is added when it
+   * lies before the cut off, and the cut off itself is added when the range also extends past it.
+   * Files with missing or unparsable range metadata do not contribute to the overall range.
+   */
+  @Override
+  protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact,
+    long now) {
+    MutableLong min = new MutableLong(Long.MAX_VALUE);
+    MutableLong max = new MutableLong(0);
+    filesToCompact.forEach(f -> {
+      byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+      long minCurrent = Long.MAX_VALUE;
+      long maxCurrent = 0;
+      if (timeRangeBytes != null) {
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes);
+          minCurrent = timeRangeTracker.getMin();
+          maxCurrent = timeRangeTracker.getMax();
+        } catch (IOException e) {
+          LOG.warn("Got TIERING_CELL_TIME_RANGE info from file, but failed to parse it:", e);
+        }
+      }
+      if (minCurrent < min.getValue()) {
+        min.setValue(minCurrent);
+      }
+      if (maxCurrent > max.getValue()) {
+        max.setValue(maxCurrent);
+      }
+    });
+
+    List<Long> boundaries = new ArrayList<>();
+    boundaries.add(Long.MIN_VALUE);
+    if (min.getValue() < cutOffTimestamp) {
+      boundaries.add(min.getValue());
+      if (max.getValue() > cutOffTimestamp) {
+        boundaries.add(cutOffTimestamp);
+      }
+    }
+    return boundaries;
+  }
+
+  /**
+   * Filters the candidates through the per-window compaction policy and then builds the request
+   * the same way a major compaction selection does.
+   */
+  @Override
+  public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
+    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
+    ArrayList<HStoreFile> filteredByPolicy = this.compactionPolicyPerWindow
+      .applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
+    return selectMajorCompaction(filteredByPolicy);
+  }
+
+  /**
+   * Decides whether the given files warrant a major compaction. Once it is major compaction time,
+   * a compaction is requested when a file passes the TTL check, is a major compaction or bulk load
+   * result, or spans the cut off timestamp, or when either tier holds more than one file;
+   * otherwise the decision falls back to block locality.
+   * @throws IOException if the custom tiering range metadata of a file cannot be parsed
+   */
+  @Override
+  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
+    throws IOException {
+    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
+    long now = EnvironmentEdgeManager.currentTime();
+    if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) {
+      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
+      int countLower = 0;
+      int countHigher = 0;
+      HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+      for (HStoreFile f : filesToCompact) {
+        if (checkForTtl(cfTTL, f)) {
+          return true;
+        }
+        if (isMajorOrBulkloadResult(f, now - lowTimestamp)) {
+          return true;
+        }
+        byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+        // A file may carry no custom tiering range. Use the same defaults as
+        // getCompactBoundariesForMajor instead of handing null to TimeRangeTracker.parseFrom,
+        // which rejects null input.
+        long minTieringValue = Long.MAX_VALUE;
+        long maxTieringValue = 0;
+        if (timeRangeBytes != null) {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes);
+          minTieringValue = timeRangeTracker.getMin();
+          maxTieringValue = timeRangeTracker.getMax();
+        }
+        if (minTieringValue < cutOffTimestamp) {
+          if (maxTieringValue > cutOffTimestamp) {
+            // Found at least one file crossing the cutOffTimestamp
+            return true;
+          } else {
+            countLower++;
+          }
+        } else {
+          countHigher++;
+        }
+        hdfsBlocksDistribution.add(f.getHDFSBlockDistribution());
+      }
+      // If we haven't found any files crossing the cutOffTimestamp, we have to check
+      // if there is more than one file on either tier and if so, perform compaction
+      if (countLower > 1 || countHigher > 1) {
+        return true;
+      }
+      return checkBlockLocality(hdfsBlocksDistribution);
+    }
+    return false;
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java
new file mode 100644
index 00000000000..905284a19c4
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CustomTieredCompactor extends DateTieredCompactor {
+
+  public static final String TIERING_VALUE_PROVIDER =
+    "hbase.hstore.custom-tiering-value.provider.class";
+  private TieringValueProvider tieringValueProvider;
+
+  public CustomTieredCompactor(Configuration conf, HStore store) throws 
IOException {
+    super(conf, store);
+    String className =
+      conf.get(TIERING_VALUE_PROVIDER, 
CustomCellTieringValueProvider.class.getName());
+    try {
+      tieringValueProvider =
+        (TieringValueProvider) 
Class.forName(className).getConstructor().newInstance();
+      tieringValueProvider.init(conf);
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured tiering value provider 
'" + className + "'",
+        e);
+    }
+  }
+
+  @Override
+  protected List<Cell> decorateCells(List<Cell> cells) {
+    return tieringValueProvider.decorateCells(cells);
+  }
+
+  @Override
+  protected DateTieredMultiFileWriter createMultiWriter(final 
CompactionRequestImpl request,
+    final List<Long> lowerBoundaries, final Map<Long, String> 
lowerBoundariesPolicies) {
+    return new CustomTieringMultiFileWriter(lowerBoundaries, 
lowerBoundariesPolicies,
+      needEmptyFile(request), 
CustomTieredCompactor.this.tieringValueProvider::getTieringValue);
+  }
+
+  public interface TieringValueProvider {
+
+    void init(Configuration configuration) throws Exception;
+
+    default List<Cell> decorateCells(List<Cell> cells) {
+      return cells;
+    }
+
+    long getTieringValue(Cell cell);
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
index c13bcf36af9..81cfd6a0c69 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -66,7 +67,7 @@ public class DateTieredCompactionPolicy extends 
SortedCompactionPolicy {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DateTieredCompactionPolicy.class);
 
-  private final RatioBasedCompactionPolicy compactionPolicyPerWindow;
+  protected final RatioBasedCompactionPolicy compactionPolicyPerWindow;
 
   private final CompactionWindowFactory windowFactory;
 
@@ -108,9 +109,8 @@ public class DateTieredCompactionPolicy extends 
SortedCompactionPolicy {
     }
   }
 
-  @Override
-  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> 
filesToCompact)
-    throws IOException {
+  protected boolean isMajorCompactionTime(Collection<HStoreFile> 
filesToCompact, long now,
+    long lowestModificationTime) throws IOException {
     long mcTime = getNextMajorCompactTime(filesToCompact);
     if (filesToCompact == null || mcTime == 0) {
       if (LOG.isDebugEnabled()) {
@@ -118,58 +118,40 @@ public class DateTieredCompactionPolicy extends 
SortedCompactionPolicy {
       }
       return false;
     }
-
     // TODO: Use better method for determining stamp of last major (HBASE-2990)
-    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
-    long now = EnvironmentEdgeManager.currentTime();
-    if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
+    if (lowestModificationTime <= 0L || lowestModificationTime >= (now - 
mcTime)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("lowTimestamp: " + lowTimestamp + " lowTimestamp: " + 
lowTimestamp + " now: "
-          + now + " mcTime: " + mcTime);
+        LOG.debug("lowTimestamp: " + lowestModificationTime + " lowTimestamp: "
+          + lowestModificationTime + " now: " + now + " mcTime: " + mcTime);
       }
       return false;
     }
+    return true;
+  }
 
-    long cfTTL = this.storeConfigInfo.getStoreFileTtl();
-    HDFSBlocksDistribution hdfsBlocksDistribution = new 
HDFSBlocksDistribution();
-    List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
-    boolean[] filesInWindow = new boolean[boundaries.size()];
-
-    for (HStoreFile file : filesToCompact) {
-      OptionalLong minTimestamp = file.getMinimumTimestamp();
-      long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() 
: Long.MIN_VALUE;
-      if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) {
-        LOG.debug("Major compaction triggered on store " + this + "; for TTL 
maintenance");
-        return true;
-      }
-      if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
-        LOG.debug("Major compaction triggered on store " + this
-          + ", because there are new files and time since last major 
compaction "
-          + (now - lowTimestamp) + "ms");
-        return true;
-      }
+  protected boolean checkForTtl(long ttl, HStoreFile file) {
+    OptionalLong minTimestamp = file.getMinimumTimestamp();
+    long oldest = minTimestamp.isPresent()
+      ? EnvironmentEdgeManager.currentTime() - minTimestamp.getAsLong()
+      : Long.MIN_VALUE;
+    if (ttl != Long.MAX_VALUE && oldest >= ttl) {
+      LOG.debug("Major compaction triggered on store " + this + "; for TTL 
maintenance");
+      return true;
+    }
+    return false;
+  }
 
-      int lowerWindowIndex =
-        Collections.binarySearch(boundaries, 
minTimestamp.orElse(Long.MAX_VALUE));
-      int upperWindowIndex =
-        Collections.binarySearch(boundaries, 
file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
-      // Handle boundary conditions and negative values of binarySearch
-      lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 
2) : lowerWindowIndex;
-      upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 
2) : upperWindowIndex;
-      if (lowerWindowIndex != upperWindowIndex) {
-        LOG.debug("Major compaction triggered on store " + this + "; because 
file " + file.getPath()
-          + " has data with timestamps cross window boundaries");
-        return true;
-      } else if (filesInWindow[upperWindowIndex]) {
-        LOG.debug("Major compaction triggered on store " + this
-          + "; because there are more than one file in some windows");
-        return true;
-      } else {
-        filesInWindow[upperWindowIndex] = true;
-      }
-      hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
+  protected boolean isMajorOrBulkloadResult(HStoreFile file, long timeDiff) {
+    if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
+      LOG.debug("Major compaction triggered on store " + this
+        + ", because there are new files and time since last major compaction 
" + timeDiff + "ms");
+      return true;
     }
+    return false;
+  }
 
+  protected boolean checkBlockLocality(HDFSBlocksDistribution 
hdfsBlocksDistribution)
+    throws UnknownHostException {
     float blockLocalityIndex = hdfsBlocksDistribution
       .getBlockLocalityIndex(DNS.getHostname(comConf.conf, 
DNS.ServerType.REGIONSERVER));
     if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
@@ -178,9 +160,55 @@ public class DateTieredCompactionPolicy extends 
SortedCompactionPolicy {
         + " (min " + comConf.getMinLocalityToForceCompact() + ")");
       return true;
     }
+    return false;
+  }
 
-    LOG.debug(
-      "Skipping major compaction of " + this + ", because the files are 
already major compacted");
+  @Override
+  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> 
filesToCompact)
+    throws IOException {
+    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
+    long now = EnvironmentEdgeManager.currentTime();
+    if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) {
+      long cfTTL = this.storeConfigInfo.getStoreFileTtl();
+      HDFSBlocksDistribution hdfsBlocksDistribution = new 
HDFSBlocksDistribution();
+      List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, 
now);
+      boolean[] filesInWindow = new boolean[boundaries.size()];
+      for (HStoreFile file : filesToCompact) {
+        OptionalLong minTimestamp = file.getMinimumTimestamp();
+        if (checkForTtl(cfTTL, file)) {
+          return true;
+        }
+        if (isMajorOrBulkloadResult(file, now - lowTimestamp)) {
+          return true;
+        }
+        int lowerWindowIndex =
+          Collections.binarySearch(boundaries, 
minTimestamp.orElse(Long.MAX_VALUE));
+        int upperWindowIndex =
+          Collections.binarySearch(boundaries, 
file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
+        // Handle boundary conditions and negative values of binarySearch
+        lowerWindowIndex =
+          (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : 
lowerWindowIndex;
+        upperWindowIndex =
+          (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : 
upperWindowIndex;
+        if (lowerWindowIndex != upperWindowIndex) {
+          LOG.debug("Major compaction triggered on store " + this + "; because 
file "
+            + file.getPath() + " has data with timestamps cross window 
boundaries");
+          return true;
+        } else if (filesInWindow[upperWindowIndex]) {
+          LOG.debug("Major compaction triggered on store " + this
+            + "; because there are more than one file in some windows");
+          return true;
+        } else {
+          filesInWindow[upperWindowIndex] = true;
+        }
+        hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
+      }
+      if (checkBlockLocality(hdfsBlocksDistribution)) {
+        return true;
+      }
+      LOG.debug(
+        "Skipping major compaction of " + this + ", because the files are 
already major compacted");
+    }
     return false;
   }
 
@@ -296,7 +324,8 @@ public class DateTieredCompactionPolicy extends 
SortedCompactionPolicy {
   /**
    * Return a list of boundaries for multiple compaction output in ascending 
order.
    */
-  private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> 
filesToCompact, long now) {
+  protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> 
filesToCompact,
+    long now) {
     long minTimestamp = filesToCompact.stream()
       .mapToLong(f -> 
f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE);
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index b5911b0cec4..9cef2ebc314 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -46,7 +46,7 @@ public class DateTieredCompactor extends 
AbstractMultiOutputCompactor<DateTiered
     super(conf, store);
   }
 
-  private boolean needEmptyFile(CompactionRequestImpl request) {
+  protected boolean needEmptyFile(CompactionRequestImpl request) {
     // if we are going to compact the last N files, then we need to emit an 
empty file to retain the
     // maxSeqId if we haven't written out anything.
     OptionalLong maxSeqId = 
StoreUtils.getMaxSequenceIdInList(request.getFiles());
@@ -70,14 +70,20 @@ public class DateTieredCompactor extends 
AbstractMultiOutputCompactor<DateTiered
         public DateTieredMultiFileWriter createWriter(InternalScanner scanner, 
FileDetails fd,
           boolean shouldDropBehind, boolean major, Consumer<Path> 
writerCreationTracker)
           throws IOException {
-          DateTieredMultiFileWriter writer = new 
DateTieredMultiFileWriter(lowerBoundaries,
-            lowerBoundariesPolicies, needEmptyFile(request));
+          DateTieredMultiFileWriter writer =
+            createMultiWriter(request, lowerBoundaries, 
lowerBoundariesPolicies);
           initMultiWriter(writer, scanner, fd, shouldDropBehind, major, 
writerCreationTracker);
           return writer;
         }
       }, throughputController, user);
   }
 
+  protected DateTieredMultiFileWriter createMultiWriter(final 
CompactionRequestImpl request,
+    final List<Long> lowerBoundaries, final Map<Long, String> 
lowerBoundariesPolicies) {
+    return new DateTieredMultiFileWriter(lowerBoundaries, 
lowerBoundariesPolicies,
+      needEmptyFile(request), c -> c.getTimestamp());
+  }
+
   @Override
   protected List<Path> commitWriter(DateTieredMultiFileWriter writer, 
FileDetails fd,
     CompactionRequestImpl request) throws IOException {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java
index 2d45c05324b..d7c743fbf59 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIllegalTableDescriptor.java
@@ -194,9 +194,10 @@ public class TestIllegalTableDescriptor {
 
   @Test
   public void testIllegalTableDescriptorWithDataTiering() throws IOException {
-    // table level configuration changes
     HTableDescriptor htd = new 
HTableDescriptor(TableName.valueOf(name.getMethodName()));
     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    // table level configuration changes
+    htd.addFamily(hcd);
 
     // First scenario: DataTieringType set to TIME_RANGE without 
DateTieredStoreEngine
     htd.setValue(DataTieringManager.DATATIERING_KEY, 
DataTieringType.TIME_RANGE.name());
@@ -212,20 +213,23 @@ public class TestIllegalTableDescriptor {
       "org.apache.hadoop.hbase.regionserver.DefaultStoreEngine");
     checkTableIsIllegal(htd);
 
+    // column family level configuration changes
+    htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
+    hcd = new HColumnDescriptor(FAMILY);
+
     // First scenario: DataTieringType set to TIME_RANGE without 
DateTieredStoreEngine
-    hcd.setConfiguration(DataTieringManager.DATATIERING_KEY,
-      DataTieringType.TIME_RANGE.name());
+    hcd.setConfiguration(DataTieringManager.DATATIERING_KEY, 
DataTieringType.TIME_RANGE.name());
     checkTableIsIllegal(htd.addFamily(hcd));
 
     // Second scenario: DataTieringType set to TIME_RANGE with 
DateTieredStoreEngine
     hcd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY,
       "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
-    checkTableIsLegal(htd.addFamily(hcd));
+    checkTableIsLegal(htd.modifyFamily(hcd));
 
     // Third scenario: Disabling DateTieredStoreEngine while Time Range 
DataTiering is active
     hcd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY,
       "org.apache.hadoop.hbase.regionserver.DefaultStoreEngine");
-    checkTableIsIllegal(htd.addFamily(hcd));
+    checkTableIsIllegal(htd.modifyFamily(hcd));
   }
 
   private void checkTableIsLegal(HTableDescriptor htd) throws IOException {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
index c85a162ad96..7e7b4cb5c37 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
@@ -268,6 +268,10 @@ public class TestBytesReadFromFs {
 
     CacheConfig cacheConfig = new CacheConfig(conf);
     HFile.Reader reader = new HFilePreadReader(readerContext, hfile, 
cacheConfig, conf);
+    // Since HBASE-28466, we call fileInfo.initMetaAndIndex inside 
HFilePreadReader,
+    // which reads some blocks and increments the counters, so we need to reset them here.
+    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
     HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();
 
     // Create iterator for reading root index block
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
similarity index 89%
copy from 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
copy to 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
index b24c1f2ed84..0771d41bb43 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -76,35 +77,37 @@ import org.slf4j.LoggerFactory;
 /**
  * This class is used to test the functionality of the DataTieringManager.
  *
- * The mock online regions are stored in {@link 
TestDataTieringManager#testOnlineRegions}.
- * For all tests, the setup of {@link 
TestDataTieringManager#testOnlineRegions} occurs only once.
- * Please refer to {@link TestDataTieringManager#setupOnlineRegions()} for the 
structure.
- * Additionally, a list of all store files is maintained in {@link 
TestDataTieringManager#hStoreFiles}.
+ * The mock online regions are stored in {@link 
TestCustomCellDataTieringManager#testOnlineRegions}.
+ * For all tests, the setup of
+ *  {@link TestCustomCellDataTieringManager#testOnlineRegions} occurs only 
once.
+ * Please refer to {@link 
TestCustomCellDataTieringManager#setupOnlineRegions()} for the structure.
+ * Additionally, a list of all store files is
+ *  maintained in {@link TestCustomCellDataTieringManager#hStoreFiles}.
  * The characteristics of these store files are listed below:
- * @formatter:off ## HStoreFile Information
- *
+ * @formatter:off
+ * ## HStoreFile Information
  * | HStoreFile       | Region             | Store               | DataTiering 
          | isHot |
  * 
|------------------|--------------------|---------------------|-----------------------|-------|
- * | hStoreFile0      | region1            | hStore11            | TIME_RANGE  
          | true  |
+ * | hStoreFile0      | region1            | hStore11            | 
CUSTOM_CELL_VALUE     | true  |
  * | hStoreFile1      | region1            | hStore12            | NONE        
          | true  |
- * | hStoreFile2      | region2            | hStore21            | TIME_RANGE  
          | true  |
- * | hStoreFile3      | region2            | hStore22            | TIME_RANGE  
          | false |
+ * | hStoreFile2      | region2            | hStore21            | 
CUSTOM_CELL_VALUE     | true  |
+ * | hStoreFile3      | region2            | hStore22            | 
CUSTOM_CELL_VALUE     | false |
  * @formatter:on
  */
 
 @Category({ RegionServerTests.class, SmallTests.class })
-public class TestDataTieringManager {
+public class TestCustomCellDataTieringManager {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestDataTieringManager.class);
+    HBaseClassTestRule.forClass(TestCustomCellDataTieringManager.class);
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestCustomCellDataTieringManager.class);
   private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestDataTieringManager.class);
   private static final long DAY = 24 * 60 * 60 * 1000;
   private static Configuration defaultConf;
   private static FileSystem fs;
-  private static BlockCache blockCache;
+  private BlockCache blockCache;
   private static CacheConfig cacheConf;
   private static Path testDir;
   private static final Map<String, HRegion> testOnlineRegions = new 
HashMap<>();
@@ -121,10 +124,10 @@ public class TestDataTieringManager {
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
-    testDir = 
TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName());
+    testDir = 
TEST_UTIL.getDataTestDir(TestCustomCellDataTieringManager.class.getSimpleName());
     defaultConf = TEST_UTIL.getConfiguration();
     updateCommonConfigurations();
-    assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
+    DataTieringManager.instantiate(defaultConf, testOnlineRegions);
     dataTieringManager = DataTieringManager.getInstance();
     rowKeyString = "";
   }
@@ -224,13 +227,14 @@ public class TestDataTieringManager {
 
     // Test with a filename where corresponding HStoreFile in not present
     hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), 
"incorrectFileName");
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, 
true);
+    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, 
hFilePath,
+      new DataTieringException("Store file corresponding to " + hFilePath + " 
doesn't exist"));
   }
 
   @Test
   public void testPrefetchWhenDataTieringEnabled() throws IOException {
     setPrefetchBlocksOnOpen();
-    initializeTestEnvironment();
+    this.blockCache = initializeTestEnvironment();
     // Evict blocks from cache by closing the files and passing evict on close.
     // Then initialize the reader again. Since Prefetch on open is set to 
true, it should prefetch
     // those blocks.
@@ -280,27 +284,26 @@ public class TestDataTieringManager {
   @Test
   public void testCacheCompactedBlocksOnWriteDataTieringDisabled() throws 
IOException {
     setCacheCompactBlocksOnWrite();
-    initializeTestEnvironment();
-
-    HRegion region = createHRegion("table3");
+    this.blockCache = initializeTestEnvironment();
+    HRegion region = createHRegion("table3", this.blockCache);
     testCacheCompactedBlocksOnWrite(region, true);
   }
 
   @Test
   public void testCacheCompactedBlocksOnWriteWithHotData() throws IOException {
     setCacheCompactBlocksOnWrite();
-    initializeTestEnvironment();
-
-    HRegion region = createHRegion("table3", 
getConfWithTimeRangeDataTieringEnabled(5 * DAY));
+    this.blockCache = initializeTestEnvironment();
+    HRegion region =
+      createHRegion("table3", getConfWithCustomCellDataTieringEnabled(5 * 
DAY), this.blockCache);
     testCacheCompactedBlocksOnWrite(region, true);
   }
 
   @Test
   public void testCacheCompactedBlocksOnWriteWithColdData() throws IOException 
{
     setCacheCompactBlocksOnWrite();
-    initializeTestEnvironment();
-
-    HRegion region = createHRegion("table3", 
getConfWithTimeRangeDataTieringEnabled(DAY));
+    this.blockCache = initializeTestEnvironment();
+    HRegion region =
+      createHRegion("table3", getConfWithCustomCellDataTieringEnabled(DAY), 
this.blockCache);
     testCacheCompactedBlocksOnWrite(region, false);
   }
 
@@ -335,12 +338,11 @@ public class TestDataTieringManager {
     Path storeDir = hStore.getStoreContext().getFamilyStoreDirectoryPath();
     Configuration configuration = hStore.getReadOnlyConfiguration();
 
-    createHStoreFile(storeDir, configuration, currentTime - 2 * DAY,
-      hStore.getHRegion().getRegionFileSystem());
-    createHStoreFile(storeDir, configuration, currentTime - 3 * DAY,
-      hStore.getHRegion().getRegionFileSystem());
-    createHStoreFile(storeDir, configuration, currentTime - 4 * DAY,
-      hStore.getHRegion().getRegionFileSystem());
+    HRegionFileSystem regionFS = hStore.getHRegion().getRegionFileSystem();
+
+    createHStoreFile(storeDir, configuration, currentTime - 2 * DAY, regionFS);
+    createHStoreFile(storeDir, configuration, currentTime - 3 * DAY, regionFS);
+    createHStoreFile(storeDir, configuration, currentTime - 4 * DAY, regionFS);
   }
 
   @Test
@@ -568,48 +570,37 @@ public class TestDataTieringManager {
 
   @Test
   public void testCacheConfigShouldCacheFile() throws Exception {
-    // Evict the files from cache.
-    for (HStoreFile file : hStoreFiles) {
-      file.closeStoreFile(true);
-    }
+    initializeTestEnvironment();
     // Verify that the API shouldCacheFileBlock returns the result correctly.
     // hStoreFiles[0], hStoreFiles[1], hStoreFiles[2] are hot files.
     // hStoreFiles[3] is a cold file.
-    try {
-      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
-        hStoreFiles.get(0).getFileInfo().getHFileInfo(),
-        hStoreFiles.get(0).getFileInfo().getConf()));
-      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
-        hStoreFiles.get(1).getFileInfo().getHFileInfo(),
-        hStoreFiles.get(1).getFileInfo().getConf()));
-      assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
-        hStoreFiles.get(2).getFileInfo().getHFileInfo(),
-        hStoreFiles.get(2).getFileInfo().getConf()));
-      assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
-        hStoreFiles.get(3).getFileInfo().getHFileInfo(),
-        hStoreFiles.get(3).getFileInfo().getConf()));
-    } finally {
-      for (HStoreFile file : hStoreFiles) {
-        file.initReader();
-      }
-    }
+    assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+      hStoreFiles.get(0).getFileInfo().getHFileInfo(), 
hStoreFiles.get(0).getFileInfo().getConf()));
+    assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+      hStoreFiles.get(1).getFileInfo().getHFileInfo(), 
hStoreFiles.get(1).getFileInfo().getConf()));
+    assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+      hStoreFiles.get(2).getFileInfo().getHFileInfo(), 
hStoreFiles.get(2).getFileInfo().getConf()));
+    assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+      hStoreFiles.get(3).getFileInfo().getHFileInfo(), 
hStoreFiles.get(3).getFileInfo().getConf()));
   }
 
   @Test
   public void testCacheOnReadColdFile() throws Exception {
+    this.blockCache = initializeTestEnvironment();
     // hStoreFiles[3] is a cold file. the blocks should not get loaded after a 
readBlock call.
     HStoreFile hStoreFile = hStoreFiles.get(3);
     BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, 
BlockType.DATA);
-    testCacheOnRead(hStoreFile, cacheKey, 23025, false);
+    testCacheOnRead(hStoreFile, cacheKey, -1, false);
   }
 
   @Test
   public void testCacheOnReadHotFile() throws Exception {
+    this.blockCache = initializeTestEnvironment();
     // hStoreFiles[0] is a hot file. the blocks should get loaded after a 
readBlock call.
     HStoreFile hStoreFile = hStoreFiles.get(0);
     BlockCacheKey cacheKey =
       new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
-    testCacheOnRead(hStoreFile, cacheKey, 23025, true);
+    testCacheOnRead(hStoreFile, cacheKey, -1, true);
   }
 
   private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long 
onDiskBlockSize,
@@ -618,7 +609,7 @@ public class TestDataTieringManager {
     hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), 
onDiskBlockSize, true, false,
       false, false, key.getBlockType(), DataBlockEncoding.NONE);
     // Validate that the hot block gets cached and cold block is not cached.
-    HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, 
false, BlockType.DATA);
+    HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, 
false);
     if (expectedCached) {
       assertNotNull(block);
     } else {
@@ -640,7 +631,8 @@ public class TestDataTieringManager {
           numColdBlocks++;
         }
       } catch (Exception e) {
-        fail("Unexpected exception!");
+        LOG.debug("Error validating priority for key {}", key, e);
+        fail(e.getMessage());
       }
     }
     assertEquals(expectedHotBlocks, numHotBlocks);
@@ -699,26 +691,28 @@ public class TestDataTieringManager {
     testDataTieringMethodWithKey(caller, key, expectedResult, null);
   }
 
-  private static void initializeTestEnvironment() throws IOException {
-    setupFileSystemAndCache();
-    setupOnlineRegions();
+  private static BlockCache initializeTestEnvironment() throws IOException {
+    BlockCache blockCache = setupFileSystemAndCache();
+    setupOnlineRegions(blockCache);
+    return blockCache;
   }
 
-  private static void setupFileSystemAndCache() throws IOException {
+  private static BlockCache setupFileSystemAndCache() throws IOException {
     fs = HFileSystem.get(defaultConf);
-    blockCache = BlockCacheFactory.createBlockCache(defaultConf);
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(defaultConf);
     cacheConf = new CacheConfig(defaultConf, blockCache);
+    return blockCache;
   }
 
-  private static void setupOnlineRegions() throws IOException {
+  private static void setupOnlineRegions(BlockCache blockCache) throws 
IOException {
     testOnlineRegions.clear();
     hStoreFiles.clear();
     long day = 24 * 60 * 60 * 1000;
     long currentTime = System.currentTimeMillis();
 
-    HRegion region1 = createHRegion("table1");
+    HRegion region1 = createHRegion("table1", blockCache);
 
-    HStore hStore11 = createHStore(region1, "cf1", 
getConfWithTimeRangeDataTieringEnabled(day));
+    HStore hStore11 = createHStore(region1, "cf1", 
getConfWithCustomCellDataTieringEnabled(day));
     
hStoreFiles.add(createHStoreFile(hStore11.getStoreContext().getFamilyStoreDirectoryPath(),
       hStore11.getReadOnlyConfiguration(), currentTime, 
region1.getRegionFileSystem()));
     hStore11.refreshStoreFiles();
@@ -730,8 +724,8 @@ public class TestDataTieringManager {
     region1.stores.put(Bytes.toBytes("cf1"), hStore11);
     region1.stores.put(Bytes.toBytes("cf2"), hStore12);
 
-    HRegion region2 =
-      createHRegion("table2", getConfWithTimeRangeDataTieringEnabled((long) 
(2.5 * day)));
+    HRegion region2 = createHRegion("table2",
+      getConfWithCustomCellDataTieringEnabled((long) (2.5 * day)), blockCache);
 
     HStore hStore21 = createHStore(region2, "cf1");
     
hStoreFiles.add(createHStoreFile(hStore21.getStoreContext().getFamilyStoreDirectoryPath(),
@@ -753,11 +747,12 @@ public class TestDataTieringManager {
     testOnlineRegions.put(region2.getRegionInfo().getEncodedName(), region2);
   }
 
-  private static HRegion createHRegion(String table) throws IOException {
-    return createHRegion(table, defaultConf);
+  private static HRegion createHRegion(String table, BlockCache blockCache) 
throws IOException {
+    return createHRegion(table, defaultConf, blockCache);
   }
 
-  private static HRegion createHRegion(String table, Configuration conf) 
throws IOException {
+  private static HRegion createHRegion(String table, Configuration conf, 
BlockCache blockCache)
+    throws IOException {
     TableName tableName = TableName.valueOf(table);
 
     TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
@@ -796,15 +791,7 @@ public class TestDataTieringManager {
     return new HStore(region, columnFamilyDescriptor, conf, false);
   }
 
-  private static Configuration getConfWithTimeRangeDataTieringEnabled(long 
hotDataAge) {
-    Configuration conf = new Configuration(defaultConf);
-    conf.set(DataTieringManager.DATATIERING_KEY, 
DataTieringType.TIME_RANGE.name());
-    conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, 
String.valueOf(hotDataAge));
-    return conf;
-  }
-
-
-  static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long 
timestamp,
+  private static HStoreFile createHStoreFile(Path storeDir, Configuration 
conf, long timestamp,
     HRegionFileSystem regionFs) throws IOException {
     String columnFamily = storeDir.getName();
 
@@ -816,6 +803,13 @@ public class TestDataTieringManager {
     return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, 
BloomType.NONE, true);
   }
 
+  private static Configuration getConfWithCustomCellDataTieringEnabled(long 
hotDataAge) {
+    Configuration conf = new Configuration(defaultConf);
+    conf.set(DataTieringManager.DATATIERING_KEY, 
DataTieringType.CUSTOM.name());
+    conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, 
String.valueOf(hotDataAge));
+    return conf;
+  }
+
   /**
    * Writes random data to a store file with rows arranged in 
lexicographically increasing order.
    * Each row is generated using the {@link #nextString()} method, ensuring 
that each subsequent row
@@ -833,6 +827,10 @@ public class TestDataTieringManager {
       }
     } finally {
       writer.appendTrackedTimestampsToMetadata();
+      TimeRangeTracker timeRangeTracker = 
TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+      timeRangeTracker.setMin(timestamp);
+      timeRangeTracker.setMax(timestamp);
+      writer.appendCustomCellTimestampsToMetadata(timeRangeTracker);
       writer.close();
     }
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java
new file mode 100644
index 00000000000..b2c363bf53a
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static 
org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy;
+import 
org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Unit tests for {@link CustomDateTieredCompactionPolicy}. Store files are mocked and carry a
+ * {@code CUSTOM_TIERING_TIME_RANGE} metadata entry holding the min/max tiering values; the tests
+ * then check what the policy selects for major and minor compactions.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestCustomCellTieredCompactionPolicy {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCustomCellTieredCompactionPolicy.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  public static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  /**
+   * Creates a mocked store file for a freshly mocked region, with an age in disk of zero.
+   * @param file     path backing the mocked store file
+   * @param minValue minimum value recorded in the custom tiering time range
+   * @param maxValue maximum value recorded in the custom tiering time range
+   * @param size     size reported by the mocked store file
+   * @param seqId    sequence id reported by the mocked store file
+   */
+  private HStoreFile createFile(Path file, long minValue, long maxValue, long size, int seqId)
+    throws IOException {
+    return createFile(mockRegionInfo(), file, minValue, maxValue, size, seqId, 0);
+  }
+
+  /**
+   * Creates a {@link MockHStoreFile} whose {@code CUSTOM_TIERING_TIME_RANGE} metadata is a
+   * serialized {@link TimeRangeTracker} spanning {@code [minValue, maxValue]}.
+   */
+  private HStoreFile createFile(RegionInfo regionInfo, Path file, long minValue, long maxValue,
+    long size, int seqId, long ageInDisk) throws IOException {
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    // NOTE(review): regionFileSystem is not referenced afterwards; confirm whether it is needed.
+    HRegionFileSystem regionFileSystem =
+      new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, file, regionInfo);
+    MockHStoreFile msf = new MockHStoreFile(TEST_UTIL, file, size, ageInDisk, false, (long) seqId);
+    TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+    timeRangeTracker.setMin(minValue);
+    timeRangeTracker.setMax(maxValue);
+    msf.setMetadataValue(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
+    return msf;
+  }
+
+  /** Builds a policy bound to a freshly mocked region. */
+  private CustomDateTieredCompactionPolicy mockAndCreatePolicy() throws Exception {
+    RegionInfo mockedRegionInfo = mockRegionInfo();
+    return mockAndCreatePolicy(mockedRegionInfo);
+  }
+
+  /**
+   * Builds a policy from the test configuration and a mocked {@link StoreConfigInformation} that
+   * returns the given region info.
+   */
+  private CustomDateTieredCompactionPolicy mockAndCreatePolicy(RegionInfo regionInfo)
+    throws Exception {
+    StoreConfigInformation mockedStoreConfig = mock(StoreConfigInformation.class);
+    when(mockedStoreConfig.getRegionInfo()).thenReturn(regionInfo);
+    CustomDateTieredCompactionPolicy policy =
+      new CustomDateTieredCompactionPolicy(TEST_UTIL.getConfiguration(), mockedStoreConfig);
+    return policy;
+  }
+
+  /** Returns a mocked {@link RegionInfo} with a fixed encoded name. */
+  private RegionInfo mockRegionInfo() {
+    RegionInfo mockedRegionInfo = mock(RegionInfo.class);
+    when(mockedRegionInfo.getEncodedName()).thenReturn("1234567890987654321");
+    return mockedRegionInfo;
+  }
+
+  /** Creates an empty, uniquely named file under the test data directory and returns its path. */
+  private Path preparePath() throws Exception {
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    Path file =
+      new Path(TEST_UTIL.getDataTestDir(), UUID.randomUUID().toString().replaceAll("-", ""));
+    // create() hands back an open output stream; close it right away so the handle is not leaked.
+    fs.create(file).close();
+    return file;
+  }
+
+  @Test
+  public void testGetCompactBoundariesForMajorNoOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    assertEquals(1,
+      ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size());
+  }
+
+  @Test
+  public void testGetCompactBoundariesForMajorAllOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    // The default cut off age is 10 years, so any of the min/max value there should get in the old
+    // tier
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, 2, 3, 1024, 1));
+    assertEquals(2,
+      ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size());
+  }
+
+  @Test
+  public void testGetCompactBoundariesForMajorOneOnEachSide() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    assertEquals(3,
+      ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size());
+  }
+
+  @Test
+  public void testGetCompactBoundariesForMajorOneCrossing() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, EnvironmentEdgeManager.currentTime(), 1024, 0));
+    assertEquals(3,
+      ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size());
+  }
+
+  /** Two-argument callback that, unlike {@code BiConsumer}, is allowed to throw. */
+  @FunctionalInterface
+  interface PolicyValidator<T, U> {
+    void accept(T t, U u) throws Exception;
+  }
+
+  /**
+   * Creates {@code numFiles} mocked store files flagged as major-compacted, all with the same
+   * {@code [min, max]} tiering range, and hands them with a fresh policy to {@code validation}.
+   * The files are created while a {@link ManualEnvironmentEdge} is injected.
+   */
+  private void testShouldPerformMajorCompaction(long min, long max, int numFiles,
+    PolicyValidator<CustomDateTieredCompactionPolicy, ArrayList<HStoreFile>> validation)
+    throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    RegionInfo mockedRegionInfo = mockRegionInfo();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    try {
+      for (int i = 0; i < numFiles; i++) {
+        MockHStoreFile mockedSFile = (MockHStoreFile) createFile(mockedRegionInfo, file, min,
+          max, 1024, 0, HConstants.DEFAULT_MAJOR_COMPACTION_PERIOD);
+        mockedSFile.setIsMajor(true);
+        files.add(mockedSFile);
+      }
+    } finally {
+      // Always restore the default edge so a failure above cannot leak the manual clock into
+      // other tests.
+      EnvironmentEdgeManager.reset();
+    }
+    validation.accept(policy, files);
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionOneFileCrossing() throws Exception {
+    long max = EnvironmentEdgeManager.currentTime();
+    testShouldPerformMajorCompaction(0, max, 1,
+      (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionOneFileMinMaxLow() throws Exception {
+    testShouldPerformMajorCompaction(0, 1, 1,
+      (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionOneFileMinMaxHigh() throws Exception {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    testShouldPerformMajorCompaction(currentTime, currentTime, 1,
+      (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionTwoFilesMinMaxHigh() throws Exception {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    testShouldPerformMajorCompaction(currentTime, currentTime, 2,
+      (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testSelectMinorCompactionTwoFilesNoOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    // Shouldn't do minor compaction, as minimum number of files
+    // for minor compactions is 3
+    assertEquals(0, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesNoOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesAllOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, 1, 2, 1024, 1));
+    files.add(createFile(file, 3, 4, 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesOneOldTwoNew() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesTwoOldOneNew() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    // Two files entirely in the old range plus one current file. This used to be a copy of the
+    // "one old, two new" setup, so the scenario in the test name was never exercised.
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, 1, 2, 1024, 1));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index b24c1f2ed84..585482c9409 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -224,7 +225,8 @@ public class TestDataTieringManager {
 
     // Test with a filename where corresponding HStoreFile in not present
     hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), 
"incorrectFileName");
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, 
true);
+    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, 
hFilePath,
+      new DataTieringException("Store file corresponding to " + hFilePath + " 
doesn't exist"));
   }
 
   @Test
@@ -597,19 +599,21 @@ public class TestDataTieringManager {
 
   @Test
   public void testCacheOnReadColdFile() throws Exception {
+    initializeTestEnvironment();
     // hStoreFiles[3] is a cold file. the blocks should not get loaded after a 
readBlock call.
     HStoreFile hStoreFile = hStoreFiles.get(3);
     BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, 
BlockType.DATA);
-    testCacheOnRead(hStoreFile, cacheKey, 23025, false);
+    testCacheOnRead(hStoreFile, cacheKey, -1, false);
   }
 
   @Test
   public void testCacheOnReadHotFile() throws Exception {
+    initializeTestEnvironment();
     // hStoreFiles[0] is a hot file. the blocks should get loaded after a 
readBlock call.
     HStoreFile hStoreFile = hStoreFiles.get(0);
     BlockCacheKey cacheKey =
       new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
-    testCacheOnRead(hStoreFile, cacheKey, 23025, true);
+    testCacheOnRead(hStoreFile, cacheKey, -1, true);
   }
 
   private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long 
onDiskBlockSize,
@@ -618,7 +622,7 @@ public class TestDataTieringManager {
     hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), 
onDiskBlockSize, true, false,
       false, false, key.getBlockType(), DataBlockEncoding.NONE);
     // Validate that the hot block gets cached and cold block is not cached.
-    HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, 
false, BlockType.DATA);
+    HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, 
false);
     if (expectedCached) {
       assertNotNull(block);
     } else {
@@ -803,7 +807,6 @@ public class TestDataTieringManager {
     return conf;
   }
 
-
   static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long 
timestamp,
     HRegionFileSystem regionFs) throws IOException {
     String columnFamily = storeDir.getName();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
new file mode 100644
index 00000000000..331dd41e4f1
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static 
org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+import static 
org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Mini-cluster test for a column family configured with {@link CustomTieredStoreEngine} and a
+ * tiering cell qualifier. It writes rows whose "date" qualifier holds either a current or an
+ * 11-year-old value, then checks the number of store files and their
+ * {@code CUSTOM_TIERING_TIME_RANGE} metadata after two consecutive major compactions.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestCustomCellTieredCompactor {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCustomCellTieredCompactor.class);
+
+  public static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  protected HBaseTestingUtility utility;
+
+  protected Admin admin;
+
+  /** Starts a fresh mini cluster with a short compacted-files discharger interval. */
+  @Before
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  /** Shuts the mini cluster down. */
+  @After
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCustomCellTieredCompactor() throws Exception {
+    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+    clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName());
+    clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date");
+    TableName tableName = TableName.valueOf("testCustomCellTieredCompactor");
+    TableDescriptorBuilder tblBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    tblBuilder.setColumnFamily(clmBuilder.build());
+    utility.getAdmin().createTable(tblBuilder.build());
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    long recordTime = System.currentTimeMillis();
+    // Tiering value 11 (leap) years in the past; computed once so the writes and the assertion
+    // below are guaranteed to use the same number.
+    long oldRecordTime = recordTime - (11L * 366L * 24L * 60L * 60L * 1000L);
+    // write data and flush multiple store files:
+    // try-with-resources closes the table even when a put or flush fails.
+    try (Table table = connection.getTable(tableName)) {
+      for (int i = 0; i < 6; i++) {
+        List<Put> puts = new ArrayList<>(2);
+        Put put = new Put(Bytes.toBytes(i));
+        put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
+        put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(oldRecordTime));
+        puts.add(put);
+        put = new Put(Bytes.toBytes(i + 1000));
+        put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
+        put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(recordTime));
+        puts.add(put);
+        table.put(puts);
+        utility.flush(tableName);
+      }
+    }
+    long firstCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
+          > firstCompactionTime);
+    long numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // The first major compaction would have no means to detect more than one tier,
+    // because without the min/max values available in the file info portion of the selected files
+    // for compaction, CustomCellDateTieredCompactionPolicy has no means
+    // to calculate the proper boundaries.
+    assertEquals(1, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+      .forEach(file -> {
+        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          assertEquals(oldRecordTime, timeRangeTracker.getMin());
+          assertEquals(recordTime, timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      });
+    // now do major compaction again, to make sure we write two separate files
+    long secondCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
+          > secondCompactionTime);
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(2, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+      .forEach(file -> {
+        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          // After the second compaction each file holds a single tiering value.
+          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      });
+  }
+}

Reply via email to