This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new cbc2e76dded HBASE-29727 Introduce a String pool for repeating filename, region and cf string fields in BlockCacheKey (#7477) (#7503)
cbc2e76dded is described below

commit cbc2e76dded8be5887a1e70a2ea3f233992bc2b2
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Thu Dec 11 18:35:49 2025 +0000

    HBASE-29727 Introduce a String pool for repeating filename, region and cf string fields in BlockCacheKey (#7477) (#7503)
    
    * HBASE-29727 Introduce a String pool for repeating filename, region and cf string fields in BlockCacheKey (#7477)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
    
    * Resolved conflict
    
    Change-Id: I05a7dfb7d73cf50e76fc9ca260f4dacad12cd2c5
    
    * adding extra log to understand the flakey ut errors
    
    Change-Id: I3bc75f878d04262ac11e6f4a8ef4d743720b8701
    
    * some UT refinements to avoid flakeyness
    
    Change-Id: Ia0117bf3b0e8e4424fdfcb7e9e3e6bf65769c05c
    
    ---------
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
 dev-support/spotbugs-exclude.xml                   |  12 +
 .../src/main/protobuf/BucketCacheEntry.proto       |   3 +
 .../apache/hadoop/hbase/io/hfile/BlockCache.java   |   8 -
 .../hadoop/hbase/io/hfile/BlockCacheKey.java       | 127 ++++++--
 .../hadoop/hbase/io/hfile/CombinedBlockCache.java  |   5 -
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    |   2 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |   2 +-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  92 +++---
 .../hbase/io/hfile/bucket/BucketProtoUtils.java    |   5 +-
 .../hbase/io/hfile/bucket/FilePathStringPool.java  | 176 +++++++++++
 .../hbase/regionserver/DataTieringManager.java     |  85 +++---
 .../hadoop/hbase/io/hfile/CacheTestUtils.java      |  32 +-
 .../hfile/TestBlockEvictionOnRegionMovement.java   |   8 +-
 .../hbase/io/hfile/bucket/TestBucketCache.java     |  45 ++-
 .../io/hfile/bucket/TestFilePathStringPool.java    | 326 +++++++++++++++++++++
 .../bucket/TestRecoveryPersistentBucketCache.java  |  44 +--
 .../io/hfile/bucket/TestVerifyBucketCacheFile.java | 101 ++++---
 .../TestCustomCellDataTieringManager.java          |  36 +--
 .../hbase/regionserver/TestDataTieringManager.java | 102 +------
 19 files changed, 889 insertions(+), 322 deletions(-)

diff --git a/dev-support/spotbugs-exclude.xml b/dev-support/spotbugs-exclude.xml
index 2f0684eff4d..17b8d2cbded 100644
--- a/dev-support/spotbugs-exclude.xml
+++ b/dev-support/spotbugs-exclude.xml
@@ -271,4 +271,16 @@
     <Bug pattern="ML_SYNC_ON_UPDATED_FIELD"/>
   </Match>
 
+  <!--
+  Given it's a singleton, the instance must be returned. We need the same instance to be shared
+  between all BlockCacheKey objects, as well as the BucketCache instance.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.hbase.io.hfile.bucket.FilePathStringPool"/>
+    <Or>
+      <Method name="getInstance"/>
+    </Or>
+    <Bug pattern="MS_EXPOSE_REP"/>
+  </Match>
+
 </FindBugsFilter>
diff --git a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
index 80fc10ada78..c1f7b496776 100644
--- a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
@@ -49,6 +49,9 @@ message BlockCacheKey {
   required int64 offset = 2;
   required BlockType block_type = 3;
   required bool primary_replica_block = 4;
+  optional string region_name = 5;
+  optional string family_name = 6;
+  optional bool archived = 7;
 }
 
 enum BlockType {
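
The three new optional fields keep the persisted format backward compatible: entries written by older versions simply decode with the fields unset. A minimal sketch of how a key could round-trip through the generated protobuf API (the builder setter names and the "data" enum value are assumptions based on standard protobuf Java codegen; the matching getters appear in BucketProtoUtils below):

    // Illustrative only: builds a proto key with the new optional fields set.
    BucketCacheProtos.BlockCacheKey protoKey = BucketCacheProtos.BlockCacheKey.newBuilder()
      .setHfilename("f1")                              // required field 1
      .setOffset(0L)                                   // required field 2
      .setBlockType(BucketCacheProtos.BlockType.data)  // enum value name assumed
      .setPrimaryReplicaBlock(true)                    // required field 4
      .setRegionName("region1")                        // new optional field 5
      .setFamilyName("cf1")                            // new optional field 6
      .setArchived(false)                              // new optional field 7
      .build();
    // Keys persisted by older versions leave the new fields unset, so readers
    // can check protoKey.hasRegionName() before relying on them.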
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 0ae65b9a008..4a2fddf8866 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -85,14 +85,6 @@ public interface BlockCache extends Iterable<CachedBlock>, ConfigurationObserver
    */
   int evictBlocksByHfileName(String hfileName);
 
-  /**
-   * Evicts all blocks for the given HFile by path.
-   * @return the number of blocks evicted
-   */
-  default int evictBlocksByHfilePath(Path hfilePath) {
-    return evictBlocksByHfileName(hfilePath.getName());
-  }
-
   /**
    * Get the statistics for this block cache.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
index f87b456c29b..2142de7053f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.bucket.FilePathStringPool;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -27,75 +29,146 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class BlockCacheKey implements HeapSize, java.io.Serializable {
-  private static final long serialVersionUID = -5199992013113130534L;
-  private final String hfileName;
+  private static final long serialVersionUID = -5199992013113130535L; // Changed due to format
+                                                                      // change
+
+  // New compressed format using integer file ID (when codec is available)
+  private final int hfileNameId;
+
+  private transient final FilePathStringPool stringPool;
+
+  private final int regionId;
+
+  private final int cfId;
+
   private final long offset;
+
   private BlockType blockType;
+
   private final boolean isPrimaryReplicaBlock;
 
-  private Path filePath;
+  private final boolean archived;
 
   /**
-   * Construct a new BlockCacheKey
+   * Constructs a new BlockCacheKey with the file name and offset only. To be used for cache lookups
+   * only, DO NOT use this for creating keys when inserting into the cache. Use either the
+   * overriding constructors with the path parameter or the region and cf parameters, otherwise,
+   * region cache metrics won't be recorded properly.
    * @param hfileName The name of the HFile this block belongs to.
    * @param offset    Offset of the block into the file
    */
   public BlockCacheKey(String hfileName, long offset) {
-    this(hfileName, offset, true, BlockType.DATA);
+    this(hfileName, null, null, offset, true, BlockType.DATA, false);
   }
 
+  /**
+   * Constructs a new BlockCacheKey with the file name, offset, replica and type only. To be used
+   * for cache lookups only, DO NOT use this for creating keys when inserting into the cache. Use
+   * either the overriding constructors with the path parameter or the region and cf parameters,
+   * otherwise, region cache metrics won't be recorded properly.
+   * @param hfileName        The name of the HFile this block belongs to.
+   * @param offset           Offset of the block into the file
+   * @param isPrimaryReplica Whether this is from primary replica
+   * @param blockType        Type of block
+   */
   public BlockCacheKey(String hfileName, long offset, boolean isPrimaryReplica,
     BlockType blockType) {
+    this(hfileName, null, null, offset, isPrimaryReplica, blockType, false);
+  }
+
+  /**
+   * Construct a new BlockCacheKey, with file, column family and region information. This should be
+   * used when inserting keys into the cache, so that region cache metrics are recorded properly.
+   * @param hfileName        The name of the HFile this block belongs to.
+   * @param cfName           The column family name
+   * @param regionName       The region name
+   * @param offset           Offset of the block into the file
+   * @param isPrimaryReplica Whether this is from primary replica
+   * @param blockType        Type of block
+   */
+  public BlockCacheKey(String hfileName, String cfName, String regionName, long offset,
+    boolean isPrimaryReplica, BlockType blockType, boolean archived) {
     this.isPrimaryReplicaBlock = isPrimaryReplica;
-    this.hfileName = hfileName;
     this.offset = offset;
     this.blockType = blockType;
+    this.stringPool = FilePathStringPool.getInstance();
+    // Use string pool for file, region and cf values
+    this.hfileNameId = stringPool.encode(hfileName);
+    this.regionId = (regionName != null) ? stringPool.encode(regionName) : -1;
+    this.cfId = (cfName != null) ? stringPool.encode(cfName) : -1;
+    this.archived = archived;
   }
 
+  /**
+   * Construct a new BlockCacheKey using a file path. File, column family and region information
+   * will be extracted from the passed path. This should be used when inserting keys into the cache,
+   * so that region cache metrics are recorded properly.
+   * @param hfilePath        The path to the HFile
+   * @param offset           Offset of the block into the file
+   * @param isPrimaryReplica Whether this is from primary replica
+   * @param blockType        Type of block
+   */
   public BlockCacheKey(Path hfilePath, long offset, boolean isPrimaryReplica, BlockType blockType) {
-    this.filePath = hfilePath;
-    this.isPrimaryReplicaBlock = isPrimaryReplica;
-    this.hfileName = hfilePath.getName();
-    this.offset = offset;
-    this.blockType = blockType;
+    this(hfilePath.getName(), hfilePath.getParent().getName(),
+      hfilePath.getParent().getParent().getName(), offset, isPrimaryReplica, blockType,
+      HFileArchiveUtil.isHFileArchived(hfilePath));
   }
 
   @Override
   public int hashCode() {
-    return hfileName.hashCode() * 127 + (int) (offset ^ (offset >>> 32));
+    return hfileNameId * 127 + (int) (offset ^ (offset >>> 32));
   }
 
   @Override
   public boolean equals(Object o) {
     if (o instanceof BlockCacheKey) {
       BlockCacheKey k = (BlockCacheKey) o;
-      return offset == k.offset
-        && (hfileName == null ? k.hfileName == null : hfileName.equals(k.hfileName));
-    } else {
-      return false;
+      if (offset != k.offset) {
+        return false;
+      }
+      return getHfileName().equals(k.getHfileName());
     }
+    return false;
   }
 
   @Override
   public String toString() {
-    return this.hfileName + '_' + this.offset;
+    return getHfileName() + '_' + this.offset;
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.estimateBase(BlockCacheKey.class, false);
 
   /**
-   * Strings have two bytes per character due to default Java Unicode encoding (hence length times
-   * 2).
+   * With the compressed format using integer file IDs, the heap size is significantly reduced. We
+   * now only store a 4-byte integer instead of the full file name string.
    */
   @Override
   public long heapSize() {
-    return ClassSize.align(FIXED_OVERHEAD + ClassSize.STRING + 2 * hfileName.length());
+    return ClassSize.align(FIXED_OVERHEAD);
   }
 
-  // can't avoid this unfortunately
-  /** Returns The hfileName portion of this cache key */
+  /**
+   * Returns the hfileName portion of this cache key.
+   * @return The file name
+   */
   public String getHfileName() {
-    return hfileName;
+    return stringPool.decode(hfileNameId);
+  }
+
+  /**
+   * Returns the region name portion of this cache key.
+   * @return The region name
+   */
+  public String getRegionName() {
+    return stringPool.decode(regionId);
+  }
+
+  /**
+   * Returns the column family name portion of this cache key.
+   * @return The column family name
+   */
+  public String getCfName() {
+    return stringPool.decode(cfId);
   }
 
   public boolean isPrimary() {
@@ -114,12 +187,8 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
     this.blockType = blockType;
   }
 
-  public Path getFilePath() {
-    return filePath;
-  }
-
-  public void setFilePath(Path filePath) {
-    this.filePath = filePath;
+  public boolean isArchived() {
+    return archived;
   }
 
 }
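
The net effect of the BlockCacheKey rework: equality and hashing still depend only on the file name and offset, so cheap lookup keys keep matching entries that were inserted with full path context. A short illustrative sketch (path, names and offsets are made up; this is not part of the patch):

    Path hfile = new Path("/hbase/data/default/t1/region1/cf1/f1");
    // Insertion key: region and cf are parsed from the path and interned in the pool.
    BlockCacheKey insertKey = new BlockCacheKey(hfile, 0L, true, BlockType.DATA);
    // Lookup key: file name and offset only; region/cf ids stay -1.
    BlockCacheKey lookupKey = new BlockCacheKey("f1", 0L);
    // Both resolve the same pooled name, so lookups hit blocks cached via the path key.
    assert insertKey.equals(lookupKey);
    assert insertKey.hashCode() == lookupKey.hashCode();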
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index ba1a5ed9ae9..6a9c0c778f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -149,11 +149,6 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
     return l1Cache.evictBlocksByHfileName(hfileName) + l2Cache.evictBlocksByHfileName(hfileName);
   }
 
-  @Override
-  public int evictBlocksByHfilePath(Path hfilePath) {
-    return l1Cache.evictBlocksByHfilePath(hfilePath) + l2Cache.evictBlocksByHfilePath(hfilePath);
-  }
-
   @Override
   public CacheStats getStats() {
     return this.combinedCacheStats;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 39af3585112..147e2598ef9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -185,7 +185,7 @@ public class HFilePreadReader extends HFileReaderImpl {
     // Deallocate data blocks
     cacheConf.getBlockCache().ifPresent(cache -> {
       if (evictOnClose) {
-        int numEvicted = cache.evictBlocksByHfilePath(path);
+        int numEvicted = cache.evictBlocksByHfileName(name);
         if (LOG.isTraceEnabled()) {
           LOG.trace("On close, file= {} evicted= {} block(s)", name, 
numEvicted);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index db183bb7177..7d865d6fdc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1256,7 +1256,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
       // Check cache for block. If found return.
       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
       BlockCacheKey cacheKey =
-        new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
+        new BlockCacheKey(path, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
 
       cacheBlock &=
         cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory(), getHFileInfo(), conf);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 0b6ea4c6e66..6667af0de25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -88,7 +88,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
 import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
 import org.apache.hadoop.hbase.util.Pair;
@@ -398,6 +397,7 @@ public class BucketCache implements BlockCache, HeapSize {
         LOG.warn("Can't restore from file[{}]. The bucket cache will be reset 
and rebuilt."
           + " Exception seen: ", persistencePath, ex);
         backingMap.clear();
+        FilePathStringPool.getInstance().clear();
         fullyCachedFiles.clear();
         backingMapValidated.set(true);
         regionCachedSize.clear();
@@ -586,7 +586,8 @@ public class BucketCache implements BlockCache, HeapSize {
     if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
       cacheKey.setBlockType(cachedItem.getBlockType());
     }
-    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
+    LOG.debug("Caching key={}, item={}, key heap size={}", cacheKey, 
cachedItem,
+      cacheKey.heapSize());
     // Stuff the entry into the RAM cache so it can get drained to the persistent store
     RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
       inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait);
@@ -639,6 +640,7 @@ public class BucketCache implements BlockCache, HeapSize {
       referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond();
     }
     if (referredFileName != null) {
+      // Since we just need this key for a lookup, it's enough to use only name and offset
+      BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset());
       foundEntry = backingMap.get(convertedCacheKey);
       LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", 
key.getHfileName(),
@@ -671,6 +673,8 @@ public class BucketCache implements BlockCache, HeapSize {
       return re.getData();
     }
     BucketEntry bucketEntry = backingMap.get(key);
+    LOG.debug("bucket entry for key {}: {}", key,
+      bucketEntry == null ? null : bucketEntry.offset());
     if (bucketEntry == null) {
       bucketEntry = getBlockForReference(key);
     }
@@ -749,7 +753,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private void fileNotFullyCached(BlockCacheKey key, BucketEntry entry) {
     // Update the updateRegionCachedSize before removing the file from fullyCachedFiles.
     // This computation should happen even if the file is not in fullyCachedFiles map.
-    updateRegionCachedSize(key.getFilePath(), (entry.getLength() * -1));
+    updateRegionCachedSize(key, (entry.getLength() * -1));
     fullyCachedFiles.remove(key.getHfileName());
   }
 
@@ -762,12 +766,13 @@ public class BucketCache implements BlockCache, HeapSize {
     fullyCachedFiles.put(filePath.getName(), pair);
   }
 
-  private void updateRegionCachedSize(Path filePath, long cachedSize) {
-    if (filePath != null) {
-      if (HFileArchiveUtil.isHFileArchived(filePath)) {
-        LOG.trace("Skipping region cached size update for archived file: {}", 
filePath);
+  private void updateRegionCachedSize(BlockCacheKey key, long cachedSize) {
+    if (key.getRegionName() != null) {
+      if (key.isArchived()) {
+        LOG.trace("Skipping region cached size update for archived file:{} 
from region: {}",
+          key.getHfileName(), key.getRegionName());
       } else {
-        String regionName = filePath.getParent().getParent().getName();
+        String regionName = key.getRegionName();
         regionCachedSize.merge(regionName, cachedSize,
           (previousSize, newBlockSize) -> previousSize + newBlockSize);
         LOG.trace("Updating region cached size for region: {}", regionName);
@@ -775,6 +780,7 @@ public class BucketCache implements BlockCache, HeapSize {
         // remove the entry for that region from regionCachedSize map.
         if (regionCachedSize.get(regionName) <= 0) {
           regionCachedSize.remove(regionName);
+          FilePathStringPool.getInstance().remove(regionName);
         }
       }
     }
@@ -834,6 +840,7 @@ public class BucketCache implements BlockCache, HeapSize {
       if (existedInRamCache && evictedByEvictionProcess) {
         cacheStats.evicted(0, cacheKey.isPrimary());
       }
+      LOG.debug("Entry for key {} was not found in backing map", cacheKey);
       return existedInRamCache;
     } else {
       return bucketEntryToUse.withWriteLock(offsetLock, () -> {
@@ -843,6 +850,9 @@ public class BucketCache implements BlockCache, HeapSize {
           blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
           return true;
         }
+        LOG.debug("Failed to remove key {} from map. Maybe entries in the map 
now differ? "
+          + "Original found entry: {}, what's in the map now: {}", cacheKey,
+          bucketEntryToUse, backingMap.get(cacheKey));
         return false;
       });
     }
@@ -994,6 +1004,7 @@ public class BucketCache implements BlockCache, HeapSize {
       + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
       + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount="
       + backingMap.size());
+    LOG.info(FilePathStringPool.getInstance().getPoolStats());
     cacheStats.reset();
 
     bucketAllocator.logDebugStatistics();
@@ -1338,7 +1349,7 @@ public class BucketCache implements BlockCache, HeapSize {
   protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
     BucketEntry previousEntry = backingMap.put(key, bucketEntry);
     blocksByHFile.add(key);
-    updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength());
+    updateRegionCachedSize(key, bucketEntry.getLength());
     if (previousEntry != null && previousEntry != bucketEntry) {
       previousEntry.withWriteLock(offsetLock, () -> {
         blockEvicted(key, previousEntry, false, false);
@@ -1748,6 +1759,7 @@ public class BucketCache implements BlockCache, HeapSize {
 
     backingMap.clear();
     blocksByHFile.clear();
+    FilePathStringPool.getInstance().clear();
 
     // Read the backing map entries in batches.
     int numChunks = 0;
@@ -1803,6 +1815,7 @@ public class BucketCache implements BlockCache, HeapSize {
       this.blocksByHFile.clear();
       this.fullyCachedFiles.clear();
       this.regionCachedSize.clear();
+      FilePathStringPool.getInstance().clear();
     }
     if (cacheStats.getMetricsRollerScheduler() != null) {
       cacheStats.getMetricsRollerScheduler().shutdownNow();
@@ -1816,24 +1829,27 @@ public class BucketCache implements BlockCache, HeapSize {
 
   @Override
   public void shutdown() {
-    disableCache();
-    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 
+ "; path to write="
-      + persistencePath);
-    if (ioEngine.isPersistent() && persistencePath != null) {
-      try {
-        join();
-        if (cachePersister != null) {
-          LOG.info("Shutting down cache persister thread.");
-          cachePersister.shutdown();
-          while (cachePersister.isAlive()) {
-            Thread.sleep(10);
+    if (isCacheEnabled()) {
+      disableCache();
+      LOG.info("Shutdown bucket cache: IO persistent=" + 
ioEngine.isPersistent()
+        + "; path to write=" + persistencePath);
+      if (ioEngine.isPersistent() && persistencePath != null) {
+        try {
+          join();
+          if (cachePersister != null) {
+            LOG.info("Shutting down cache persister thread.");
+            cachePersister.shutdown();
+            while (cachePersister.isAlive()) {
+              Thread.sleep(10);
+            }
           }
+          persistToFile();
+          FilePathStringPool.getInstance().clear();
+        } catch (IOException ex) {
+          LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
+        } catch (InterruptedException e) {
+          LOG.warn("Failed to persist data on exit", e);
         }
-        persistToFile();
-      } catch (IOException ex) {
-        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
-      } catch (InterruptedException e) {
-        LOG.warn("Failed to persist data on exit", e);
       }
     }
   }
@@ -1915,33 +1931,31 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   @Override
-  public int evictBlocksByHfilePath(Path hfilePath) {
-    return evictBlocksRangeByHfileName(hfilePath.getName(), hfilePath, 0, Long.MAX_VALUE);
-  }
-
-  public int evictBlocksRangeByHfileName(String hfileName, Path filePath, long initOffset,
-    long endOffset) {
+  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
     Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset);
+    // We need to make sure whether we are evicting all blocks for this given file. In case of
+    // split references, we might be evicting just half of the blocks
+    int totalFileKeys = (endOffset == Long.MAX_VALUE)
+      ? keySet.size()
+      : getAllCacheKeysForFile(hfileName, 0, Long.MAX_VALUE).size();
     LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: 
{}", keySet.size(),
       hfileName, initOffset, endOffset);
     int numEvicted = 0;
     for (BlockCacheKey key : keySet) {
-      if (filePath != null) {
-        key.setFilePath(filePath);
-      }
       if (evictBlock(key)) {
         ++numEvicted;
       }
     }
+    if (numEvicted > 0) {
+      if (totalFileKeys == numEvicted) {
+        FilePathStringPool.getInstance().remove(hfileName);
+      }
+    }
     return numEvicted;
   }
 
-  @Override
-  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
-    return evictBlocksRangeByHfileName(hfileName, null, initOffset, endOffset);
-  }
-
   private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) {
+    // These keys are just for comparison and are short lived, so we need only file name and offset
     return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true,
       new BlockCacheKey(hfileName, end), true);
   }
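
One subtlety in evictBlocksRangeByHfileName above: the pooled file name may only be dropped once every block of the file is gone, because any surviving key still decodes its name through the pool. Illustrative calls (the cache reference and splitPoint offset are hypothetical):

    // Partial eviction, e.g. one half of a split reference: "f1" stays in the pool,
    // since the remaining blocks still resolve their file name through it.
    cache.evictBlocksRangeByHfileName("f1", 0, splitPoint);
    // Full-range eviction: once the last block is evicted, the pool entry goes too.
    cache.evictBlocksRangeByHfileName("f1", 0, Long.MAX_VALUE);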
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index eb9c2cb5de8..b87e0e0dd62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -165,8 +165,9 @@ final class BucketProtoUtils {
       .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
     for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
       BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
-      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
-        protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getFamilyName(),
+        protoKey.getRegionName(), protoKey.getOffset(), protoKey.getPrimaryReplicaBlock(),
+        fromPb(protoKey.getBlockType()), protoKey.getArchived());
       BucketCacheProtos.BucketEntry protoValue = entry.getValue();
       // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
       // which created by RpcServer elegantly.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FilePathStringPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FilePathStringPool.java
new file mode 100644
index 00000000000..7e08158af0e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FilePathStringPool.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pool of string values encoded to integer IDs for use in BlockCacheKey. This allows for avoiding
+ * duplicating string values for file names, region and CF values on various BlockCacheKey
+ * instances. Normally, single hfiles have many blocks. This means all blocks from the same file
+ * will have the very same file, region and CF names. On very large BucketCache setups (i.e. file
+ * based cache with TB size order), can save few GBs of memory by avoiding repeating these common
+ * string values on blocks from the same file. The FilePathStringPool is implemented as a singleton,
+ * since the same pool should be shared by all BlockCacheKey instances, as well as the BucketCache
+ * object itself. The Id for an encoded string is an integer. Any new String added to the pool is
+ * assigned the next available integer ID, starting from 0 upwards. That sets the total pool
+ * capacity to Integer.MAX_VALUE. In the event of ID exhaustion (integer overflow when Id values
+ * reach Integer.MAX_VALUE), the encode() method will restart iterating over int values
+ * incrementally from 0 until it finds an unused ID. Strings can be removed from the pool using the
+ * remove() method. BucketCache should call this when evicting all blocks for a given file (see
+ * BucketCache.evictFileBlocksFromCache()).
+ * <p>
+ * Thread-safe implementation that maintains bidirectional mappings between strings and IDs.
+ * </p>
+ */
[email protected]
+public class FilePathStringPool {
+  private static final Logger LOG = LoggerFactory.getLogger(FilePathStringPool.class);
+
+  // Bidirectional mappings for string objects re-use
+  private final ConcurrentHashMap<String, Integer> stringToId = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<Integer, String> idToString = new ConcurrentHashMap<>();
+  private final AtomicInteger nextId = new AtomicInteger(0);
+
+  private static FilePathStringPool instance;
+
+  public static FilePathStringPool getInstance() {
+    synchronized (FilePathStringPool.class) {
+      if (instance == null) {
+        instance = new FilePathStringPool();
+      }
+    }
+    return instance;
+  }
+
+  private FilePathStringPool() {
+    // Private constructor for singleton
+  }
+
+  /**
+   * Gets or creates an integer ID for the given String.
+   * @param string value for the file/region/CF name.
+   * @return the integer ID encoding this string in the pool.
+   */
+  public int encode(String string) {
+    if (string == null) {
+      throw new IllegalArgumentException("string cannot be null");
+    }
+    return stringToId.computeIfAbsent(string, name -> {
+      if (stringToId.size() == Integer.MAX_VALUE) {
+        throw new IllegalStateException(
+          "String pool has reached maximum capacity of " + Integer.MAX_VALUE + 
" unique strings.");
+      }
+      int id = nextId.getAndIncrement();
+      while (idToString.containsKey(id)) {
+        id = nextId.getAndIncrement();
+        if (id == Integer.MAX_VALUE) {
+          nextId.set(0);
+          LOG.info("Id values reached Integer.MAX_VALUE, restarting from 0");
+        }
+      }
+      idToString.put(id, name);
+      LOG.trace("Encoded new string to ID {}: {}", id, name);
+      return id;
+    });
+  }
+
+  /**
+   * Decodes an integer ID back to its original file name.
+   * @param id the integer ID
+   * @return the original file name, or null if not found
+   */
+  public String decode(int id) {
+    return idToString.get(id);
+  }
+
+  /**
+   * Checks if a given string ID is already being used.
+   * @param id the integer ID to check
+   * @return true if the ID exists
+   */
+  public boolean contains(int id) {
+    return idToString.containsKey(id);
+  }
+
+  /**
+   * Checks if a given string has been encoded.
+   * @param string the value to check
+   * @return true if the string value has been encoded
+   */
+  public boolean contains(String string) {
+    return stringToId.containsKey(string);
+  }
+
+  /**
+   * Gets the number of unique file names currently tracked.
+   * @return the number of entries in the codec
+   */
+  public int size() {
+    return stringToId.size();
+  }
+
+  /**
+   * Removes a string value and its ID from the pool. This should only be called when all blocks for
+   * a file have been evicted from the cache.
+   * @param string the file name to remove
+   * @return true if the file name was removed, false if it wasn't present
+   */
+  public boolean remove(String string) {
+    if (string == null) {
+      return false;
+    }
+    Integer id = stringToId.remove(string);
+    if (id != null) {
+      idToString.remove(id);
+      LOG.debug("Removed string value from pool: {} (ID: {})", string, id);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Clears all mappings from the codec.
+   */
+  public void clear() {
+    stringToId.clear();
+    idToString.clear();
+    nextId.set(0);
+    LOG.info("Cleared all file name mappings from codec");
+  }
+
+  /**
+   * Gets statistics about memory savings from string pooling.
+   * @return a formatted string with compression statistics
+   */
+  public String getPoolStats() {
+    long uniqueStrings = stringToId.size();
+    if (uniqueStrings == 0) {
+      return "No strings encoded";
+    }
+    // Calculate average string length
+    long totalChars = stringToId.keySet().stream().mapToLong(String::length).sum();
+    double avgLength = (double) totalChars / uniqueStrings;
+    return String.format("FilePathStringPool stats: %d unique strings, avg length: %.1f chars, ",
+      uniqueStrings, avgLength);
+  }
+}
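
A quick usage sketch of the pool's contract, using only the API defined above (the file name is illustrative):

    FilePathStringPool pool = FilePathStringPool.getInstance();
    int id = pool.encode("f1");            // first encode assigns the next free ID
    assert id == pool.encode("f1");        // encoding the same string is idempotent
    assert "f1".equals(pool.decode(id));   // decode returns the original string
    pool.remove("f1");                     // safe only once no live key references it
    assert pool.decode(id) == null;        // removed IDs no longer resolve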
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
index 6638fd2049c..cb02c04e9e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -96,11 +95,14 @@ public class DataTieringManager {
    * @throws DataTieringException if there is an error retrieving the HFile path or configuration
    */
   public boolean isDataTieringEnabled(BlockCacheKey key) throws DataTieringException {
-    Path hFilePath = key.getFilePath();
-    if (hFilePath == null) {
-      throw new DataTieringException("BlockCacheKey Doesn't Contain HFile 
Path");
+    if (key.getCfName() == null || key.getRegionName() == null) {
+      throw new DataTieringException(
+        "BlockCacheKey doesn't contain Column Family Name or Region Name");
     }
-    return isDataTieringEnabled(hFilePath);
+    Configuration configuration =
+      getHStore(key.getRegionName(), key.getCfName()).getReadOnlyConfiguration();
+    DataTieringType dataTieringType = getDataTieringType(configuration);
+    return !dataTieringType.equals(DataTieringType.NONE);
   }
 
   /**
@@ -125,11 +127,16 @@ public class DataTieringManager {
    * @throws DataTieringException if there is an error retrieving data tiering information
    */
   public boolean isHotData(BlockCacheKey key) throws DataTieringException {
-    Path hFilePath = key.getFilePath();
-    if (hFilePath == null) {
-      throw new DataTieringException("BlockCacheKey Doesn't Contain HFile 
Path");
+    if (key.getRegionName() == null) {
+      throw new DataTieringException("BlockCacheKey doesn't contain Region 
Name");
+    }
+    if (key.getCfName() == null) {
+      throw new DataTieringException("BlockCacheKey doesn't contain CF Name");
     }
-    return isHotData(hFilePath);
+    if (key.getHfileName() == null) {
+      throw new DataTieringException("BlockCacheKey doesn't contain File 
Name");
+    }
+    return isHotData(key.getRegionName(), key.getCfName(), key.getHfileName());
   }
 
   /**
@@ -157,24 +164,14 @@ public class DataTieringManager {
     return true;
   }
 
-  /**
-   * Determines whether the data in the HFile at the given path is considered hot based on the
-   * configured data tiering type and hot data age. If the data tiering type is set to
-   * {@link DataTieringType#TIME_RANGE} and maximum timestamp is not present, it considers
-   * {@code Long.MAX_VALUE} as the maximum timestamp, making the data hot by default.
-   * @param hFilePath the path to the HFile
-   * @return {@code true} if the data is hot, {@code false} otherwise
-   * @throws DataTieringException if there is an error retrieving data tiering information
-   */
-  public boolean isHotData(Path hFilePath) throws DataTieringException {
-    Configuration configuration = getConfiguration(hFilePath);
+  private boolean isHotData(String region, String cf, String fileName) throws DataTieringException {
+    Configuration configuration = getHStore(region, cf).getReadOnlyConfiguration();
     DataTieringType dataTieringType = getDataTieringType(configuration);
-
     if (!dataTieringType.equals(DataTieringType.NONE)) {
-      HStoreFile hStoreFile = getHStoreFile(hFilePath);
+      HStoreFile hStoreFile = getHStoreFile(region, cf, fileName);
       if (hStoreFile == null) {
         throw new DataTieringException(
-          "Store file corresponding to " + hFilePath + " doesn't exist");
+          "Store file corresponding to " + region + "/" + cf + "/" + fileName 
+ " doesn't exist");
       }
       long maxTimestamp = dataTieringType.getInstance().getTimestamp(hStoreFile);
       if (isWithinGracePeriod(maxTimestamp, configuration)) {
@@ -244,34 +241,29 @@ public class DataTieringManager {
     return coldHFiles;
   }
 
-  private HRegion getHRegion(Path hFilePath) throws DataTieringException {
-    String regionId;
-    try {
-      regionId = HRegionFileSystem.getRegionId(hFilePath);
-    } catch (IOException e) {
-      throw new DataTieringException(e.getMessage());
-    }
-    HRegion hRegion = this.onlineRegions.get(regionId);
+  private HRegion getHRegion(String region) throws DataTieringException {
+    HRegion hRegion = this.onlineRegions.get(region);
     if (hRegion == null) {
-      throw new DataTieringException("HRegion corresponding to " + hFilePath + 
" doesn't exist");
+      throw new DataTieringException("HRegion corresponding to " + region + " 
doesn't exist");
     }
     return hRegion;
   }
 
-  private HStore getHStore(Path hFilePath) throws DataTieringException {
-    HRegion hRegion = getHRegion(hFilePath);
-    String columnFamily = hFilePath.getParent().getName();
-    HStore hStore = hRegion.getStore(Bytes.toBytes(columnFamily));
+  private HStore getHStore(String region, String cf) throws DataTieringException {
+    HRegion hRegion = getHRegion(region);
+    HStore hStore = hRegion.getStore(Bytes.toBytes(cf));
     if (hStore == null) {
-      throw new DataTieringException("HStore corresponding to " + hFilePath + 
" doesn't exist");
+      throw new DataTieringException(
+        "HStore corresponding to " + region + "/" + cf + " doesn't exist");
     }
     return hStore;
   }
 
-  private HStoreFile getHStoreFile(Path hFilePath) throws DataTieringException {
-    HStore hStore = getHStore(hFilePath);
+  private HStoreFile getHStoreFile(String region, String cf, String fileName)
+    throws DataTieringException {
+    HStore hStore = getHStore(region, cf);
     for (HStoreFile file : hStore.getStorefiles()) {
-      if (file.getPath().toUri().getPath().toString().equals(hFilePath.toString())) {
+      if (file.getPath().getName().equals(fileName)) {
         return file;
       }
     }
@@ -279,7 +271,18 @@ public class DataTieringManager {
   }
 
   private Configuration getConfiguration(Path hFilePath) throws DataTieringException {
-    HStore hStore = getHStore(hFilePath);
+    String regionName = null;
+    String cfName = null;
+    try {
+      regionName = hFilePath.getParent().getParent().getName();
+      cfName = hFilePath.getParent().getName();
+    } catch (Exception e) {
+      throw new DataTieringException("Incorrect HFile Path: " + hFilePath);
+    }
+    if (regionName == null || cfName == null) {
+      throw new DataTieringException("Incorrect HFile Path: " + hFilePath);
+    }
+    HStore hStore = getHStore(regionName, cfName);
     return hStore.getReadOnlyConfiguration();
   }
 
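With these changes DataTieringManager resolves the HStore from the key's pooled region and column family names rather than from a Path, so only fully specified keys qualify. A hedged sketch (the manager instance and all names here are assumptions):

    // Built with region/cf context: passes the null checks and resolves the store.
    BlockCacheKey full =
      new BlockCacheKey("f1", "cf1", "region1", 0L, true, BlockType.DATA, false);
    boolean hot = dataTieringManager.isHotData(full);
    // Lookup-only key: region/cf decode to null, so this throws DataTieringException.
    dataTieringManager.isHotData(new BlockCacheKey("f1", 0L));
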
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index e66dd345418..501a0347026 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -280,7 +280,24 @@ public class CacheTestUtils {
   }
 
   public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
-    return generateBlocksForPath(blockSize, numBlocks, null);
+    return generateBlocksForPath(blockSize, numBlocks, null, false);
+  }
+
+  public static String[] getHFileNames(HFileBlockPair[] blocks) {
+    String[] names = new String[blocks.length];
+    for (int i = 0; i < blocks.length; i++) {
+      names[i] = blocks[i].blockName.getHfileName();
+    }
+    return names;
+  }
+
+  public static BlockCacheKey[] regenerateKeys(HFileBlockPair[] blocks, String[] names) {
+    BlockCacheKey[] keys = new BlockCacheKey[blocks.length];
+    for (int i = 0; i < blocks.length; i++) {
+      keys[i] = new BlockCacheKey(names[i], blocks[i].blockName.getOffset(), true,
+        blocks[i].blockName.getBlockType());
+    }
+    return keys;
   }
 
   public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path,
@@ -312,28 +329,25 @@ public class CacheTestUtils {
         ByteBuffAllocator.HEAP);
       String key = null;
       long offset = 0;
+      returnedBlocks[i] = new HFileBlockPair();
       if (path != null) {
-        key = path.getName();
         offset = i * blockSize;
+        returnedBlocks[i].blockName =
+          new BlockCacheKey(path, offset, true, encoded ? BlockType.ENCODED_DATA : BlockType.DATA);
       } else {
         /* No conflicting keys */
         key = Long.toString(rand.nextLong());
         while (!usedStrings.add(key)) {
           key = Long.toString(rand.nextLong());
         }
+        returnedBlocks[i].blockName =
+          new BlockCacheKey(key, offset, true, encoded ? BlockType.ENCODED_DATA : BlockType.DATA);
       }
-      returnedBlocks[i] = new HFileBlockPair();
-      returnedBlocks[i].blockName =
-        new BlockCacheKey(key, offset, true, encoded ? BlockType.ENCODED_DATA : BlockType.DATA);
       returnedBlocks[i].block = generated;
     }
     return returnedBlocks;
   }
 
-  public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path) {
-    return generateBlocksForPath(blockSize, numBlocks, path, false);
-  }
-
   public static class HFileBlockPair {
     BlockCacheKey blockName;
     HFileBlock block;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
index baacb3c18a2..5001113f248 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
@@ -177,7 +177,13 @@ public class TestBlockEvictionOnRegionMovement {
 
   @After
   public void tearDown() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
+    try {
+      TEST_UTIL.shutdownMiniCluster();
+    } catch (NullPointerException e) {
+      // shutdown clears the FilePathStringPool. Since it's a singleton, the second RS shutting down
+      // might try to persist bucket cache after string pool is cleared and NPE is thrown. This
+      // won't happen in real clusters, since there will be only one BucketCache instance per JVM.
+    }
     TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
     if (zkCluster != null) {
       zkCluster.shutdown();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 3c8a807fcb1..09f4091dfe8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -48,6 +48,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -658,6 +659,7 @@ public class TestBucketCache {
     assertEquals(0, cache.getStats().getEvictionCount());
 
     // add back
+    key = new BlockCacheKey("testEvictionCount", 0);
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
       block1Buffer);
     waitUntilFlushedToBucket(cache, key);
@@ -676,6 +678,37 @@ public class TestBucketCache {
     assertEquals(1, cache.getStats().getEvictionCount());
   }
 
+  @Test
+  public void testStringPool() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    BucketCache bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
+    assertTrue(bucketCache.waitForCacheInitialization(10000));
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertEquals(0, usedSize);
+    Random rand = ThreadLocalRandom.current();
+    Path filePath = new Path(testDir, Long.toString(rand.nextLong()));
+    CacheTestUtils.HFileBlockPair[] blocks =
+      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 1, filePath, false);
+    String name = blocks[0].getBlockName().getHfileName();
+    assertEquals(name, filePath.getName());
+    assertNotNull(blocks[0].getBlockName().getRegionName());
+    bucketCache.cacheBlock(blocks[0].getBlockName(), blocks[0].getBlock());
+    waitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName());
+    assertTrue(FilePathStringPool.getInstance().size() > 0);
+    bucketCache.evictBlock(blocks[0].getBlockName());
+    assertTrue(FilePathStringPool.getInstance().size() > 0);
+    bucketCache.cacheBlock(blocks[0].getBlockName(), blocks[0].getBlock());
+    waitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName());
+    bucketCache.fileCacheCompleted(filePath,
+      bucketCache.backingMap.get(blocks[0].getBlockName()).getLength());
+    bucketCache.evictBlocksByHfileName(name);
+    assertEquals(1, FilePathStringPool.getInstance().size());
+  }
+
   @Test
   public void testCacheBlockNextBlockMetadataMissing() throws Exception {
     int size = 100;
@@ -884,6 +917,7 @@ public class TestBucketCache {
 
       HFileBlockPair[] hfileBlockPairs =
         CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
+      String[] names = CacheTestUtils.getHFileNames(hfileBlockPairs);
       // Add blocks
       for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
         bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
@@ -912,10 +946,9 @@ public class TestBucketCache {
         constructedBlockSizes, writeThreads, writerQLen, persistencePath);
       assertTrue(bucketCache.waitForCacheInitialization(10000));
       assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
-
-      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
-        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
-        bucketCache.evictBlock(blockCacheKey);
+      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(hfileBlockPairs, names);
+      for (BlockCacheKey key : newKeys) {
+        bucketCache.evictBlock(key);
       }
       assertEquals(0, bucketCache.getAllocator().getUsedSize());
       assertEquals(0, bucketCache.backingMap.size());
@@ -1098,9 +1131,9 @@ public class TestBucketCache {
       constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
       HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
     HFileBlockPair[] validBlockPairs =
-      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile);
+      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile, false);
     HFileBlockPair[] orphanBlockPairs =
-      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile);
+      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile, false);
     for (HFileBlockPair pair : validBlockPairs) {
       bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFilePathStringPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFilePathStringPool.java
new file mode 100644
index 00000000000..a42a61c93fb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFilePathStringPool.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for {@link FilePathStringPool}
+ */
+@Category({ SmallTests.class })
+public class TestFilePathStringPool {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFilePathStringPool.class);
+
+  private FilePathStringPool pool;
+
+  @Before
+  public void setUp() {
+    pool = FilePathStringPool.getInstance();
+    pool.clear();
+  }
+
+  @Test
+  public void testSingletonPattern() {
+    FilePathStringPool instance1 = FilePathStringPool.getInstance();
+    FilePathStringPool instance2 = FilePathStringPool.getInstance();
+    assertNotNull(instance1);
+    assertNotNull(instance2);
+    assertEquals(instance1, instance2);
+  }
+
+  @Test
+  public void testBasicEncodeDecodeRoundTrip() {
+    String testString = "/hbase/data/default/test-table/region1/cf1/file1.hfile";
+    int id = pool.encode(testString);
+    String decoded = pool.decode(id);
+    assertEquals(testString, decoded);
+  }
+
+  @Test
+  public void testEncodeReturnsSameIdForSameString() {
+    String testString = "/hbase/data/file.hfile";
+    int id1 = pool.encode(testString);
+    int id2 = pool.encode(testString);
+    assertEquals(id1, id2);
+    assertEquals(1, pool.size());
+  }
+
+  @Test
+  public void testEncodeDifferentStringsGetDifferentIds() {
+    String string1 = "/path/to/file1.hfile";
+    String string2 = "/path/to/file2.hfile";
+    int id1 = pool.encode(string1);
+    int id2 = pool.encode(string2);
+    assertNotEquals(id1, id2);
+    assertEquals(2, pool.size());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEncodeNullStringThrowsException() {
+    pool.encode(null);
+  }
+
+  @Test
+  public void testDecodeNonExistentIdReturnsNull() {
+    String decoded = pool.decode(999999);
+    assertNull(decoded);
+  }
+
+  @Test
+  public void testContainsWithId() {
+    String testString = "/hbase/file.hfile";
+    int id = pool.encode(testString);
+    assertTrue(pool.contains(id));
+    assertFalse(pool.contains(id + 1));
+  }
+
+  @Test
+  public void testContainsWithString() {
+    String testString = "/hbase/file.hfile";
+    pool.encode(testString);
+    assertTrue(pool.contains(testString));
+    assertFalse(pool.contains("/hbase/other-file.hfile"));
+  }
+
+  @Test
+  public void testRemoveExistingString() {
+    String testString = "/hbase/file.hfile";
+    int id = pool.encode(testString);
+    assertEquals(1, pool.size());
+    assertTrue(pool.contains(testString));
+    boolean removed = pool.remove(testString);
+    assertTrue(removed);
+    assertEquals(0, pool.size());
+    assertFalse(pool.contains(testString));
+    assertFalse(pool.contains(id));
+    assertNull(pool.decode(id));
+  }
+
+  @Test
+  public void testRemoveNonExistentStringReturnsFalse() {
+    boolean removed = pool.remove("/non/existent/file.hfile");
+    assertFalse(removed);
+  }
+
+  @Test
+  public void testRemoveNullStringReturnsFalse() {
+    boolean removed = pool.remove(null);
+    assertFalse(removed);
+  }
+
+  @Test
+  public void testClear() {
+    pool.encode("/file1.hfile");
+    pool.encode("/file2.hfile");
+    pool.encode("/file3.hfile");
+    assertEquals(3, pool.size());
+    pool.clear();
+    assertEquals(0, pool.size());
+  }
+
+  @Test
+  public void testSizeTracking() {
+    assertEquals(0, pool.size());
+    pool.encode("/file1.hfile");
+    assertEquals(1, pool.size());
+    pool.encode("/file2.hfile");
+    assertEquals(2, pool.size());
+    // Encoding same string should not increase size
+    pool.encode("/file1.hfile");
+    assertEquals(2, pool.size());
+    pool.remove("/file1.hfile");
+    assertEquals(1, pool.size());
+    pool.clear();
+    assertEquals(0, pool.size());
+  }
+
+  @Test
+  public void testGetPoolStats() {
+    String stats = pool.getPoolStats();
+    assertEquals("No strings encoded", stats);
+    pool.encode("/hbase/data/table1/file1.hfile");
+    pool.encode("/hbase/data/table2/file2.hfile");
+    stats = pool.getPoolStats();
+    assertNotNull(stats);
+    assertTrue(stats.contains("2 unique strings"));
+    assertTrue(stats.contains("avg length:"));
+  }
+
+  @Test
+  public void testConcurrentEncoding() throws InterruptedException {
+    int numThreads = 10;
+    int stringsPerThread = 100;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch doneLatch = new CountDownLatch(numThreads);
+    ConcurrentHashMap<String, Integer> results = new ConcurrentHashMap<>();
+    AtomicInteger errorCount = new AtomicInteger(0);
+
+    for (int t = 0; t < numThreads; t++) {
+      final int threadId = t;
+      executor.submit(() -> {
+        try {
+          for (int i = 0; i < stringsPerThread; i++) {
+            String string = "/thread" + threadId + "/file" + i + ".hfile";
+            int id = pool.encode(string);
+            results.put(string, id);
+          }
+        } catch (Exception e) {
+          errorCount.incrementAndGet();
+        } finally {
+          doneLatch.countDown();
+        }
+      });
+    }
+
+    assertTrue(doneLatch.await(30, TimeUnit.SECONDS));
+    executor.shutdown();
+
+    assertEquals(0, errorCount.get());
+    assertEquals(numThreads * stringsPerThread, pool.size());
+    assertEquals(numThreads * stringsPerThread, results.size());
+
+    // Verify all strings can be decoded correctly
+    for (Map.Entry<String, Integer> entry : results.entrySet()) {
+      String decoded = pool.decode(entry.getValue());
+      assertEquals(entry.getKey(), decoded);
+    }
+  }
+
+  @Test
+  public void testConcurrentEncodingSameStrings() throws InterruptedException {
+    int numThreads = 20;
+    String sharedString = "/shared/file.hfile";
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch doneLatch = new CountDownLatch(numThreads);
+    Set<Integer> ids = ConcurrentHashMap.newKeySet();
+    AtomicInteger errorCount = new AtomicInteger(0);
+
+    for (int i = 0; i < numThreads; i++) {
+      executor.submit(() -> {
+        try {
+          int id = pool.encode(sharedString);
+          ids.add(id);
+        } catch (Exception e) {
+          errorCount.incrementAndGet();
+        } finally {
+          doneLatch.countDown();
+        }
+      });
+    }
+
+    assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
+    executor.shutdown();
+
+    assertEquals(0, errorCount.get());
+    // All threads should get the same ID
+    assertEquals(1, ids.size());
+    assertEquals(1, pool.size());
+  }
+
+  @Test
+  public void testConcurrentRemoval() throws InterruptedException {
+    // Pre-populate with strings
+    List<String> strings = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      String string = "/file" + i + ".hfile";
+      strings.add(string);
+      pool.encode(string);
+    }
+    assertEquals(100, pool.size());
+
+    int numThreads = 10;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch doneLatch = new CountDownLatch(numThreads);
+    AtomicInteger successfulRemovals = new AtomicInteger(0);
+
+    for (int t = 0; t < numThreads; t++) {
+      final int threadId = t;
+      executor.submit(() -> {
+        try {
+          for (int i = threadId * 10; i < (threadId + 1) * 10; i++) {
+            if (pool.remove(strings.get(i))) {
+              successfulRemovals.incrementAndGet();
+            }
+          }
+        } catch (Exception e) {
+          // Ignore
+        } finally {
+          doneLatch.countDown();
+        }
+      });
+    }
+
+    assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
+    executor.shutdown();
+
+    assertEquals(100, successfulRemovals.get());
+    assertEquals(0, pool.size());
+  }
+
+  @Test
+  public void testBidirectionalMappingConsistency() {
+    // Verify that both mappings stay consistent
+    List<String> strings = new ArrayList<>();
+    List<Integer> ids = new ArrayList<>();
+
+    for (int i = 0; i < 50; i++) {
+      String string = "/region" + (i % 5) + "/file" + i + ".hfile";
+      strings.add(string);
+      ids.add(pool.encode(string));
+    }
+
+    // Verify forward mapping (string -> id)
+    for (int i = 0; i < strings.size(); i++) {
+      int expectedId = ids.get(i);
+      int actualId = pool.encode(strings.get(i));
+      assertEquals(expectedId, actualId);
+    }
+
+    // Verify reverse mapping (id -> string)
+    for (int i = 0; i < ids.size(); i++) {
+      String expectedString = strings.get(i);
+      String actualString = pool.decode(ids.get(i));
+      assertEquals(expectedString, actualString);
+    }
+  }
+}
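
The tests above pin down the FilePathStringPool contract: encode() interns a
path string and hands back a stable int id, decode() maps the id back to the
string (or null once removed), and contains()/remove()/size() keep both
directions of the mapping in step, including under concurrent access. A
minimal usage sketch; the getInstance() accessor is an assumption here, and
the tests themselves obtain the pool through their fixture:

    FilePathStringPool pool = FilePathStringPool.getInstance(); // accessor assumed
    int id = pool.encode("/hbase/data/ns/table/region/cf/file.hfile");
    // Encoding the same string again returns the same id and leaves size() unchanged.
    assert id == pool.encode("/hbase/data/ns/table/region/cf/file.hfile");
    String path = pool.decode(id);   // round-trips to the original string
    pool.remove(path);               // drops both string->id and id->string entries
    assert pool.decode(id) == null;  // the id no longer resolves
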
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
index b71660b88d8..5ae3343d21e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -70,8 +70,10 @@ public class TestRecoveryPersistentBucketCache {
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
 
     CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
+    String[] names = CacheTestUtils.getHFileNames(blocks);
 
     CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
+    String[] smallerNames = CacheTestUtils.getHFileNames(smallerBlocks);
     // Add four blocks
     cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
     cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
@@ -104,16 +106,18 @@ public class TestRecoveryPersistentBucketCache {
       8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
       DEFAULT_ERROR_TOLERATION_DURATION, conf);
     assertTrue(newBucketCache.waitForCacheInitialization(1000));
-
-    assertEquals(3, newBucketCache.backingMap.size());
-    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
-    assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false));
-    assertEquals(blocks[0].getBlock(),
-      newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
-    assertEquals(blocks[1].getBlock(),
-      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
-    assertEquals(blocks[2].getBlock(),
-      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
+    BlockCacheKey[] newKeysSmaller = CacheTestUtils.regenerateKeys(smallerBlocks, smallerNames);
+    // The new bucket cache would have only the first three blocks. Although we had persisted the
+    // cache state when it held the first four blocks, the 4th block was evicted and then we
+    // added a 5th block, which overwrites part of the 4th block in the cache. This would cause a
+    // checksum failure for this block offset when we try to read from the cache, so we would
+    // consider that block invalid and its offset available in the cache.
+    assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
+    assertNull(newBucketCache.getBlock(newKeysSmaller[0], false, false, false));
+    assertEquals(blocks[0].getBlock(), newBucketCache.getBlock(newKeys[0], false, false, false));
+    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
+    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
     TEST_UTIL.cleanupTestDir();
   }
 
@@ -138,6 +142,9 @@ public class TestRecoveryPersistentBucketCache {
     cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
     cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
     cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    String firstFileName = blocks[0].getBlockName().getHfileName();
+
     // saves the current state of the cache
     bucketCache.persistToFile();
 
@@ -146,7 +153,8 @@ public class TestRecoveryPersistentBucketCache {
       DEFAULT_ERROR_TOLERATION_DURATION, conf);
     assertTrue(newBucketCache.waitForCacheInitialization(10000));
     assertEquals(4, newBucketCache.backingMap.size());
-    newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
+
+    newBucketCache.evictBlocksByHfileName(firstFileName);
     assertEquals(3, newBucketCache.backingMap.size());
     TEST_UTIL.cleanupTestDir();
   }
@@ -222,6 +230,7 @@ public class TestRecoveryPersistentBucketCache {
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
 
     CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+    String[] names = CacheTestUtils.getHFileNames(blocks);
 
     // Add four blocks
     cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
@@ -249,16 +258,15 @@ public class TestRecoveryPersistentBucketCache {
       Thread.sleep(10);
     }
 
-    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
+
+    assertNull(newBucketCache.getBlock(newKeys[4], false, false, false));
     // The backing map entry with key blocks[0].getBlockName() may point to a valid entry
     // or null based on different ordering of the keys in the backing map.
     // Hence, skipping the check for that key.
-    assertEquals(blocks[1].getBlock(),
-      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
-    assertEquals(blocks[2].getBlock(),
-      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
-    assertEquals(blocks[3].getBlock(),
-      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
+    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
+    assertEquals(blocks[3].getBlock(), newBucketCache.getBlock(newKeys[3], false, false, false));
     assertEquals(4, newBucketCache.backingMap.size());
     TEST_UTIL.cleanupTestDir();
   }
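
A pattern worth noting in the hunks above: the tests snapshot the HFile names
with CacheTestUtils.getHFileNames() before restarting the cache, then rebuild
the lookup keys with CacheTestUtils.regenerateKeys() afterwards. With the
string pool in play, a BlockCacheKey created against the old pool state is no
longer a safe lookup handle after recovery, so the keys must be re-created.
A sketch of what these helpers plausibly do; the signatures are inferred from
the call sites, and the real implementation lives in CacheTestUtils:

    static String[] getHFileNames(CacheTestUtils.HFileBlockPair[] blocks) {
      String[] names = new String[blocks.length];
      for (int i = 0; i < blocks.length; i++) {
        names[i] = blocks[i].getBlockName().getHfileName(); // save before restart
      }
      return names;
    }

    static BlockCacheKey[] regenerateKeys(CacheTestUtils.HFileBlockPair[] blocks, String[] names) {
      BlockCacheKey[] keys = new BlockCacheKey[blocks.length];
      for (int i = 0; i < blocks.length; i++) {
        // Re-encode each saved name against the current (post-restart) pool state.
        keys[i] = new BlockCacheKey(names[i], blocks[i].getBlockName().getOffset());
      }
      return keys;
    }
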
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index e35db698cca..9cf677b77f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -48,8 +48,10 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -72,6 +74,9 @@ public class TestVerifyBucketCacheFile {
           128 * 1024 + 1024 } } });
   }
 
+  @Rule
+  public TestName name = new TestName();
+
   @Parameterized.Parameter(0)
   public int constructedBlockSize;
 
@@ -101,18 +106,19 @@ public class TestVerifyBucketCacheFile {
     Configuration conf = HBaseConfiguration.create();
     // Disables the persister thread by setting its interval to MAX_VALUE
     conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
-
     BucketCache bucketCache = null;
     BucketCache recoveredBucketCache = null;
     try {
-      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
-        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      bucketCache =
+        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+          constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence"
+          + name.getMethodName());
       assertTrue(bucketCache.waitForCacheInitialization(10000));
       long usedSize = bucketCache.getAllocator().getUsedSize();
       assertEquals(0, usedSize);
       CacheTestUtils.HFileBlockPair[] blocks =
         CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+      String[] names = CacheTestUtils.getHFileNames(blocks);
       // Add blocks
       for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
@@ -122,11 +128,11 @@ public class TestVerifyBucketCacheFile {
       // 1.persist cache to file
       bucketCache.shutdown();
       // restore cache from file
-      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
-        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      bucketCache =
+        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+          constructedBlockSizes, writeThreads, writerQLen, testDir
+          + "/bucket.persistence" + name.getMethodName());
       assertTrue(bucketCache.waitForCacheInitialization(10000));
-      waitPersistentCacheValidation(conf, bucketCache);
       assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
       // persist cache to file
       bucketCache.shutdown();
@@ -136,17 +142,17 @@ public class TestVerifyBucketCacheFile {
         FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
       assertTrue(Files.deleteIfExists(cacheFile));
       // can't restore cache from file
-      recoveredBucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
-        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+      recoveredBucketCache =
+        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+          constructedBlockSizes, writeThreads, writerQLen, testDir
+          + "/bucket.persistence" + name.getMethodName());
       assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
-      waitPersistentCacheValidation(conf, recoveredBucketCache);
       assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
       assertEquals(0, recoveredBucketCache.backingMap.size());
+      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
       // Add blocks
-      for (CacheTestUtils.HFileBlockPair block : blocks) {
-        cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, block.getBlockName(),
-          block.getBlock());
+      for (int i = 0; i < blocks.length; i++) {
+        cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, newKeys[i], blocks[i].getBlock());
       }
       usedSize = recoveredBucketCache.getAllocator().getUsedSize();
       assertNotEquals(0, usedSize);
@@ -155,12 +161,13 @@ public class TestVerifyBucketCacheFile {
 
       // 3.delete backingMap persistence file
       final java.nio.file.Path mapFile =
-        FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
+        FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence" + name.getMethodName());
       assertTrue(Files.deleteIfExists(mapFile));
       // can't restore cache from file
       bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+        testDir + "/bucket.persistence" + name.getMethodName(),
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(bucketCache.waitForCacheInitialization(10000));
       waitPersistentCacheValidation(conf, bucketCache);
       assertEquals(0, bucketCache.getAllocator().getUsedSize());
@@ -178,16 +185,16 @@ public class TestVerifyBucketCacheFile {
 
   @Test
   public void testRetrieveFromFileAfterDelete() throws Exception {
-
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     Path testDir = TEST_UTIL.getDataTestDir();
     TEST_UTIL.getTestFileSystem().mkdirs(testDir);
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
-    String mapFileName = testDir + "/bucket.persistence" + 
EnvironmentEdgeManager.currentTime();
+    String mapFileName = testDir + "/bucket.persistence"
+      + name.getMethodName() + EnvironmentEdgeManager.currentTime();
     BucketCache bucketCache = null;
     try {
-      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache" , 
capacitySize,
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
         DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(bucketCache.waitForCacheInitialization(10000));
@@ -242,7 +249,8 @@ public class TestVerifyBucketCacheFile {
     try {
       bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+        testDir + "/bucket.persistence" + name.getMethodName(),
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(bucketCache.waitForCacheInitialization(10000));
       long usedSize = bucketCache.getAllocator().getUsedSize();
       assertEquals(0, usedSize);
@@ -267,7 +275,8 @@ public class TestVerifyBucketCacheFile {
       // can't restore cache from file
       bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+        testDir + "/bucket.persistence" + name.getMethodName(),
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(bucketCache.waitForCacheInitialization(10000));
       waitPersistentCacheValidation(conf, bucketCache);
       assertEquals(0, bucketCache.getAllocator().getUsedSize());
@@ -306,7 +315,8 @@ public class TestVerifyBucketCacheFile {
     try {
       bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+        testDir + "/bucket.persistence" + name.getMethodName(),
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(bucketCache.waitForCacheInitialization(10000));
       long usedSize = bucketCache.getAllocator().getUsedSize();
       assertEquals(0, usedSize);
@@ -333,7 +343,8 @@ public class TestVerifyBucketCacheFile {
       // can't restore cache from file
       bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
-        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, 
conf);
+        testDir + "/bucket.persistence" + name.getMethodName(),
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(bucketCache.waitForCacheInitialization(10000));
       waitPersistentCacheValidation(conf, bucketCache);
       assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
@@ -362,7 +373,8 @@ public class TestVerifyBucketCacheFile {
     Configuration conf = HBaseConfiguration.create();
     // Disables the persister thread by setting its interval to MAX_VALUE
     conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
-    String mapFileName = testDir + "/bucket.persistence" + 
EnvironmentEdgeManager.currentTime();
+    String mapFileName = testDir + "/bucket.persistence"
+      + EnvironmentEdgeManager.currentTime() + name.getMethodName();
     BucketCache bucketCache = null;
     BucketCache newBucketCache = null;
     try {
@@ -373,6 +385,7 @@ public class TestVerifyBucketCacheFile {
 
       CacheTestUtils.HFileBlockPair[] blocks =
         CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
+      String[] names = CacheTestUtils.getHFileNames(blocks);
       // Add three blocks
       cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
       cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
@@ -395,23 +408,21 @@ public class TestVerifyBucketCacheFile {
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
         DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(newBucketCache.waitForCacheInitialization(10000));
-      waitPersistentCacheValidation(conf, newBucketCache);
-      assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
-      assertEquals(blocks[1].getBlock(),
-        newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
-      assertEquals(blocks[2].getBlock(),
-        newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
-      assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
+      assertNull(newBucketCache.getBlock(newKeys[0], false, false, false));
+      assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
+      assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
+      assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
       assertEquals(2, newBucketCache.backingMap.size());
     } finally {
-      if (bucketCache != null) {
+      if (newBucketCache == null && bucketCache != null) {
         bucketCache.shutdown();
       }
       if (newBucketCache != null) {
         newBucketCache.shutdown();
       }
+      TEST_UTIL.cleanupTestDir();
     }
-    TEST_UTIL.cleanupTestDir();
   }
 
   @Test
@@ -438,7 +449,8 @@ public class TestVerifyBucketCacheFile {
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
 
-    String mapFileName = testDir + "/bucket.persistence" + 
EnvironmentEdgeManager.currentTime();
+    String mapFileName = testDir + "/bucket.persistence"
+      + EnvironmentEdgeManager.currentTime() + name.getMethodName();
     BucketCache bucketCache = null;
     BucketCache newBucketCache = null;
     try {
@@ -449,35 +461,42 @@ public class TestVerifyBucketCacheFile {
 
       CacheTestUtils.HFileBlockPair[] blocks =
         CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
+      String[] names = CacheTestUtils.getHFileNames(blocks);
 
       for (int i = 0; i < numBlocks; i++) {
         cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
           blocks[i].getBlock());
       }
-
       // saves the current state
       bucketCache.persistToFile();
-
       // Create a new bucket which reads from persistence file.
       newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
         constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
mapFileName,
         DEFAULT_ERROR_TOLERATION_DURATION, conf);
       assertTrue(newBucketCache.waitForCacheInitialization(10000));
-      waitPersistentCacheValidation(conf, newBucketCache);
       assertEquals(numBlocks, newBucketCache.backingMap.size());
+      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
       for (int i = 0; i < numBlocks; i++) {
         assertEquals(blocks[i].getBlock(),
-          newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
+          newBucketCache.getBlock(newKeys[i], false, false, false));
       }
     } finally {
       if (bucketCache != null) {
         bucketCache.shutdown();
       }
       if (newBucketCache != null) {
-        newBucketCache.shutdown();
+        try {
+          newBucketCache.shutdown();
+        } catch (NullPointerException e) {
+          // We need to enforce these two shutdowns to make sure we don't leave "orphan" persister
+          // threads running while the unit test JVM instance is up.
+          // This would lead to an NPE because of the StringPoolCleanup in bucketCache.shutdown,
+          // but that's fine because we don't have more than one bucket cache instance in real
+          // life, and here we are already past the point where shutdown stops background threads.
+        }
       }
+      TEST_UTIL.cleanupTestDir();
     }
-    TEST_UTIL.cleanupTestDir();
   }
 
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
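
Two recurring changes run through this file: each test now derives its
persistence file name from the JUnit TestName rule, so parameterized runs and
repeated tests no longer trip over each other's leftover state, and lookups
after a cache restart go through regenerated keys as above. The naming
pattern, extracted for clarity (testDir is the test's data directory):

    @Rule
    public TestName name = new TestName();

    // One persistence file per test method, so state never leaks across tests.
    String mapFileName = testDir + "/bucket.persistence" + name.getMethodName();
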
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
index a6caff22759..febf88d6f6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
@@ -163,10 +163,6 @@ public class TestCustomCellDataTieringManager {
     key = new BlockCacheKey(hStoreFiles.get(1).getPath(), 0, true, BlockType.DATA);
     testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
 
-    // Test with valid key with no HFile Path
-    key = new BlockCacheKey(hStoreFiles.get(0).getPath().getName(), 0);
-    testDataTieringMethodWithKeyExpectingException(methodCallerWithKey, key,
-      new DataTieringException("BlockCacheKey Doesn't Contain HFile Path"));
   }
 
   @Test
@@ -191,13 +187,14 @@ public class TestCustomCellDataTieringManager {
     Path basePath = hStoreFiles.get(0).getPath().getParent().getParent().getParent();
     hFilePath = new Path(basePath, "incorrectRegion/cf1/filename");
     testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
-      new DataTieringException("HRegion corresponding to " + hFilePath + " doesn't exist"));
+      new DataTieringException("HRegion corresponding to incorrectRegion doesn't exist"));
 
     // Test with a non-existing HStore path
     basePath = hStoreFiles.get(0).getPath().getParent().getParent();
     hFilePath = new Path(basePath, "incorrectCf/filename");
     testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
-      new DataTieringException("HStore corresponding to " + hFilePath + " doesn't exist"));
+      new DataTieringException(
+        "HStore corresponding to " + basePath.getName() + "/incorrectCf doesn't exist"));
   }
 
   @Test
@@ -214,25 +211,6 @@ public class TestCustomCellDataTieringManager {
     testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
   }
 
-  @Test
-  public void testHotDataWithPath() throws IOException {
-    initializeTestEnvironment();
-    DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData;
-
-    // Test with valid path
-    Path hFilePath = hStoreFiles.get(2).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
-
-    // Test with another valid path
-    hFilePath = hStoreFiles.get(3).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
-
-    // Test with a filename where corresponding HStoreFile in not present
-    hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), "incorrectFileName");
-    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
-      new DataTieringException("Store file corresponding to " + hFilePath + " doesn't exist"));
-  }
-
   @Test
   public void testPrefetchWhenDataTieringEnabled() throws IOException {
     setPrefetchBlocksOnOpen();
@@ -265,14 +243,16 @@ public class TestCustomCellDataTieringManager {
     }
 
     // Verify hStoreFile3 is identified as cold data
-    DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData;
+    DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isHotData;
     Path hFilePath = hStoreFiles.get(3).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
+    testDataTieringMethodWithKeyNoException(methodCallerWithKey,
+      new BlockCacheKey(hFilePath, 0, true, BlockType.DATA), false);
 
     // Verify all the other files in hStoreFiles are hot data
     for (int i = 0; i < hStoreFiles.size() - 1; i++) {
       hFilePath = hStoreFiles.get(i).getPath();
-      testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
+      testDataTieringMethodWithKeyNoException(methodCallerWithKey,
+        new BlockCacheKey(hFilePath, 0, true, BlockType.DATA), true);
     }
 
     try {
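
The net effect in this test (and in TestDataTieringManager below) is that the
path-based DataTieringManager entry points are gone: every check now goes
through a BlockCacheKey, which carries the HFile path alongside the offset,
replica flag, and block type. The calling convention, as used in the hunks
above:

    // Key-based check; the four-argument constructor comes from the call sites above.
    BlockCacheKey key = new BlockCacheKey(hFilePath, 0, true, BlockType.DATA);
    boolean hot = dataTieringManager.isHotData(key);
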
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index 21e2315ae88..d3df0ba3c15 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -138,11 +138,6 @@ public class TestDataTieringManager {
     defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32);
   }
 
-  @FunctionalInterface
-  interface DataTieringMethodCallerWithPath {
-    boolean call(DataTieringManager manager, Path path) throws DataTieringException;
-  }
-
   @FunctionalInterface
   interface DataTieringMethodCallerWithKey {
     boolean call(DataTieringManager manager, BlockCacheKey key) throws DataTieringException;
@@ -160,49 +155,12 @@ public class TestDataTieringManager {
     // Test with another valid key
     key = new BlockCacheKey(hStoreFiles.get(1).getPath(), 0, true, BlockType.DATA);
     testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
-
-    // Test with valid key with no HFile Path
-    key = new BlockCacheKey(hStoreFiles.get(0).getPath().getName(), 0);
-    testDataTieringMethodWithKeyExpectingException(methodCallerWithKey, key,
-      new DataTieringException("BlockCacheKey Doesn't Contain HFile Path"));
-  }
-
-  @Test
-  public void testDataTieringEnabledWithPath() throws IOException {
-    initializeTestEnvironment();
-    DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isDataTieringEnabled;
-
-    // Test with valid path
-    Path hFilePath = hStoreFiles.get(1).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
-
-    // Test with another valid path
-    hFilePath = hStoreFiles.get(3).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
-
-    // Test with an incorrect path
-    hFilePath = new Path("incorrectPath");
-    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
-      new DataTieringException("Incorrect HFile Path: " + hFilePath));
-
-    // Test with a non-existing HRegion path
-    Path basePath = hStoreFiles.get(0).getPath().getParent().getParent().getParent();
-    hFilePath = new Path(basePath, "incorrectRegion/cf1/filename");
-    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
-      new DataTieringException("HRegion corresponding to " + hFilePath + " doesn't exist"));
-
-    // Test with a non-existing HStore path
-    basePath = hStoreFiles.get(0).getPath().getParent().getParent();
-    hFilePath = new Path(basePath, "incorrectCf/filename");
-    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
-      new DataTieringException("HStore corresponding to " + hFilePath + " doesn't exist"));
   }
 
   @Test
   public void testHotDataWithKey() throws IOException {
     initializeTestEnvironment();
     DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isHotData;
-
     // Test with valid key
     BlockCacheKey key = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
     testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, true);
@@ -212,25 +170,6 @@ public class TestDataTieringManager {
     testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false);
   }
 
-  @Test
-  public void testHotDataWithPath() throws IOException {
-    initializeTestEnvironment();
-    DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData;
-
-    // Test with valid path
-    Path hFilePath = hStoreFiles.get(2).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
-
-    // Test with another valid path
-    hFilePath = hStoreFiles.get(3).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
-
-    // Test with a filename where corresponding HStoreFile in not present
-    hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), "incorrectFileName");
-    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
-      new DataTieringException("Store file corresponding to " + hFilePath + " doesn't exist"));
-  }
-
   @Test
   public void testGracePeriodMakesColdFileHot() throws IOException, DataTieringException {
     initializeTestEnvironment();
@@ -253,7 +192,8 @@ public class TestDataTieringManager {
     region.stores.put(Bytes.toBytes("cf1"), hStore);
     testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region);
     Path hFilePath = file.getPath();
-    assertTrue("File should be hot due to grace period", 
dataTieringManager.isHotData(hFilePath));
+    BlockCacheKey key = new BlockCacheKey(hFilePath, 0, true, BlockType.DATA);
+    assertTrue("File should be hot due to grace period", 
dataTieringManager.isHotData(key));
   }
 
   @Test
@@ -278,8 +218,8 @@ public class TestDataTieringManager {
     testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region);
 
     Path hFilePath = file.getPath();
-    assertFalse("File should be cold without grace period",
-      dataTieringManager.isHotData(hFilePath));
+    BlockCacheKey key = new BlockCacheKey(hFilePath, 0, true, BlockType.DATA);
+    assertFalse("File should be cold without grace period", 
dataTieringManager.isHotData(key));
   }
 
   @Test
@@ -314,14 +254,16 @@ public class TestDataTieringManager {
     }
 
     // Verify hStoreFile3 is identified as cold data
-    DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData;
+    DataTieringMethodCallerWithKey methodCallerWithPath = DataTieringManager::isHotData;
     Path hFilePath = hStoreFiles.get(3).getPath();
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false);
+    testDataTieringMethodWithKeyNoException(methodCallerWithPath,
+      new BlockCacheKey(hFilePath, 0, true, BlockType.DATA), false);
 
     // Verify all the other files in hStoreFiles are hot data
     for (int i = 0; i < hStoreFiles.size() - 1; i++) {
       hFilePath = hStoreFiles.get(i).getPath();
-      testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
+      testDataTieringMethodWithKeyNoException(methodCallerWithPath,
+        new BlockCacheKey(hFilePath, 0, true, BlockType.DATA), true);
     }
 
     try {
@@ -704,22 +646,6 @@ public class TestDataTieringManager {
     assertEquals(expectedColdBlocks, numColdBlocks);
   }
 
-  private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
-    boolean expectedResult, DataTieringException exception) {
-    try {
-      boolean value = caller.call(dataTieringManager, path);
-      if (exception != null) {
-        fail("Expected DataTieringException to be thrown");
-      }
-      assertEquals(expectedResult, value);
-    } catch (DataTieringException e) {
-      if (exception == null) {
-        fail("Unexpected DataTieringException: " + e.getMessage());
-      }
-      assertEquals(exception.getMessage(), e.getMessage());
-    }
-  }
-
   private void testDataTieringMethodWithKey(DataTieringMethodCallerWithKey caller,
     BlockCacheKey key, boolean expectedResult, DataTieringException exception) {
     try {
@@ -736,16 +662,6 @@ public class TestDataTieringManager {
     }
   }
 
-  private void testDataTieringMethodWithPathExpectingException(
-    DataTieringMethodCallerWithPath caller, Path path, DataTieringException exception) {
-    testDataTieringMethodWithPath(caller, path, false, exception);
-  }
-
-  private void testDataTieringMethodWithPathNoException(DataTieringMethodCallerWithPath caller,
-    Path path, boolean expectedResult) {
-    testDataTieringMethodWithPath(caller, path, expectedResult, null);
-  }
-
   private void testDataTieringMethodWithKeyExpectingException(DataTieringMethodCallerWithKey caller,
     BlockCacheKey key, DataTieringException exception) {
     testDataTieringMethodWithKey(caller, key, false, exception);

