This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-29585
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-29585 by this push:
     new 27cdb52aa33 HBASE-29669 Implement basic row cache (#7901)
27cdb52aa33 is described below

commit 27cdb52aa337413397239872584054a238a26327
Author: EungsopYoo <[email protected]>
AuthorDate: Mon Jun 8 18:53:49 2026 +0900

    HBASE-29669 Implement basic row cache (#7901)
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  11 +
 .../io/encoding/BufferedDataBlockEncoder.java      |   8 +-
 .../regionserver/MetricsRegionServerSource.java    |   7 +
 .../MetricsRegionServerSourceImpl.java             |   6 +
 .../regionserver/MetricsRegionServerWrapper.java   |  10 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  77 ++-
 .../MetricsRegionServerWrapperImpl.java            |  32 ++
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  22 +-
 .../apache/hadoop/hbase/regionserver/RowCache.java | 250 +++++++--
 .../hadoop/hbase/regionserver/RowCacheKey.java     |   8 +-
 .../hbase/regionserver/RowCacheStrategy.java       |   4 +-
 .../apache/hadoop/hbase/regionserver/RowCells.java |  37 +-
 .../regionserver/TinyLfuRowCacheStrategy.java      | 119 +++++
 .../MetricsRegionServerWrapperStub.java            |  25 +
 .../regionserver/TestMetricsRegionServer.java      |   5 +
 .../hadoop/hbase/regionserver/TestRowCache.java    | 563 +++++++++++++++++++++
 .../regionserver/TestRowCacheCanCacheRow.java      | 266 ++++++++++
 .../regionserver/TestRowCacheConfiguration.java    |  81 +++
 .../regionserver/TestRowCacheEvictOnClose.java     | 129 +++++
 .../hbase/regionserver/TestRowCacheHRegion.java    |  97 ++++
 ...owCacheWithBucketCacheAndDataBlockEncoding.java | 154 ++++++
 .../hbase/regionserver/TestRowCacheWithMock.java   | 398 +++++++++++++++
 .../hbase/tool/TestRowCacheBulkLoadHFiles.java     | 199 ++++++++
 23 files changed, 2450 insertions(+), 58 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6a51172e9a7..329b8b4908b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1029,6 +1029,17 @@ public final class HConstants {
   public static final String ROW_CACHE_ENABLED_KEY = "row.cache.enabled";
   public static final boolean ROW_CACHE_ENABLED_DEFAULT = false;
 
+  /**
+   * Configuration key for evicting a region's rows from the row cache when the region closes
+   */
+  public static final String ROW_CACHE_EVICT_ON_CLOSE_KEY = 
"row.cache.evictOnClose";
+  public static final boolean ROW_CACHE_EVICT_ON_CLOSE_DEFAULT = false;
+
+  /**
+   * Configuration key for the row cache strategy class
+   */
+  public static final String ROW_CACHE_STRATEGY_CLASS_KEY = 
"row.cache.strategy.class";
+
   /**
    * Configuration key for the memory size of the block cache
    */
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 5ec39fa5803..54505dfce95 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -547,8 +547,8 @@ abstract class BufferedDataBlockEncoder extends 
AbstractDataBlockEncoder {
 
     @Override
     public ExtendedCell deepClone() {
-      // This is not used in actual flow. Throwing 
UnsupportedOperationException
-      throw new UnsupportedOperationException();
+      // To garbage collect the objects referenced by this cell, we need to 
deep clone it
+      return ExtendedCell.super.deepClone();
     }
   }
 
@@ -796,8 +796,8 @@ abstract class BufferedDataBlockEncoder extends 
AbstractDataBlockEncoder {
 
     @Override
     public ExtendedCell deepClone() {
-      // This is not used in actual flow. Throwing 
UnsupportedOperationException
-      throw new UnsupportedOperationException();
+      // To cache the row, we need to deep clone this cell
+      return super.deepClone();
     }
   }
 
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index c88a77b5140..166484fe899 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -430,6 +430,13 @@ public interface MetricsRegionServerSource extends 
BaseSource, JvmPauseMonitorSo
   String L2_CACHE_HIT_RATIO_DESC = "L2 cache hit ratio.";
   String L2_CACHE_MISS_RATIO = "l2CacheMissRatio";
   String L2_CACHE_MISS_RATIO_DESC = "L2 cache miss ratio.";
+
+  String ROW_CACHE_HIT_COUNT = "rowCacheHitCount";
+  String ROW_CACHE_MISS_COUNT = "rowCacheMissCount";
+  String ROW_CACHE_EVICTED_ROW_COUNT = "rowCacheEvictedRowCount";
+  String ROW_CACHE_SIZE = "rowCacheSize";
+  String ROW_CACHE_COUNT = "rowCacheCount";
+
   String RS_START_TIME_NAME = "regionServerStartTime";
   String ZOOKEEPER_QUORUM_NAME = "zookeeperQuorum";
   String SERVER_NAME_NAME = "serverName";
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index b214c8f8f4e..90ea2a1165c 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -452,6 +452,12 @@ public class MetricsRegionServerSourceImpl extends 
BaseSourceImpl
         .addCounter(Interns.info(BLOCK_CACHE_DELETE_FAMILY_BLOOM_HIT_COUNT, 
""),
           rsWrap.getDeleteFamilyBloomHitCount())
         .addCounter(Interns.info(BLOCK_CACHE_TRAILER_HIT_COUNT, ""), 
rsWrap.getTrailerHitCount())
+        .addCounter(Interns.info(ROW_CACHE_HIT_COUNT, ""), 
rsWrap.getRowCacheHitCount())
+        .addCounter(Interns.info(ROW_CACHE_MISS_COUNT, ""), 
rsWrap.getRowCacheMissCount())
+        .addCounter(Interns.info(ROW_CACHE_EVICTED_ROW_COUNT, ""),
+          rsWrap.getRowCacheEvictedRowCount())
+        .addGauge(Interns.info(ROW_CACHE_SIZE, ""), rsWrap.getRowCacheSize())
+        .addGauge(Interns.info(ROW_CACHE_COUNT, ""), rsWrap.getRowCacheCount())
         .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC),
           rsWrap.getUpdatesBlockedTime())
         .addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), 
rsWrap.getFlushedCellsCount())
diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index 5b957d9bf08..68e43b276ee 100644
--- 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -635,6 +635,16 @@ public interface MetricsRegionServerWrapper {
 
   long getTrailerHitCount();
 
+  long getRowCacheHitCount();
+
+  long getRowCacheMissCount();
+
+  long getRowCacheSize();
+
+  long getRowCacheCount();
+
+  long getRowCacheEvictedRowCount();
+
   long getTotalRowActionRequestCount();
 
   long getByteBuffAllocatorHeapAllocationBytes();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 60bd4cee6b7..89ff3b88ec5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
+import static 
org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_KEY;
 import static 
org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static 
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
 import static 
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ROW_LOCK_READ_LOCK_KEY;
@@ -145,6 +147,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.mob.MobFileCache;
@@ -946,7 +949,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     this.isRowCacheEnabled = checkRowCacheConfig();
   }
 
-  private boolean checkRowCacheConfig() {
+  boolean checkRowCacheConfig() {
     Boolean fromDescriptor = htableDescriptor.getRowCacheEnabled();
     // The setting from TableDescriptor has higher priority than the global 
configuration
     return fromDescriptor != null
@@ -954,6 +957,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       : conf.getBoolean(HConstants.ROW_CACHE_ENABLED_KEY, 
HConstants.ROW_CACHE_ENABLED_DEFAULT);
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  void setRowCache(RowCache rowCache) {
+    this.rowCache = rowCache;
+  }
+
   private void setHTableSpecificConf() {
     if (this.htableDescriptor == null) {
       return;
@@ -1963,6 +1972,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         }
       }
 
+      evictRowCache();
+
       status.setStatus("Writing region close event to WAL");
       // Always write close marker to wal even for read only table. This is 
not a big problem as we
       // do not write any data into the region; it is just a meta edit in the 
WAL file.
@@ -2003,6 +2014,22 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     }
   }
 
+  private void evictRowCache() {
+    boolean evictOnClose = 
getReadOnlyConfiguration().getBoolean(ROW_CACHE_EVICT_ON_CLOSE_KEY,
+      ROW_CACHE_EVICT_ON_CLOSE_DEFAULT);
+
+    if (!evictOnClose) {
+      return;
+    }
+
+    if (!(rsServices instanceof HRegionServer regionServer)) {
+      return;
+    }
+
+    RowCache rowCache = 
regionServer.getRSRpcServices().getServer().getRowCache();
+    rowCache.evictRowsByRegion(getRegionInfo().getEncodedName());
+  }
+
   /** Wait for all current flushes and compactions of the region to complete */
   // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or 
give alternate way for
   // Phoenix needs.
@@ -3259,8 +3286,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return getScanner(scan, null);
   }
 
-  RegionScannerImpl getScannerWithResults(Get get, Scan scan, List<Cell> 
results)
-    throws IOException {
+  RegionScannerImpl getScannerWithResults(Get get, Scan scan, List<Cell> 
results,
+    RpcCallContext context) throws IOException {
     if (!rowCache.canCacheRow(get, this)) {
       return getScannerWithResults(scan, results);
     }
@@ -3268,12 +3295,23 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     // Try get from row cache
     RowCacheKey key = new RowCacheKey(this, get.getRow());
     if (rowCache.tryGetFromCache(key, get, results)) {
+      addReadRequestsCount(1);
+      if (getMetrics() != null) {
+        getMetrics().updateReadRequestCount();
+      }
+
       // Cache is hit, and then no scanner is created
       return null;
     }
 
     RegionScannerImpl scanner = getScannerWithResults(scan, results);
-    rowCache.populateCache(results, key);
+
+    // When results came from memstore only, do not populate the row cache
+    boolean readFromMemStoreOnly = context.getBlockBytesScanned() < 1;
+    if (!readFromMemStoreOnly) {
+      rowCache.cache(results, key);
+    }
+
     return scanner;
   }
 
@@ -4811,7 +4849,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     // checkAndMutate.
     // * coprocessor calls (see ex. BulkDeleteEndpoint).
     // So nonces are not really ever used by HBase. They could be by coprocs, 
and checkAnd...
-    return batchMutate(new MutationBatchOperation(this, mutations, atomic, 
nonceGroup, nonce));
+    if (rowCache == null) {
+      return batchMutate(new MutationBatchOperation(this, mutations, atomic, 
nonceGroup, nonce));
+    }
+
+    return rowCache.mutateWithRowCacheBarrier(this, Arrays.asList(mutations),
+      () -> batchMutate(new MutationBatchOperation(this, mutations, atomic, 
nonceGroup, nonce)));
   }
 
   @Override
@@ -4823,10 +4866,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   }
 
   OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws 
IOException {
-    OperationStatus[] operationStatuses =
-      rowCache.mutateWithRowCacheBarrier(this, Arrays.asList(mutations),
-        () -> this.batchMutate(mutations, atomic, HConstants.NO_NONCE, 
HConstants.NO_NONCE));
-    return TraceUtil.trace(() -> operationStatuses, () -> 
createRegionSpan("Region.batchMutate"));
+    return TraceUtil.trace(
+      () -> batchMutate(mutations, atomic, HConstants.NO_NONCE, 
HConstants.NO_NONCE),
+      () -> createRegionSpan("Region.batchMutate"));
   }
 
   /**
@@ -5111,8 +5153,17 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
   public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, 
long nonceGroup,
     long nonce) throws IOException {
-    CheckAndMutateResult checkAndMutateResult = 
rowCache.mutateWithRowCacheBarrier(this,
-      checkAndMutate.getRow(), () -> this.checkAndMutate(checkAndMutate, 
nonceGroup, nonce));
+    CheckAndMutateResult checkAndMutateResult =
+      rowCache.mutateWithRowCacheBarrier(this, checkAndMutate.getRow(),
+        () -> this.checkAndMutateInternal(checkAndMutate, nonceGroup, nonce));
+    return TraceUtil.trace(() -> checkAndMutateResult,
+      () -> createRegionSpan("Region.checkAndMutate"));
+  }
+
+  public CheckAndMutateResult checkAndMutate(List<Mutation> mutations,
+    CheckAndMutate checkAndMutate, long nonceGroup, long nonce) throws 
IOException {
+    CheckAndMutateResult checkAndMutateResult = 
rowCache.mutateWithRowCacheBarrier(this, mutations,
+      () -> this.checkAndMutateInternal(checkAndMutate, nonceGroup, nonce));
     return TraceUtil.trace(() -> checkAndMutateResult,
       () -> createRegionSpan("Region.checkAndMutate"));
   }
@@ -5312,6 +5363,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
   private OperationStatus mutate(Mutation mutation, boolean atomic, long 
nonceGroup, long nonce)
     throws IOException {
+    if (rowCache == null) {
+      return this.mutateInternal(mutation, atomic, nonceGroup, nonce);
+    }
+
     return rowCache.mutateWithRowCacheBarrier(this, mutation.getRow(),
       () -> this.mutateInternal(mutation, atomic, nonceGroup, nonce));
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index c8f7f96a033..ef80e2ee580 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -68,6 +68,7 @@ class MetricsRegionServerWrapperImpl implements 
MetricsRegionServerWrapper {
   private BlockCache l1Cache = null;
   private BlockCache l2Cache = null;
   private MobFileCache mobFileCache;
+  private RowCache rowCache;
   private CacheStats cacheStats;
   private CacheStats l1Stats = null;
   private CacheStats l2Stats = null;
@@ -99,6 +100,7 @@ class MetricsRegionServerWrapperImpl implements 
MetricsRegionServerWrapper {
     this.regionServer = regionServer;
     initBlockCache();
     initMobFileCache();
+    initRowCache();
     this.excludeDatanodeManager = 
this.regionServer.getWalFactory().getExcludeDatanodeManager();
 
     this.period = 
regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
@@ -149,6 +151,11 @@ class MetricsRegionServerWrapperImpl implements 
MetricsRegionServerWrapper {
     this.mobFileCache = this.regionServer.getMobFileCache().orElse(null);
   }
 
+  private void initRowCache() {
+    RSRpcServices rsRpcServices = this.regionServer.getRSRpcServices();
+    this.rowCache = rsRpcServices == null ? null : 
rsRpcServices.getServer().getRowCache();
+  }
+
   @Override
   public String getClusterId() {
     return regionServer.getClusterId();
@@ -1194,6 +1201,31 @@ class MetricsRegionServerWrapperImpl implements 
MetricsRegionServerWrapper {
     return this.cacheStats != null ? this.cacheStats.getTrailerHitCount() : 0L;
   }
 
+  @Override
+  public long getRowCacheHitCount() {
+    return this.rowCache != null ? this.rowCache.getHitCount() : 0L;
+  }
+
+  @Override
+  public long getRowCacheMissCount() {
+    return this.rowCache != null ? this.rowCache.getMissCount() : 0L;
+  }
+
+  @Override
+  public long getRowCacheSize() {
+    return this.rowCache != null ? this.rowCache.getSize() : 0L;
+  }
+
+  @Override
+  public long getRowCacheCount() {
+    return this.rowCache != null ? this.rowCache.getCount() : 0L;
+  }
+
+  @Override
+  public long getRowCacheEvictedRowCount() {
+    return this.rowCache != null ? this.rowCache.getEvictedRowCount() : 0L;
+  }
+
   @Override
   public long getByteBuffAllocatorHeapAllocationBytes() {
     return ByteBuffAllocator.getHeapAllocationBytes(allocator, 
ByteBuffAllocator.HEAP);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 35371cb74ae..584d4084896 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -668,7 +668,7 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
           result = 
region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
         }
         if (result == null) {
-          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
+          result = region.checkAndMutate(mutations, checkAndMutate, 
nonceGroup, nonce);
           if (region.getCoprocessorHost() != null) {
             result = 
region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
           }
@@ -2347,8 +2347,22 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
       return bulkLoadHFileInternal(request);
     }
 
-    // TODO: implement row cache logic for bulk load
-    return bulkLoadHFileInternal(request);
+    RowCache rowCache = region.getRegionServerServices().getRowCache();
+    String encodedRegionName = region.getRegionInfo().getEncodedName();
+
+    // Since bulkload modifies the store files, the row cache should be 
disabled until the bulkload
+    // is finished.
+    rowCache.createRegionLevelBarrier(encodedRegionName);
+    try {
+      // We do not invalidate the entire row cache directly, as it contains a 
large number of
+      // entries and takes a long time. Instead, we increment rowCacheSeqNum, 
which is used when
+      // constructing a RowCacheKey, thereby making the existing row cache 
entries stale.
+      rowCache.increaseRowCacheSeqNum(region);
+      return bulkLoadHFileInternal(request);
+    } finally {
+      // Re-enable the row cache for the region, whether or not the bulkload succeeded
+      rowCache.removeRegionLevelBarrier(encodedRegionName);
+    }
   }
 
   BulkLoadHFileResponse bulkLoadHFileInternal(final BulkLoadHFileRequest 
request)
@@ -2609,7 +2623,7 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
     RegionScannerImpl scanner = null;
     long blockBytesScannedBefore = context.getBlockBytesScanned();
     try {
-      scanner = region.getScannerWithResults(get, scan, results);
+      scanner = region.getScannerWithResults(get, scan, results, context);
     } finally {
       if (scanner != null) {
         if (closeCallBack == null) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java
index d5198c751fa..473c4ea1166 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java
@@ -18,36 +18,59 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * Facade for row-level caching in the RegionServer.
- *
- * <p>{@code RowCache} coordinates cache access for Get operations and
- * enforces cache consistency during mutations. It delegates actual
- * storage and eviction policy decisions (e.g., LRU, LFU) to a
- * {@link RowCacheStrategy} implementation.</p>
- *
- * <p>This class is responsible for:
+ * <p>
+ * {@code RowCache} coordinates cache access for Get operations and enforces 
cache consistency
+ * during mutations. It delegates actual storage and eviction policy decisions 
(e.g., LRU, LFU) to a
+ * {@link RowCacheStrategy} implementation.
+ * </p>
+ * <p>
+ * This class is responsible for:
  * <ul>
- *   <li>Determining whether row cache is enabled for a region</li>
- *   <li>Attempting cache lookups before falling back to the normal read 
path</li>
- *   <li>Populating the cache after successful reads</li>
- *   <li>Evicting affected rows on mutations to maintain correctness</li>
+ * <li>Determining whether row cache is enabled for a region</li>
+ * <li>Attempting cache lookups before falling back to the normal read 
path</li>
+ * <li>Populating the cache after successful reads</li>
+ * <li>Evicting affected rows on mutations to maintain correctness</li>
  * </ul>
- *
- * <p>{@code RowCache} does not implement caching policy or storage directly;
- * those concerns are encapsulated by {@code RowCacheStrategy}.</p>
+ * <p>
+ * {@code RowCache} does not implement caching policy or storage directly; 
those concerns are
+ * encapsulated by {@code RowCacheStrategy}.
+ * </p>
  */
 @org.apache.yetus.audience.InterfaceAudience.Private
 public class RowCache {
+  /**
+   * A barrier that prevents the row cache from being populated during region 
operations, such as
+   * bulk loads. It is implemented as a counter to address issues that arise 
when the same region is
+   * updated concurrently. Keyed by the encoded region name.
+   */
+  private final Map<String, AtomicInteger> regionLevelBarrierMap = new 
ConcurrentHashMap<>();
+  /**
+   * A barrier that prevents the row cache from being populated during row 
mutations. It is
+   * implemented as a counter to address issues that arise when the same row 
is mutated
+   * concurrently.
+   */
+  private final Map<RowCacheKey, AtomicInteger> rowLevelBarrierMap = new 
ConcurrentHashMap<>();
+
   private final boolean enabledByConf;
   private final RowCacheStrategy rowCacheStrategy;
 
@@ -63,8 +86,10 @@ public class RowCache {
   RowCache(Configuration conf) {
     enabledByConf =
       conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, 
HConstants.ROW_CACHE_SIZE_DEFAULT) > 0;
-    // TODO: implement row cache
-    rowCacheStrategy = null;
+    Class<? extends RowCacheStrategy> strategyClass = conf.getClass(
+      HConstants.ROW_CACHE_STRATEGY_CLASS_KEY, TinyLfuRowCacheStrategy.class,
+      RowCacheStrategy.class);
+    rowCacheStrategy = ReflectionUtils.newInstance(strategyClass, conf);
   }
 
   <R> R mutateWithRowCacheBarrier(HRegion region, byte[] row, RowOperation<R> 
operation)
@@ -74,9 +99,39 @@ public class RowCache {
     }
 
     RowCacheKey key = new RowCacheKey(region, row);
-    // TODO: implement mutate with row cache barrier logic
-    evictRow(key);
-    return execute(operation);
+    try {
+      // Creates a barrier that prevents the row cache from being populated 
for this row
+      // during mutation. Reads for the row can instead be served from HFiles 
or the block cache.
+      createRowLevelBarrier(key);
+
+      // After creating the barrier, evict the existing row cache for this row,
+      // as it becomes invalid after the mutation
+      evictRow(key);
+
+      return execute(operation);
+    } finally {
+      // Remove the barrier after mutation to allow the row cache to be 
populated again
+      removeRowLevelBarrier(key);
+    }
+  }
+
+  /**
+   * Remove the barrier after mutation to allow the row cache to be populated 
again
+   * @param key the cache key of the row
+   */
+  void removeRowLevelBarrier(RowCacheKey key) {
+    rowLevelBarrierMap.computeIfPresent(key, (k, counter) -> {
+      int remaining = counter.decrementAndGet();
+      return (remaining <= 0) ? null : counter;
+    });
+  }
+
+  /**
+   * Creates a barrier to prevent the row cache from being populated for this 
row during mutation
+   * @param key the cache key of the row
+   */
+  void createRowLevelBarrier(RowCacheKey key) {
+    rowLevelBarrierMap.computeIfAbsent(key, k -> new 
AtomicInteger(0)).incrementAndGet();
   }
 
   <R> R mutateWithRowCacheBarrier(HRegion region, List<Mutation> mutations,
@@ -85,21 +140,88 @@ public class RowCache {
       return operation.execute();
     }
 
-    // TODO: implement mutate with row cache barrier logic
     Set<RowCacheKey> rowCacheKeys = new HashSet<>(mutations.size());
-    mutations.forEach(mutation -> rowCacheKeys.add(new RowCacheKey(region, 
mutation.getRow())));
-    rowCacheKeys.forEach(this::evictRow);
+    try {
+      // Collect the distinct cache keys of the rows touched by these mutations
+      mutations.forEach(mutation -> rowCacheKeys.add(new RowCacheKey(region, 
mutation.getRow())));
+      rowCacheKeys.forEach(key -> {
+        // Creates a barrier that prevents the row cache from being populated 
for this row
+        // during mutation. Reads for the row can instead be served from 
HFiles or the block cache.
+        createRowLevelBarrier(key);
 
-    return execute(operation);
+        // After creating the barrier, evict the existing row cache for this 
row,
+        // as it becomes invalid after the mutation
+        evictRow(key);
+      });
+
+      return execute(operation);
+    } finally {
+      // Remove the barrier after mutation to allow the row cache to be 
populated again
+      rowCacheKeys.forEach(this::removeRowLevelBarrier);
+    }
   }
 
   void evictRow(RowCacheKey key) {
     rowCacheStrategy.evictRow(key);
   }
 
+  void evictRowsByRegion(String encodedRegionName) {
+    rowCacheStrategy.evictRowsByRegion(encodedRegionName);
+  }
+
+  // @formatter:off
+  /**
+   * Row cache is only enabled when the following conditions are met:
+   *  - Row cache is enabled at the table level.
+   *  - Cache blocks is enabled in the get request.
+   *  - The Get is distinguishable from other Gets only by its row key, i.e. each of the
+   *    following is unset or left at its default:
+   *    - filter
+   *    - retrieving cells
+   *    - TTL
+   *    - attributes
+   *    - CheckExistenceOnly
+   *    - ColumnFamilyTimeRange
+   *    - Consistency
+   *    - MaxResultsPerColumnFamily
+   *    - ReplicaId
+   *    - RowOffsetPerColumnFamily
+   * @param get the Get request
+   * @param region the Region
+   * @return true if the row can be cached, false otherwise
+   */
+  // @formatter:on
   boolean canCacheRow(Get get, Region region) {
-    // TODO: implement logic to determine if the row can be cached
-    return false;
+    return enabledByConf && region.isRowCacheEnabled() && get.getCacheBlocks()
+      && get.getFilter() == null && isRetrieveAllCells(get, region) && 
isDefaultTtl(region)
+      && get.getAttributesMap().isEmpty() && !get.isCheckExistenceOnly()
+      && get.getColumnFamilyTimeRange().isEmpty() && get.getConsistency() == 
Consistency.STRONG
+      && get.getMaxResultsPerColumnFamily() == -1 && get.getReplicaId() == -1
+      && get.getRowOffsetPerColumnFamily() == 0 && 
get.getTimeRange().isAllTime();
+  }
+
+  private static boolean isRetrieveAllCells(Get get, Region region) {
+    if (region.getTableDescriptor().getColumnFamilyCount() != 
get.numFamilies()) {
+      return false;
+    }
+
+    boolean hasQualifier = 
get.getFamilyMap().values().stream().anyMatch(Objects::nonNull);
+    return !hasQualifier;
+  }
+
+  private static boolean isDefaultTtl(Region region) {
+    return Arrays.stream(region.getTableDescriptor().getColumnFamilies())
+      .allMatch(cfd -> cfd.getTimeToLive() == 
ColumnFamilyDescriptorBuilder.DEFAULT_TTL);
+  }
+
+  // For testing only
+  public RowCells getRow(RowCacheKey key) {
+    return getRow(key, true);
+  }
+
+  // For testing only
+  RowCells getRow(RowCacheKey key, boolean caching) {
+    return rowCacheStrategy.getRow(key, caching);
   }
 
   boolean tryGetFromCache(RowCacheKey key, Get get, List<Cell> results) {
@@ -109,17 +231,81 @@ public class RowCache {
       return false;
     }
 
+    if (row.isExpired(EnvironmentEdgeManager.currentTime())) {
+      // A cell in the cached row has expired by its cell-level TTL. Drop the 
row from the cache
+      // and treat this as a miss so the caller falls back to the normal read 
path.
+      evictRow(key);
+      return false;
+    }
+
     results.addAll(row.getCells());
-    // TODO: implement update of metrics
     return true;
   }
 
-  void populateCache(List<Cell> results, RowCacheKey key) {
-    // TODO: implement with barrier to avoid cache read during mutation
-    try {
-      rowCacheStrategy.cacheRow(key, new RowCells(results));
-    } catch (CloneNotSupportedException ignored) {
-      // Not able to cache row cells, ignore
+  void cache(List<Cell> results, RowCacheKey key) {
+    if (results.isEmpty()) {
+      // Nothing to cache; avoid creating an empty entry that would just be a 
cache hit returning
+      // an empty row.
+      return;
     }
+    // The row cache is populated only when no region level barriers remain
+    regionLevelBarrierMap.computeIfAbsent(key.getEncodedRegionName(), t -> {
+      // The row cache is populated only when no row level barriers remain
+      rowLevelBarrierMap.computeIfAbsent(key, k -> {
+        try {
+          rowCacheStrategy.cacheRow(key, new RowCells(results));
+        } catch (CloneNotSupportedException ignored) {
+          // Not able to cache row cells, ignore
+        }
+        return null;
+      });
+      return null;
+    });
+  }
+
+  /**
+   * Registers one more region level barrier for the given region.
+   * <p>
+   * The counter is created (when missing) and incremented inside a single {@code compute} call.
+   * Incrementing after the map operation returned would allow
+   * {@link #removeRegionLevelBarrier(String)} to decrement the same counter to zero and unmap it
+   * in between, leaving this barrier counted on an object the map no longer holds.
+   * NOTE(review): this relies on the map applying {@code compute} atomically per key (as
+   * ConcurrentHashMap does); the field declaration is not visible here - confirm.
+   * @param encodedRegionName the encoded name of the region to put a barrier on
+   */
+  void createRegionLevelBarrier(String encodedRegionName) {
+    regionLevelBarrierMap.compute(encodedRegionName, (k, counter) -> {
+      AtomicInteger barrier = (counter != null) ? counter : new AtomicInteger(0);
+      barrier.incrementAndGet();
+      return barrier;
+    });
+  }
+
+  /** Delegates to {@code region.increaseRowCacheSeqNum()}. */
+  void increaseRowCacheSeqNum(HRegion region) {
+    region.increaseRowCacheSeqNum();
+  }
+
+  /**
+   * Releases one region level barrier. The region's counter is decremented and, once it is no
+   * longer positive, its mapping is removed. Does nothing when the region has no counter.
+   * @param encodedRegionName the encoded name of the region whose barrier is released
+   */
+  void removeRegionLevelBarrier(String encodedRegionName) {
+    regionLevelBarrierMap.computeIfPresent(encodedRegionName,
+      (k, counter) -> counter.decrementAndGet() > 0 ? counter : null);
+  }
+
+  /** Returns the hit count reported by the row cache strategy. */
+  long getHitCount() {
+    return rowCacheStrategy.getHitCount();
+  }
+
+  /** Returns the miss count reported by the row cache strategy. */
+  long getMissCount() {
+    return rowCacheStrategy.getMissCount();
+  }
+
+  /** Returns the size reported by the row cache strategy. */
+  long getSize() {
+    return rowCacheStrategy.getSize();
+  }
+
+  /** Returns the row count reported by the row cache strategy. */
+  long getCount() {
+    return rowCacheStrategy.getCount();
+  }
+
+  /** Returns the evicted row count reported by the row cache strategy. */
+  long getEvictedRowCount() {
+    return rowCacheStrategy.getEvictedRowCount();
+  }
+
+  // For testing only. Returns whatever the row level barrier map holds for the key.
+  AtomicInteger getRowLevelBarrier(RowCacheKey key) {
+    return rowLevelBarrierMap.get(key);
+  }
+
+  // For testing only
+  AtomicInteger getRegionLevelBarrier(String encodedRegionName) {
+    return regionLevelBarrierMap.get(encodedRegionName);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java
index 09ec68194ea..4d0e51fa1e8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java
@@ -41,6 +41,10 @@ public class RowCacheKey implements HeapSize {
     this.rowCacheSeqNum = region.getRowCacheSeqNum();
   }
 
+  /** Returns the encoded region name held by this key. */
+  String getEncodedRegionName() {
+    return encodedRegionName;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (o == null || getClass() != o.getClass()) return false;
@@ -65,7 +69,7 @@ public class RowCacheKey implements HeapSize {
     return FIXED_OVERHEAD + ClassSize.align(rowKey.length);
   }
 
-  boolean isSameRegion(HRegion region) {
-    return 
this.encodedRegionName.equals(region.getRegionInfo().getEncodedName());
+  boolean isSameRegion(String encodedRegionName) {
+    return this.encodedRegionName.equals(encodedRegionName);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheStrategy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheStrategy.java
index 0b95f7eb957..edb81a552d3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheStrategy.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheStrategy.java
@@ -48,9 +48,9 @@ public interface RowCacheStrategy {
   /**
    * Evict all rows belonging to the specified region. This is heavy operation 
as it iterates the
    * entire RowCache key set.
-   * @param region the region whose rows should be evicted
+   * @param encodedRegionName the encoded name of the region whose rows should 
be evicted
    */
-  void evictRowsByRegion(HRegion region);
+  void evictRowsByRegion(String encodedRegionName);
 
   /**
    * Get the number of rows in the cache.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java
index 2f44058e0a2..7b29de61c9c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.ClassSize;
 
@@ -29,20 +33,47 @@ public class RowCells implements HeapSize {
   public static final long FIXED_OVERHEAD = 
ClassSize.estimateBase(RowCells.class, false);
 
   private final List<Cell> cells = new ArrayList<>();
+  /**
+   * Earliest expiration time among contained cells, derived from cell-level 
TTL tags. Set to
+   * {@link Long#MAX_VALUE} when no cell carries a TTL tag, which lets the row 
cache short-circuit
+   * the expiration check on every hit.
+   */
+  private final long earliestExpirationMs;
 
   public RowCells(List<Cell> cells) throws CloneNotSupportedException {
+    long earliest = Long.MAX_VALUE;
     for (Cell cell : cells) {
       if (!(cell instanceof ExtendedCell extCell)) {
         throw new CloneNotSupportedException("Cell is not an ExtendedCell");
       }
       try {
         // To garbage collect the objects referenced by the cells
-        this.cells.add(extCell.deepClone());
+        ExtendedCell cloned = extCell.deepClone();
+        this.cells.add(cloned);
+        long exp = expirationTimeOf(cloned);
+        if (exp < earliest) {
+          earliest = exp;
+        }
       } catch (RuntimeException e) {
-        // throw new CloneNotSupportedException("Deep clone failed");
-        this.cells.add(extCell);
+        throw new CloneNotSupportedException("Deep clone failed");
       }
     }
+    this.earliestExpirationMs = earliest;
+  }
+
+  /**
+   * Scans the cell's tags for a TTL tag. When one is found the result is the cell timestamp
+   * plus the tag's long value; otherwise {@link Long#MAX_VALUE} is returned.
+   */
+  private static long expirationTimeOf(ExtendedCell cell) {
+    for (Iterator<Tag> tags = PrivateCellUtil.tagsIterator(cell); tags.hasNext();) {
+      Tag tag = tags.next();
+      if (tag.getType() == TagType.TTL_TAG_TYPE) {
+        return cell.getTimestamp() + Tag.getValueAsLong(tag);
+      }
+    }
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Returns true when the earliest expiration time recorded at construction is strictly before
+   * {@code now}.
+   */
+  public boolean isExpired(long now) {
+    return earliestExpirationMs < now;
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCacheStrategy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCacheStrategy.java
new file mode 100644
index 00000000000..9e1c7bf17a2
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TinyLfuRowCacheStrategy.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Policy;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+@org.apache.yetus.audience.InterfaceAudience.Private
+/**
+ * {@link RowCacheStrategy} backed by a Caffeine cache whose entries are weighed by the heap
+ * size of key plus value.
+ */
+public class TinyLfuRowCacheStrategy implements RowCacheStrategy {
+  /** Removal listener that feeds {@link #evictedRowCount}. */
+  private final class EvictionListener
+    implements RemovalListener<@NonNull RowCacheKey, @NonNull RowCells> {
+    @Override
+    public void onRemoval(RowCacheKey key, RowCells value, @NonNull RemovalCause cause) {
+      // REPLACED is reported when put() overwrites the value of a key that stays cached, so no
+      // row left the cache and it must not inflate the evicted-row metric.
+      if (cause != RemovalCause.REPLACED) {
+        evictedRowCount.increment();
+      }
+    }
+  }
+
+  private final Cache<@NonNull RowCacheKey, RowCells> cache;
+
+  // Cache.stats() does not provide eviction count for entries, so we maintain our own counter.
+  private final LongAdder evictedRowCount = new LongAdder();
+
+  /**
+   * Creates a strategy sized by {@link MemorySizeUtil#getRowCacheSize(Configuration)}.
+   * @param conf the configuration the row cache size is read from
+   */
+  public TinyLfuRowCacheStrategy(Configuration conf) {
+    this(MemorySizeUtil.getRowCacheSize(conf));
+  }
+
+  /**
+   * @param maxSizeBytes maximum total weight of the cache; a non-positive value builds a cache
+   *                     with {@code maximumSize(0)}, no stats recording and no removal listener
+   */
+  private TinyLfuRowCacheStrategy(long maxSizeBytes) {
+    if (maxSizeBytes <= 0) {
+      cache = Caffeine.newBuilder().maximumSize(0).build();
+      return;
+    }
+
+    // The weigher must return an int, so the combined heap size is clamped to Integer.MAX_VALUE
+    cache =
+      Caffeine.newBuilder().maximumWeight(maxSizeBytes).removalListener(new EvictionListener())
+        .weigher((RowCacheKey key,
+          RowCells value) -> (int) Math.min(key.heapSize() + value.heapSize(), Integer.MAX_VALUE))
+        .recordStats().build();
+  }
+
+  @Override
+  public void cacheRow(RowCacheKey key, RowCells value) {
+    cache.put(key, value);
+  }
+
+  @Override
+  public void evictRow(RowCacheKey key) {
+    cache.asMap().remove(key);
+  }
+
+  @Override
+  public void evictRowsByRegion(String encodedRegionName) {
+    // Walks the whole key set and removes every key belonging to the region
+    cache.asMap().keySet().removeIf(key -> key.isSameRegion(encodedRegionName));
+  }
+
+  @Override
+  public long getCount() {
+    return cache.estimatedSize();
+  }
+
+  @Override
+  public long getEvictedRowCount() {
+    return evictedRowCount.sum();
+  }
+
+  @Override
+  public long getHitCount() {
+    return cache.stats().hitCount();
+  }
+
+  @Override
+  public long getMaxSize() {
+    // -1 when the cache exposes no eviction policy
+    Optional<Long> result = cache.policy().eviction().map(Policy.Eviction::getMaximum);
+    return result.orElse(-1L);
+  }
+
+  @Override
+  public long getMissCount() {
+    return cache.stats().missCount();
+  }
+
+  @Override
+  public RowCells getRow(RowCacheKey key, boolean caching) {
+    // With caching disabled the cache is not consulted at all
+    if (!caching) {
+      return null;
+    }
+
+    return cache.getIfPresent(key);
+  }
+
+  @Override
+  public long getSize() {
+    // -1 when there is no eviction policy or the policy reports no weighted size
+    Optional<OptionalLong> result = cache.policy().eviction().map(Policy.Eviction::weightedSize);
+    return result.orElse(OptionalLong.of(-1L)).orElse(-1L);
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index f1b6efe50a9..6b677f2d122 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -662,6 +662,31 @@ public class MetricsRegionServerWrapperStub implements 
MetricsRegionServerWrappe
     return 0;
   }
 
+  @Override
+  public long getRowCacheHitCount() {
+    return 2;
+  }
+
+  @Override
+  public long getRowCacheMissCount() {
+    return 1;
+  }
+
+  @Override
+  public long getRowCacheEvictedRowCount() {
+    return 0;
+  }
+
+  @Override
+  public long getRowCacheSize() {
+    return 1;
+  }
+
+  @Override
+  public long getRowCacheCount() {
+    return 2;
+  }
+
   @Override
   public int getSplitQueueSize() {
     return 0;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
index aac2a5922b9..76c2a8ad6e4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
@@ -148,6 +148,11 @@ public class TestMetricsRegionServer {
     HELPER.assertGauge("l2CacheHitRatio", 90, serverSource);
     HELPER.assertGauge("l2CacheMissRatio", 10, serverSource);
     HELPER.assertCounter("updatesBlockedTime", 419, serverSource);
+    HELPER.assertCounter("rowCacheHitCount", 2, serverSource);
+    HELPER.assertCounter("rowCacheMissCount", 1, serverSource);
+    HELPER.assertCounter("rowCacheEvictedRowCount", 0, serverSource);
+    HELPER.assertGauge("rowCacheSize", 1, serverSource);
+    HELPER.assertGauge("rowCacheCount", 2, serverSource);
   }
 
   @Test
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java
new file mode 100644
index 00000000000..239c40f0168
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCache.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY;
+import static 
org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_EVICTED_ROW_COUNT;
+import static 
org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_HIT_COUNT;
+import static 
org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.ROW_CACHE_MISS_COUNT;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+@Tag(RegionServerTests.TAG)
+@Tag(MediumTests.TAG)
+public class TestRowCache {
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final byte[] CF1 = Bytes.toBytes("cf1");
+  private static final byte[] CF2 = Bytes.toBytes("cf2");
+  private static final byte[] Q1 = Bytes.toBytes("q1");
+  private static final byte[] Q2 = Bytes.toBytes("q2");
+
+  private static MetricsAssertHelper metricsHelper;
+  private static MetricsRegionServer regionServerMetrics;
+  private static MetricsRegionServerSource serverSource;
+
+  private static Admin admin;
+  private static RowCache rowCache;
+
+  private TableName tableName;
+  private Table table;
+  HRegion region;
+  private final Map<String, Long> counterBase = new HashMap<>();
+
+  @BeforeAll
+  public static void beforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    // Enable row cache but reduce the block cache size to fit in 80% of the 
heap
+    conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f);
+    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f);
+
+    SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster();
+    cluster.waitForActiveAndReadyMaster();
+    admin = TEST_UTIL.getAdmin();
+
+    metricsHelper = 
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+    HRegionServer regionServer = cluster.getRegionServer(0);
+    regionServerMetrics = regionServer.getMetrics();
+    serverSource = regionServerMetrics.getMetricsSource();
+
+    rowCache = regionServer.getRSRpcServices().getServer().getRowCache();
+  }
+
+  @AfterAll
+  public static void afterClass() throws Exception {
+    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @BeforeEach
+  public void beforeTestMethod(TestInfo testInfo) throws Exception {
+    ColumnFamilyDescriptor cf1 = 
ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    // To test data block encoding
+    ColumnFamilyDescriptor cf2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2)
+      .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF).build();
+
+    tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
+    TableDescriptor td = 
TableDescriptorBuilder.newBuilder(tableName).setRowCacheEnabled(true)
+      .setColumnFamily(cf1).setColumnFamily(cf2).build();
+    admin.createTable(td);
+    table = admin.getConnection().getTable(tableName);
+    region = 
TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegions().stream()
+      .filter(r -> 
r.getRegionInfo().getTable().equals(tableName)).findFirst().orElseThrow();
+  }
+
+  @AfterEach
+  public void afterTestMethod() throws Exception {
+    counterBase.clear();
+
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+  }
+
+  private void setCounterBase(String metric, long value) {
+    counterBase.put(metric, value);
+  }
+
+  /**
+   * Asserts that the counter metric moved by {@code diff} since the remembered base value and
+   * then remembers the new value as the base for the next call.
+   * @throws IllegalStateException if no base was stored for the metric
+   */
+  private void assertCounterDiff(String metric, long diff) {
+    Long previous = counterBase.get(metric);
+    if (previous == null) {
+      throw new IllegalStateException(
+        "base counter of " + metric + " metric should have been set before by setCounterBase()");
+    }
+    long expected = previous + diff;
+    metricsHelper.assertCounter(metric, expected, serverSource);
+    counterBase.put(metric, expected);
+  }
+
+  private static void recomputeMetrics() {
+    regionServerMetrics.getRegionServerWrapper().forceRecompute();
+  }
+
+  @Test
+  public void testGetWithRowCache() throws IOException {
+    byte[] rowKey = "row".getBytes();
+    Get get = new Get(rowKey);
+    Result result;
+
+    RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey);
+
+    // Initialize metrics
+    recomputeMetrics();
+    setCounterBase("Get_num_ops", metricsHelper.getCounter("Get_num_ops", 
serverSource));
+    setCounterBase(ROW_CACHE_HIT_COUNT,
+      metricsHelper.getCounter(ROW_CACHE_HIT_COUNT, serverSource));
+    setCounterBase(ROW_CACHE_MISS_COUNT,
+      metricsHelper.getCounter(ROW_CACHE_MISS_COUNT, serverSource));
+    setCounterBase(ROW_CACHE_EVICTED_ROW_COUNT,
+      metricsHelper.getCounter(ROW_CACHE_EVICTED_ROW_COUNT, serverSource));
+
+    // Put a row
+    Put put = new Put(rowKey);
+    put.addColumn(CF1, Q1, Bytes.toBytes(0L));
+    put.addColumn(CF1, Q2, "12".getBytes());
+    put.addColumn(CF2, Q1, "21".getBytes());
+    put.addColumn(CF2, Q2, "22".getBytes());
+    table.put(put);
+    admin.flush(tableName);
+    recomputeMetrics();
+    assertCounterDiff(ROW_CACHE_HIT_COUNT, 0);
+    assertCounterDiff(ROW_CACHE_MISS_COUNT, 0);
+    assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0);
+
+    // First get to populate the row cache
+    result = table.get(get);
+    recomputeMetrics();
+    assertArrayEquals(rowKey, result.getRow());
+    assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1));
+    assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2));
+    assertArrayEquals("21".getBytes(), result.getValue(CF2, Q1));
+    assertArrayEquals("22".getBytes(), result.getValue(CF2, Q2));
+    assertCounterDiff("Get_num_ops", 1);
+    // Ensure the get operation from HFile without row cache
+    assertCounterDiff(ROW_CACHE_HIT_COUNT, 0);
+    assertCounterDiff(ROW_CACHE_MISS_COUNT, 1);
+    assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0);
+
+    // Get from the row cache
+    result = table.get(get);
+    recomputeMetrics();
+    assertArrayEquals(rowKey, result.getRow());
+    assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1));
+    assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2));
+    assertArrayEquals("21".getBytes(), result.getValue(CF2, Q1));
+    assertArrayEquals("22".getBytes(), result.getValue(CF2, Q2));
+    assertCounterDiff("Get_num_ops", 1);
+    // Ensure the get operation from the row cache
+    assertCounterDiff(ROW_CACHE_HIT_COUNT, 1);
+    assertCounterDiff(ROW_CACHE_MISS_COUNT, 0);
+    assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0);
+
+    // Row cache is invalidated by the put operation
+    assertNotNull(rowCache.getRow(rowCacheKey));
+    table.put(put);
+    recomputeMetrics();
+    assertCounterDiff(ROW_CACHE_HIT_COUNT, 1);
+    assertCounterDiff(ROW_CACHE_MISS_COUNT, 0);
+    assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 1);
+
+    // Get is executed without the row cache; however, the cache is 
re-populated as a result
+    result = table.get(get);
+    recomputeMetrics();
+    assertArrayEquals(rowKey, result.getRow());
+    assertCounterDiff("Get_num_ops", 1);
+    // Ensure the get operation not from the row cache
+    assertCounterDiff(ROW_CACHE_HIT_COUNT, 0);
+    assertCounterDiff(ROW_CACHE_MISS_COUNT, 1);
+    assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0);
+
+    // Get again with the row cache
+    result = table.get(get);
+    recomputeMetrics();
+    assertArrayEquals(rowKey, result.getRow());
+    assertCounterDiff("Get_num_ops", 1);
+    // Ensure the get operation from the row cache
+    assertCounterDiff(ROW_CACHE_HIT_COUNT, 1);
+    assertCounterDiff(ROW_CACHE_MISS_COUNT, 0);
+    assertCounterDiff(ROW_CACHE_EVICTED_ROW_COUNT, 0);
+
+    // Row cache is invalidated by the increment operation
+    assertNotNull(rowCache.getRow(rowCacheKey));
+    table.incrementColumnValue(rowKey, CF1, Q1, 1);
+    assertNull(rowCache.getRow(rowCacheKey));
+
+    // Get is executed without the row cache; however, the cache is 
re-populated as a result
+    table.get(get);
+    assertNotNull(rowCache.getRow(rowCacheKey));
+
+    // Row cache is invalidated by the append operation
+    assertNotNull(rowCache.getRow(rowCacheKey));
+    Append append = new Append(rowKey);
+    append.addColumn(CF1, Q1, Bytes.toBytes(0L));
+    table.append(append);
+    assertNull(rowCache.getRow(rowCacheKey));
+
+    // Get is executed without the row cache; however, the cache is 
re-populated as a result
+    table.get(get);
+    assertNotNull(rowCache.getRow(rowCacheKey));
+
+    // Row cache is invalidated by the delete operation
+    assertNotNull(rowCache.getRow(rowCacheKey));
+    Delete delete = new Delete(rowKey);
+    delete.addColumn(CF1, Q1);
+    table.delete(delete);
+    assertNull(rowCache.getRow(rowCacheKey));
+  }
+
+  @Test
+  public void testPutWithTTL() throws Exception {
+    // Cell-level TTL set via Put.setTTL is supported: the cached row is 
invalidated on hit when
+    // the cell has expired.
+    byte[] rowKey = "row".getBytes();
+    RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey);
+
+    Put put = new Put(rowKey).addColumn(CF1, Q1, "v".getBytes());
+    put.setTTL(60_000);
+    table.put(put);
+    // Flush so that the next Get reads from HFile (memstore-only reads do not 
populate the cache)
+    admin.flush(tableName);
+
+    // First Get populates the cache
+    Result first = table.get(new Get(rowKey));
+    assertFalse(first.isEmpty());
+    assertNotNull(rowCache.getRow(rowCacheKey));
+
+    // Advance time beyond the cell TTL
+    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
+    edge.setValue(EnvironmentEdgeManager.currentTime() + 120_000);
+    EnvironmentEdgeManager.injectEdge(edge);
+    try {
+      // Cache hit detects expiration, evicts the row, and falls back to the 
read path. The
+      // storage path also filters the expired cell, so the result is empty.
+      Result second = table.get(new Get(rowKey));
+      assertTrue(second.isEmpty());
+      assertNull(rowCache.getRow(rowCacheKey));
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  /**
+   * Populates the row cache with a Get, then runs a checkAndMutate whose condition fails and one
+   * whose condition succeeds, asserting after each that the cached row is gone.
+   */
+  @Test
+  public void testCheckAndMutate() throws IOException {
+    byte[] rowKey = "row".getBytes();
+    Get get = new Get(rowKey);
+    Result result;
+    CheckAndMutate cam;
+    CheckAndMutateResult camResult;
+
+    RowCacheKey rowCacheKey = new RowCacheKey(region, rowKey);
+
+    // Put a row
+    Put put1 = new Put(rowKey);
+    put1.addColumn(CF1, Q1, "11".getBytes());
+    put1.addColumn(CF1, Q2, "12".getBytes());
+    table.put(put1);
+    admin.flush(tableName);
+
+    // Validate that the row cache is populated
+    result = table.get(get);
+    assertNotNull(rowCache.getRow(rowCacheKey));
+    assertArrayEquals("11".getBytes(), result.getValue(CF1, Q1));
+    assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2));
+
+    // The cached row is expected to be gone even though this checkAndMutate fails its check
+    Put put2 = new Put(rowKey);
+    put2.addColumn(CF1, Q2, "1212".getBytes());
+    cam = CheckAndMutate.newBuilder(rowKey).ifEquals(CF1, Q2, 
"00".getBytes()).build(put2);
+    camResult = table.checkAndMutate(cam);
+    assertFalse(camResult.isSuccess());
+    assertNull(rowCache.getRow(rowCacheKey));
+
+    // Validate that the row cache is populated
+    result = table.get(get);
+    assertNotNull(rowCache.getRow(rowCacheKey));
+    assertArrayEquals("11".getBytes(), result.getValue(CF1, Q1));
+    assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2));
+
+    // The row cache is invalidated by a checkAndMutate operation
+    cam = CheckAndMutate.newBuilder(rowKey).ifEquals(CF1, Q2, 
"12".getBytes()).build(put2);
+    camResult = table.checkAndMutate(cam);
+    assertTrue(camResult.isSuccess());
+    assertNull(rowCache.getRow(rowCacheKey));
+  }
+
+  @Test
+  public void testCheckAndMutates() throws IOException {
+    byte[] rowKey1 = "row1".getBytes();
+    byte[] rowKey2 = "row2".getBytes();
+    Get get1 = new Get(rowKey1);
+    Get get2 = new Get(rowKey2);
+    Result result1, result2;
+    List<CheckAndMutate> cams;
+    List<CheckAndMutateResult> camResults;
+
+    RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1);
+    RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2);
+
+    // Put rows
+    Put put1 = new Put(rowKey1);
+    put1.addColumn(CF1, Q1, "111".getBytes());
+    put1.addColumn(CF1, Q2, "112".getBytes());
+    table.put(put1);
+    Put put2 = new Put(rowKey2);
+    put2.addColumn(CF1, Q1, "211".getBytes());
+    put2.addColumn(CF1, Q2, "212".getBytes());
+    table.put(put2);
+    admin.flush(tableName);
+
+    // Validate that the row caches are populated
+    result1 = table.get(get1);
+    assertNotNull(rowCache.getRow(rowCacheKey1));
+    assertArrayEquals("111".getBytes(), result1.getValue(CF1, Q1));
+    assertArrayEquals("112".getBytes(), result1.getValue(CF1, Q2));
+    result2 = table.get(get2);
+    assertNotNull(rowCache.getRow(rowCacheKey2));
+    assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1));
+    assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2));
+
+    // The row caches are invalidated by checkAndMutate operations
+    cams = new ArrayList<>();
+    cams.add(CheckAndMutate.newBuilder(rowKey1).ifEquals(CF1, Q2, 
"112".getBytes()).build(put1));
+    cams.add(CheckAndMutate.newBuilder(rowKey2).ifEquals(CF1, Q2, 
"212".getBytes()).build(put2));
+    camResults = table.checkAndMutate(cams);
+    assertTrue(camResults.get(0).isSuccess());
+    assertTrue(camResults.get(1).isSuccess());
+    assertNull(rowCache.getRow(rowCacheKey1));
+    assertNull(rowCache.getRow(rowCacheKey2));
+  }
+
+  /**
+   * Caches two rows, applies a checkAndMutate carrying RowMutations to the first one and
+   * verifies that only that row is dropped from the cache, then that both rows are cached again
+   * after being re-read.
+   */
+  @Test
+  public void testRowMutations() throws IOException {
+    byte[] rowKey1 = "row1".getBytes();
+    byte[] rowKey2 = "row2".getBytes();
+    Get get1 = new Get(rowKey1);
+    Get get2 = new Get(rowKey2);
+    Result result1, result2;
+
+    RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1);
+    RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2);
+
+    // Put rows
+    Put put1 = new Put(rowKey1);
+    put1.addColumn(CF1, Q1, "111".getBytes());
+    put1.addColumn(CF1, Q2, "112".getBytes());
+    table.put(put1);
+    Put put2 = new Put(rowKey2);
+    put2.addColumn(CF1, Q1, "211".getBytes());
+    put2.addColumn(CF1, Q2, "212".getBytes());
+    table.put(put2);
+    admin.flush(tableName);
+
+    // Validate that the row caches are populated
+    result1 = table.get(get1);
+    assertNotNull(rowCache.getRow(rowCacheKey1));
+    assertArrayEquals("111".getBytes(), result1.getValue(CF1, Q1));
+    assertArrayEquals("112".getBytes(), result1.getValue(CF1, Q2));
+    result2 = table.get(get2);
+    // Check the entry of the row that was just read (row2), not row1 again
+    assertNotNull(rowCache.getRow(rowCacheKey2));
+    assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1));
+    assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2));
+
+    // The row caches are invalidated by batch operation
+    Put put12 = new Put(rowKey1);
+    put12.addColumn(CF1, Q1, "111111".getBytes());
+    Put put13 = new Put(rowKey1);
+    put13.addColumn(CF1, Q2, "112112".getBytes());
+    RowMutations rms = new RowMutations(rowKey1);
+    rms.add(put12);
+    rms.add(put13);
+    CheckAndMutate cam =
+      CheckAndMutate.newBuilder(rowKey1).ifEquals(CF1, Q1, "111".getBytes()).build(rms);
+    table.checkAndMutate(cam);
+    assertNull(rowCache.getRow(rowCacheKey1));
+    assertNotNull(rowCache.getRow(rowCacheKey2));
+
+    // Validate that the row caches are populated
+    result1 = table.get(get1);
+    assertNotNull(rowCache.getRow(rowCacheKey1));
+    assertArrayEquals("111111".getBytes(), result1.getValue(CF1, Q1));
+    assertArrayEquals("112112".getBytes(), result1.getValue(CF1, Q2));
+    result2 = table.get(get2);
+    // Check the entry of the row that was just read (row2), not row1 again
+    assertNotNull(rowCache.getRow(rowCacheKey2));
+    assertArrayEquals("211".getBytes(), result2.getValue(CF1, Q1));
+    assertArrayEquals("212".getBytes(), result2.getValue(CF1, Q2));
+  }
+
+  /**
+   * Verifies that gets issued through {@code Table.batch} populate the row cache, and that a batch
+   * containing a put and a checkAndMutate invalidates the cached rows it touches while leaving the
+   * untouched row cached.
+   */
+  @Test
+  public void testBatch() throws IOException, InterruptedException {
+    byte[] rowKey1 = "row1".getBytes();
+    byte[] rowKey2 = "row2".getBytes();
+    byte[] rowKey3 = "row3".getBytes();
+    Get get1 = new Get(rowKey1);
+    Get get2 = new Get(rowKey2);
+    Get get3 = new Get(rowKey3);
+    List<Row> batchOperations;
+    Object[] results;
+
+    RowCacheKey rowCacheKey1 = new RowCacheKey(region, rowKey1);
+    RowCacheKey rowCacheKey2 = new RowCacheKey(region, rowKey2);
+    RowCacheKey rowCacheKey3 = new RowCacheKey(region, rowKey3);
+
+    // Put rows
+    batchOperations = new ArrayList<>();
+    Put put1 = new Put(rowKey1);
+    put1.addColumn(CF1, Q1, "111".getBytes());
+    put1.addColumn(CF1, Q2, "112".getBytes());
+    batchOperations.add(put1);
+    Put put2 = new Put(rowKey2);
+    put2.addColumn(CF1, Q1, "211".getBytes());
+    put2.addColumn(CF1, Q2, "212".getBytes());
+    batchOperations.add(put2);
+    Put put3 = new Put(rowKey3);
+    put3.addColumn(CF1, Q1, "311".getBytes());
+    put3.addColumn(CF1, Q2, "312".getBytes());
+    batchOperations.add(put3);
+    // Use Object[] like the other batch calls below: a Result[] behind an Object[] reference
+    // would throw ArrayStoreException if the client stored anything other than a Result.
+    results = new Object[batchOperations.size()];
+    table.batch(batchOperations, results);
+    admin.flush(tableName);
+
+    // Validate that the row caches are populated
+    batchOperations = new ArrayList<>();
+    batchOperations.add(get1);
+    batchOperations.add(get2);
+    batchOperations.add(get3);
+    results = new Object[batchOperations.size()];
+    table.batch(batchOperations, results);
+    assertEquals(3, results.length);
+    assertNotNull(rowCache.getRow(rowCacheKey1));
+    assertArrayEquals("111".getBytes(), ((Result) results[0]).getValue(CF1, Q1));
+    assertArrayEquals("112".getBytes(), ((Result) results[0]).getValue(CF1, Q2));
+    assertNotNull(rowCache.getRow(rowCacheKey2));
+    assertArrayEquals("211".getBytes(), ((Result) results[1]).getValue(CF1, Q1));
+    assertArrayEquals("212".getBytes(), ((Result) results[1]).getValue(CF1, Q2));
+    assertNotNull(rowCache.getRow(rowCacheKey3));
+    assertArrayEquals("311".getBytes(), ((Result) results[2]).getValue(CF1, Q1));
+    assertArrayEquals("312".getBytes(), ((Result) results[2]).getValue(CF1, Q2));
+
+    // The row caches of row1 and row2 are invalidated by the batch operation; row3 is not part of
+    // the batch and must stay cached
+    batchOperations = new ArrayList<>();
+    batchOperations.add(put1);
+    Put put2New = new Put(rowKey2);
+    put2New.addColumn(CF1, Q1, "211211".getBytes());
+    put2New.addColumn(CF1, Q2, "212".getBytes());
+    CheckAndMutate cam =
+      CheckAndMutate.newBuilder(rowKey2).ifEquals(CF1, Q1, "211".getBytes()).build(put2New);
+    batchOperations.add(cam);
+    results = new Object[batchOperations.size()];
+    table.batch(batchOperations, results);
+    assertEquals(2, results.length);
+    assertNull(rowCache.getRow(rowCacheKey1));
+    assertNull(rowCache.getRow(rowCacheKey2));
+    assertNotNull(rowCache.getRow(rowCacheKey3));
+  }
+
+  /**
+   * Walks one row through three states (memstore only, flushed to an HFile, and split across
+   * memstore and HFile) and checks after each get whether the row cache holds the row.
+   */
+  @Test
+  public void testGetFromMemstoreOnly() throws IOException, InterruptedException {
+    final byte[] row = "row".getBytes();
+    final RowCacheKey cacheKey = new RowCacheKey(region, row);
+
+    // State 1: the only cell lives in the memstore, nothing has been flushed yet
+    Put firstPut = new Put(row);
+    firstPut.addColumn(CF1, Q1, Bytes.toBytes(0L));
+    table.put(firstPut);
+
+    // A get answered from the memstore alone must not populate the row cache
+    table.get(new Get(row));
+    assertNull(rowCache.getRow(cacheKey));
+
+    // State 2: after a flush the same get reads from an HFile and the row gets cached
+    admin.flush(tableName);
+    table.get(new Get(row));
+    assertNotNull(rowCache.getRow(cacheKey));
+
+    // State 3: writing a second qualifier leaves cells in both the memstore and the HFile,
+    // and the write itself drops the cached row
+    Put secondPut = new Put(row);
+    secondPut.addColumn(CF1, Q2, Bytes.toBytes(0L));
+    table.put(secondPut);
+    assertNull(rowCache.getRow(cacheKey));
+
+    // A get that merges memstore and HFile cells populates the cache again
+    table.get(new Get(row));
+    assertNotNull(rowCache.getRow(cacheKey));
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java
new file mode 100644
index 00000000000..ea3ed188b75
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheCanCacheRow.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@code RowCache.canCacheRow(Get, Region)} against a mocked {@link Region}. Each
+ * test changes one property of the table, the configuration or the {@link Get} and checks whether
+ * the row is still reported as cacheable.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestRowCacheCanCacheRow {
+  private static final byte[] CF1 = "cf1".getBytes();
+  private static final byte[] CF2 = "cf2".getBytes();
+  private static final byte[] ROW_KEY = "row".getBytes();
+  private static final TableName TABLE_NAME = TableName.valueOf("test");
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowCacheCanCacheRow.class);
+
+  /** The table-level flag, as reported by the region, decides whether a row can be cached. */
+  @Test
+  public void testRowCacheEnabledByTable() {
+    Region region = Mockito.mock(Region.class);
+    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    TableDescriptor td;
+
+    Get get = new Get(ROW_KEY);
+    get.addFamily(CF1);
+
+    td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true).setColumnFamily(cfd)
+      .build();
+    Mockito.when(region.getTableDescriptor()).thenReturn(td);
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f);
+    Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf);
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled());
+
+    RowCache rowCache = new RowCache(conf);
+    Assert.assertTrue(rowCache.canCacheRow(get, region));
+
+    // Disable row cache, expect false
+    td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfd)
+      .setRowCacheEnabled(false).build();
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled());
+    Assert.assertFalse(rowCache.canCacheRow(get, region));
+  }
+
+  /** A row cache size of 0 disables caching even when the table asks for it. */
+  @Test
+  public void testRowCacheDisabledByConfig() {
+    Region region = Mockito.mock(Region.class);
+    Configuration conf = HBaseConfiguration.create();
+    // Pin the size to 0 explicitly so the test does not depend on the shipped default value
+    conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0f);
+    Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf);
+
+    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    TableDescriptor td;
+
+    Get get = new Get(ROW_KEY);
+    get.addFamily(CF1);
+
+    // Row cache enabled at table level, but disabled by row cache size 0, expect false
+    td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true).setColumnFamily(cfd)
+      .build();
+    Mockito.when(region.getTableDescriptor()).thenReturn(td);
+    // Without this stub the mock answers false and the assertion below would pass for the wrong
+    // reason: the table-level flag must be on for the size setting to be what is under test.
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled());
+
+    RowCache rowCache = new RowCache(conf);
+    Assert.assertFalse(rowCache.canCacheRow(get, region));
+  }
+
+  /** Only a Get that asks for every column family in full can be cached. */
+  @Test
+  public void testRetrieveAllCells() {
+    Region region = Mockito.mock(Region.class);
+    ColumnFamilyDescriptor cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    ColumnFamilyDescriptor cfd2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2).build();
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true)
+      .setColumnFamily(cfd1).setColumnFamily(cfd2).build();
+    Mockito.when(region.getTableDescriptor()).thenReturn(td);
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled());
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f);
+    Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf);
+    RowCache rowCache = new RowCache(conf);
+
+    // Not all CFs, expect false
+    Get get = new Get(ROW_KEY);
+    get.addFamily(CF1);
+    Assert.assertFalse(rowCache.canCacheRow(get, region));
+
+    // All CFs, expect true
+    get.addFamily(CF2);
+    Assert.assertTrue(rowCache.canCacheRow(get, region));
+
+    // Not all qualifiers, expect false
+    get.addColumn(CF1, "q1".getBytes());
+    Assert.assertFalse(rowCache.canCacheRow(get, region));
+  }
+
+  /** A time-to-live on any column family prevents the row from being cached. */
+  @Test
+  public void testTtl() {
+    ColumnFamilyDescriptor cfd1;
+    ColumnFamilyDescriptor cfd2;
+    TableDescriptor td;
+    Region region = Mockito.mock(Region.class);
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f);
+    Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf);
+    RowCache rowCache = new RowCache(conf);
+
+    Get get = new Get(ROW_KEY);
+    get.addFamily(CF1);
+    get.addFamily(CF2);
+
+    // Ttl is set, expect false
+    cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).setTimeToLive(1).build();
+    cfd2 = ColumnFamilyDescriptorBuilder.newBuilder(CF2).build();
+    td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true)
+      .setColumnFamily(cfd1).setColumnFamily(cfd2).build();
+    Mockito.when(region.getTableDescriptor()).thenReturn(td);
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled());
+    Assert.assertFalse(rowCache.canCacheRow(get, region));
+
+    // Ttl is not set, expect true
+    cfd1 = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true)
+      .setColumnFamily(cfd1).setColumnFamily(cfd2).build();
+    Mockito.when(region.getTableDescriptor()).thenReturn(td);
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled());
+    Assert.assertTrue(rowCache.canCacheRow(get, region));
+  }
+
+  // Each test below applies a single non-default Get setting and expects it to make the otherwise
+  // cacheable Get non-cacheable; see testWith.
+
+  @Test
+  public void testFilter() {
+    testWith(
+      get -> get.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW_KEY))));
+  }
+
+  @Test
+  public void testCacheBlock() {
+    testWith(get -> get.setCacheBlocks(false));
+  }
+
+  @Test
+  public void testAttribute() {
+    testWith(get -> get.setAttribute("test", "value".getBytes()));
+  }
+
+  @Test
+  public void testCheckExistenceOnly() {
+    testWith(get -> get.setCheckExistenceOnly(true));
+  }
+
+  @Test
+  public void testColumnFamilyTimeRange() {
+    testWith(get -> get.setColumnFamilyTimeRange(CF1, 1000, 2000));
+  }
+
+  @Test
+  public void testConsistency() {
+    testWith(get -> get.setConsistency(Consistency.TIMELINE));
+  }
+
+  @Test
+  public void testAuthorizations() {
+    testWith(get -> get.setAuthorizations(new Authorizations("foo")));
+  }
+
+  @Test
+  public void testId() {
+    testWith(get -> get.setId("test"));
+  }
+
+  @Test
+  public void testIsolationLevel() {
+    testWith(get -> get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED));
+  }
+
+  @Test
+  public void testMaxResultsPerColumnFamily() {
+    testWith(get -> get.setMaxResultsPerColumnFamily(2));
+  }
+
+  @Test
+  public void testReplicaId() {
+    testWith(get -> get.setReplicaId(1));
+  }
+
+  @Test
+  public void testRowOffsetPerColumnFamily() {
+    testWith(get -> get.setRowOffsetPerColumnFamily(1));
+  }
+
+  @Test
+  public void testTimeRange() {
+    testWith(get -> {
+      try {
+        return get.setTimeRange(1, 2);
+      } catch (IOException e) {
+        // setTimeRange declares IOException, which a Function cannot propagate
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testTimestamp() {
+    testWith(get -> get.setTimestamp(1));
+  }
+
+  /**
+   * Asserts that a plain single-family {@link Get} on a row-cache-enabled table is cacheable, and
+   * that it is no longer cacheable after {@code func} has been applied to it.
+   * @param func the modification to apply to the Get; its return value is ignored
+   */
+  private static void testWith(Function<Get, Get> func) {
+    Region region = Mockito.mock(Region.class);
+    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME).setRowCacheEnabled(true)
+      .setColumnFamily(cfd).build();
+    Mockito.when(region.getTableDescriptor()).thenReturn(td);
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(td.getRowCacheEnabled());
+
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f);
+    Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf);
+    RowCache rowCache = new RowCache(conf);
+
+    Get get = new Get(ROW_KEY);
+    get.addFamily(CF1);
+    Assert.assertTrue(rowCache.canCacheRow(get, region));
+
+    // noinspection unused
+    var unused = func.apply(get);
+
+    // expect false
+    Assert.assertFalse(rowCache.canCacheRow(get, region));
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java
new file mode 100644
index 00000000000..02bba6fddf8
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestRowCacheConfiguration {
+  private static final byte[] CF1 = "cf1".getBytes();
+  private static final TableName TABLE_NAME = TableName.valueOf("table");
+  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  @Test
+  public void testDetermineRowCacheEnabled() throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    HRegion region;
+
+    // Set global config to false
+    conf.setBoolean(HConstants.ROW_CACHE_ENABLED_KEY, false);
+
+    region = createRegion(null);
+    assertFalse(region.checkRowCacheConfig());
+
+    region = createRegion(false);
+    assertFalse(region.checkRowCacheConfig());
+
+    region = createRegion(true);
+    assertTrue(region.checkRowCacheConfig());
+
+    // Set global config to true
+    conf.setBoolean(HConstants.ROW_CACHE_ENABLED_KEY, true);
+
+    region = createRegion(null);
+    assertTrue(region.checkRowCacheConfig());
+
+    region = createRegion(false);
+    assertFalse(region.checkRowCacheConfig());
+
+    region = createRegion(true);
+    assertTrue(region.checkRowCacheConfig());
+  }
+
+  private HRegion createRegion(Boolean rowCacheEnabled) throws IOException {
+    ColumnFamilyDescriptor cfd = 
ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    TableDescriptorBuilder tdb = 
TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(cfd);
+    if (rowCacheEnabled != null) {
+      tdb.setRowCacheEnabled(rowCacheEnabled);
+    }
+    return TEST_UTIL.createLocalHRegion(tdb.build(), "".getBytes(), 
"1".getBytes());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java
new file mode 100644
index 00000000000..4b3a1419f93
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheEvictOnClose.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_EVICT_ON_CLOSE_KEY;
+import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+@RunWith(Parameterized.class)
+public class TestRowCacheEvictOnClose {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowCacheEvictOnClose.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final byte[] CF1 = Bytes.toBytes("cf1");
+  private static final byte[] Q1 = Bytes.toBytes("q1");
+  private static final byte[] Q2 = Bytes.toBytes("q2");
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Parameterized.Parameter
+  public boolean evictOnClose;
+
+  @Parameterized.Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[][] { { true }, { false } });
+  }
+
+  @Test
+  public void testEvictOnClose() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    // Enable row cache
+    conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f);
+    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f);
+
+    // Set ROW_CACHE_EVICT_ON_CLOSE
+    conf.setBoolean(ROW_CACHE_EVICT_ON_CLOSE_KEY, evictOnClose);
+
+    // Start cluster
+    SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster();
+    cluster.waitForActiveAndReadyMaster();
+    Admin admin = TEST_UTIL.getAdmin();
+
+    RowCache rowCache = 
cluster.getRegionServer(0).getRSRpcServices().getServer().getRowCache();
+
+    // Create table with row cache enabled
+    ColumnFamilyDescriptor cf1 = 
ColumnFamilyDescriptorBuilder.newBuilder(CF1).build();
+    TableName tableName = 
TableName.valueOf(testName.getMethodName().replaceAll("[\\[\\]]", "_"));
+    TableDescriptor td = 
TableDescriptorBuilder.newBuilder(tableName).setRowCacheEnabled(true)
+      .setColumnFamily(cf1).build();
+    admin.createTable(td);
+    Table table = admin.getConnection().getTable(tableName);
+
+    int numRows = 10;
+
+    // Put rows
+    for (int i = 0; i < numRows; i++) {
+      byte[] rowKey = ("row" + i).getBytes();
+      Put put = new Put(rowKey);
+      put.addColumn(CF1, Q1, Bytes.toBytes(0L));
+      put.addColumn(CF1, Q2, "12".getBytes());
+      table.put(put);
+    }
+    // Need to flush because the row cache is not populated when reading only 
from the memstore.
+    admin.flush(tableName);
+
+    // Populate row caches
+    for (int i = 0; i < numRows; i++) {
+      byte[] rowKey = ("row" + i).getBytes();
+      Get get = new Get(rowKey);
+      Result result = table.get(get);
+      assertArrayEquals(rowKey, result.getRow());
+      assertArrayEquals(Bytes.toBytes(0L), result.getValue(CF1, Q1));
+      assertArrayEquals("12".getBytes(), result.getValue(CF1, Q2));
+    }
+
+    // Verify row cache has some entries
+    assertEquals(numRows, rowCache.getCount());
+
+    // Disable table
+    admin.disableTable(tableName);
+
+    // Verify row cache is cleared on table close
+    assertEquals(evictOnClose ? 0 : numRows, rowCache.getCount());
+
+    admin.deleteTable(tableName);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java
new file mode 100644
index 00000000000..a8c59dc6ccb
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheHRegion.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Checks the row cache state of an {@link HRegion} right after it has been opened against a
+ * running region server.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRowCacheHRegion {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowCacheHRegion.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  public static final byte[] CF = Bytes.toBytes("cf1");
+
+  @Rule
+  public TestName currentTest = new TestName();
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Verifies that opening a region sets its row cache sequence number to the region's open
+   * sequence number rather than leaving it at {@link HConstants#NO_SEQNUM}.
+   */
+  @Test
+  public void testOpenHRegion() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    WALFactory walFactory = new WALFactory(conf,
+      ServerName.valueOf(currentTest.getMethodName(), 16010, EnvironmentEdgeManager.currentTime())
+        .toString());
+    // Close the WAL factory and the region even when an assertion fails
+    try {
+      WAL wal = walFactory.getWAL(null);
+      Path hbaseRootDir = CommonFSUtils.getRootDir(conf);
+      TableName tableName = TableName.valueOf(currentTest.getMethodName());
+      RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
+      TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
+      HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+      HRegion region = HRegion.openHRegion(conf, FileSystem.get(conf), hbaseRootDir, hri, htd, wal,
+        regionServer, null);
+      try {
+        // Verify that rowCacheSeqNum is initialized correctly
+        assertNotEquals(HConstants.NO_SEQNUM, region.getRowCacheSeqNum());
+        assertEquals(region.getOpenSeqNum(), region.getRowCacheSeqNum());
+      } finally {
+        region.close();
+      }
+    } finally {
+      walFactory.close();
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java
new file mode 100644
index 00000000000..dafbfbdf6f8
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithBucketCacheAndDataBlockEncoding.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Checks that the second Get of a row is served from the row cache, both with and without an
+ * off-heap bucket cache configured, and both with and without data block encoding on the column
+ * family.
+ */
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestRowCacheWithBucketCacheAndDataBlockEncoding {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowCacheWithBucketCacheAndDataBlockEncoding.class);
+
+  /** Injected by the Parameterized runner into each test instance before {@link #setUp()}. */
+  @Parameterized.Parameter
+  public boolean useBucketCache;
+
+  @Parameterized.Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[][] { { true }, { false } });
+  }
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final byte[] ROW_KEY = Bytes.toBytes("checkRow");
+  private static final byte[] CF = Bytes.toBytes("CF");
+  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
+  private static final byte[] VALUE = Bytes.toBytes("checkValue");
+
+  // Per-test state: a fresh mini cluster is started for every test method, so these are instance
+  // fields rather than statics written from instance lifecycle methods.
+  private HBaseTestingUtil testingUtil;
+  private Admin admin;
+  private RowCache rowCache;
+
+  /** Starts a mini cluster configured for the current parameter; runs before every test. */
+  @Before
+  public void setUp() throws Exception {
+    testingUtil = new HBaseTestingUtil();
+    Configuration conf = testingUtil.getConfiguration();
+
+    // Use bucket cache
+    if (useBucketCache) {
+      conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 1);
+      conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
+      conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
+    }
+
+    // Use row cache, and size the block cache explicitly alongside it
+    conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f);
+    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f);
+    testingUtil.startMiniCluster();
+    admin = testingUtil.getAdmin();
+
+    rowCache = testingUtil.getHBaseCluster().getRegionServer(0).getRowCache();
+  }
+
+  /** Shuts down the mini cluster started by {@link #setUp()}; runs after every test. */
+  @After
+  public void tearDown() throws Exception {
+    testingUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRowCacheNoEncode() throws Exception {
+    testRowCache(name.getMethodName(), DataBlockEncoding.NONE);
+  }
+
+  @Test
+  public void testRowCacheEncode() throws Exception {
+    testRowCache(name.getMethodName(), DataBlockEncoding.FAST_DIFF);
+  }
+
+  /**
+   * Writes and flushes one row, then reads it twice: the first Get must add one entry to the row
+   * cache without a hit, the second must be a row cache hit without adding an entry.
+   * @param methodName test method name; square brackets are replaced with underscores to form
+   *                   the table name
+   * @param dbe        data block encoding applied to the column family
+   */
+  private void testRowCache(String methodName, DataBlockEncoding dbe) throws Exception {
+    TableName tableName = TableName.valueOf(methodName.replaceAll("[\\[\\]]", "_"));
+    try (Table testTable = createTable(tableName, dbe)) {
+      Put put = new Put(ROW_KEY);
+      put.addColumn(CF, QUALIFIER, VALUE);
+      testTable.put(put);
+      admin.flush(testTable.getName());
+
+      // Counters are compared as deltas against these baselines
+      long countBase = rowCache.getCount();
+      long hitCountBase = rowCache.getHitCount();
+
+      Result result;
+
+      // First get should not hit the row cache, and populate it
+      Get get = new Get(ROW_KEY);
+      result = testTable.get(get);
+      assertArrayEquals(ROW_KEY, result.getRow());
+      assertArrayEquals(VALUE, result.getValue(CF, QUALIFIER));
+      assertEquals(1, rowCache.getCount() - countBase);
+      assertEquals(0, rowCache.getHitCount() - hitCountBase);
+
+      // Second get should hit the row cache
+      result = testTable.get(get);
+      assertArrayEquals(ROW_KEY, result.getRow());
+      assertArrayEquals(VALUE, result.getValue(CF, QUALIFIER));
+      assertEquals(1, rowCache.getCount() - countBase);
+      assertEquals(1, rowCache.getHitCount() - hitCountBase);
+    }
+  }
+
+  /** Creates a row-cache-enabled table whose single family uses the given block encoding. */
+  private Table createTable(TableName tableName, DataBlockEncoding dbe) throws IOException {
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).setBlocksize(100)
+        .setDataBlockEncoding(dbe).build())
+      .setRowCacheEnabled(true).build();
+    return testingUtil.createTable(td, null);
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithMock.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithMock.java
new file mode 100644
index 00000000000..dff476d76b5
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheWithMock.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestRowCacheWithMock {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowCacheWithMock.class);
+
+  /**
+   * Checks that a read populates the row cache when no barrier exists, and that it does not
+   * populate it while a row-level or a region-level barrier is in place.
+   */
+  @Test
+  public void testBarrier() throws IOException {
+    // Region metadata handed out by the mocked region
+    RegionInfo info = Mockito.mock(RegionInfo.class);
+    Mockito.when(info.getEncodedName()).thenReturn("region1");
+    TableName table = TableName.valueOf("table1");
+    Mockito.when(info.getTable()).thenReturn(table);
+
+    HStore store = Mockito.mock(HStore.class);
+    Mockito.when(store.getStorefilesCount()).thenReturn(2);
+    List<HStore> storeList = new ArrayList<>();
+    storeList.add(store);
+
+    ColumnFamilyDescriptor family =
+      ColumnFamilyDescriptorBuilder.newBuilder("CF1".getBytes()).build();
+    TableDescriptor descriptor = Mockito.mock(TableDescriptor.class);
+    Mockito.when(descriptor.getColumnFamilies())
+      .thenReturn(new ColumnFamilyDescriptor[] { family });
+
+    byte[] row = "row".getBytes();
+    Get get = new Get(row);
+    Scan scan = new Scan(get);
+    List<Cell> cells = new ArrayList<>();
+
+    RegionScannerImpl scanner = Mockito.mock(RegionScannerImpl.class);
+
+    RpcCallContext rpcContext = Mockito.mock(RpcCallContext.class);
+    Mockito.when(rpcContext.getBlockBytesScanned()).thenReturn(1L);
+
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f);
+    RowCache rowCache = new RowCache(conf);
+
+    // Mocked region: only setRowCache and getScannerWithResults run their real implementation
+    HRegion region = Mockito.mock(HRegion.class);
+    Mockito.doCallRealMethod().when(region).setRowCache(Mockito.any());
+    region.setRowCache(rowCache);
+    Mockito.when(region.getRegionInfo()).thenReturn(info);
+    Mockito.when(region.getTableDescriptor()).thenReturn(descriptor);
+    Mockito.when(region.getStores()).thenReturn(storeList);
+    Mockito.when(region.getScanner(scan)).thenReturn(scanner);
+    Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf);
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(true);
+    Mockito.when(region.getScannerWithResults(get, scan, cells, rpcContext)).thenCallRealMethod();
+
+    RowCacheKey cacheKey = new RowCacheKey(region, row);
+    String regionName = region.getRegionInfo().getEncodedName();
+    cells.add(KeyValueTestUtil.create("row", "CF", "q1", 1, "v1"));
+
+    // Without any barrier, a read populates the row cache
+    region.getScannerWithResults(get, scan, cells, rpcContext);
+    assertNotNull(rowCache.getRow(cacheKey));
+    assertNull(rowCache.getRowLevelBarrier(cacheKey));
+
+    // Start the row-level phase from an empty cache
+    rowCache.evictRow(cacheKey);
+    assertNull(rowCache.getRow(cacheKey));
+
+    // Put a row-level barrier on the row key
+    rowCache.createRowLevelBarrier(cacheKey);
+    assertEquals(1, rowCache.getRowLevelBarrier(cacheKey).get());
+
+    // While the row-level barrier exists, a read must not populate the row cache
+    region.getScannerWithResults(get, scan, cells, rpcContext);
+    assertNull(rowCache.getRow(cacheKey));
+
+    // Lift the row-level barrier
+    rowCache.removeRowLevelBarrier(cacheKey);
+    assertNull(rowCache.getRowLevelBarrier(cacheKey));
+
+    // With the row-level barrier gone and no region-level barrier yet, reads populate again
+    region.getScannerWithResults(get, scan, cells, rpcContext);
+    assertNotNull(rowCache.getRow(cacheKey));
+    assertNull(rowCache.getRegionLevelBarrier(regionName));
+
+    // Start the region-level phase from an empty cache
+    rowCache.evictRow(cacheKey);
+    assertNull(rowCache.getRow(cacheKey));
+
+    // Put a region-level barrier on the region
+    rowCache.createRegionLevelBarrier(regionName);
+    assertEquals(1, rowCache.getRegionLevelBarrier(regionName).get());
+
+    // While the region-level barrier exists, a read must not populate the row cache
+    region.getScannerWithResults(get, scan, cells, rpcContext);
+    assertNull(rowCache.getRow(cacheKey));
+
+    // Lift the region-level barrier
+    rowCache.removeRegionLevelBarrier(regionName);
+    assertNull(rowCache.getRegionLevelBarrier(regionName));
+  }
+
+  /**
+   * Verifies the order of row cache calls made around mutations. Each single-row or multi-row
+   * mutation routed through {@code RowCache.mutateWithRowCacheBarrier} must create a row-level
+   * barrier, evict the row, execute the operation and remove the barrier, in that order. A bulk
+   * load through {@code RSRpcServices.bulkLoadHFile} must create a region-level barrier, increase
+   * the row cache sequence number and remove the barrier, in that order.
+   */
+  @Test
+  public void testMutate() throws IOException, ServiceException {
+    // Mocking RowCache and its dependencies
+    TableDescriptor tableDescriptor = Mockito.mock(TableDescriptor.class);
+
+    RegionInfo regionInfo = Mockito.mock(RegionInfo.class);
+    Mockito.when(regionInfo.getEncodedName()).thenReturn("region1");
+
+    RowCache rowCache = Mockito.mock(RowCache.class);
+
+    RegionServerServices rss = Mockito.mock(RegionServerServices.class);
+    Mockito.when(rss.getRowCache()).thenReturn(rowCache);
+
+    HRegion region = Mockito.mock(HRegion.class);
+    Mockito.doCallRealMethod().when(region).setRowCache(Mockito.any());
+    region.setRowCache(rowCache);
+    Mockito.when(region.getTableDescriptor()).thenReturn(tableDescriptor);
+    Mockito.when(region.getRegionInfo()).thenReturn(regionInfo);
+    Mockito.when(region.getBlockCache()).thenReturn(Mockito.mock(BlockCache.class));
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(true);
+    Mockito.when(region.getRegionServerServices()).thenReturn(rss);
+
+    RSRpcServices rsRpcServices = Mockito.mock(RSRpcServices.class);
+    Mockito.when(rsRpcServices.getRegion(Mockito.any())).thenReturn(region);
+
+    RpcController rpcController = Mockito.mock(RpcController.class);
+
+    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder("row".getBytes())
+      .ifEquals("CF".getBytes(), "q1".getBytes(), "v1".getBytes()).build(new Put("row".getBytes()));
+
+    Put put1 = new Put("row1".getBytes());
+    put1.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes());
+    Put put2 = new Put("row1".getBytes());
+    put2.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes());
+    List<Mutation> mutations = new ArrayList<>();
+    mutations.add(put1);
+    mutations.add(put2);
+
+    Delete del = new Delete("row1".getBytes());
+    Append append = new Append("row1".getBytes());
+    append.addColumn("CF".getBytes(), "q1".getBytes(), "v1".getBytes());
+    Increment increment = new Increment("row1".getBytes());
+    increment.addColumn("CF".getBytes(), "q1".getBytes(), 1L);
+
+    Mutation[] mutationArray = new Mutation[mutations.size()];
+    mutations.toArray(mutationArray);
+
+    // rowCache.mutateWithRowCacheBarrier must run real code so internal calls are recorded
+    Mockito.doCallRealMethod().when(rowCache).mutateWithRowCacheBarrier(Mockito.any(HRegion.class),
+      Mockito.any(byte[].class), Mockito.any());
+    Mockito.doCallRealMethod().when(rowCache).mutateWithRowCacheBarrier(Mockito.any(HRegion.class),
+      Mockito.anyList(), Mockito.any());
+
+    // Put
+    Mockito.doAnswer(invocation -> {
+      Put arg = invocation.getArgument(0);
+      rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null);
+      return null;
+    }).when(region).put(put1);
+    Mockito.clearInvocations(rowCache);
+    region.put(put1);
+    verifyRowLevelBarrierSequence(rowCache);
+
+    // Delete
+    Mockito.doAnswer(invocation -> {
+      Delete arg = invocation.getArgument(0);
+      rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null);
+      return null;
+    }).when(region).delete(del);
+    Mockito.clearInvocations(rowCache);
+    region.delete(del);
+    verifyRowLevelBarrierSequence(rowCache);
+
+    // Append
+    Mockito.doAnswer(invocation -> {
+      Append arg = invocation.getArgument(0);
+      rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null);
+      return null;
+    }).when(region).append(append);
+    Mockito.clearInvocations(rowCache);
+    region.append(append);
+    verifyRowLevelBarrierSequence(rowCache);
+
+    // Increment
+    Mockito.doAnswer(invocation -> {
+      Increment arg = invocation.getArgument(0);
+      rowCache.mutateWithRowCacheBarrier(region, arg.getRow(), () -> null);
+      return null;
+    }).when(region).increment(increment);
+    Mockito.clearInvocations(rowCache);
+    region.increment(increment);
+    verifyRowLevelBarrierSequence(rowCache);
+
+    // CheckAndMutate
+    Mockito.doAnswer(invocation -> {
+      CheckAndMutate c = invocation.getArgument(0);
+      rowCache.mutateWithRowCacheBarrier(region, c.getRow(), () -> null);
+      return null;
+    }).when(region).checkAndMutate(Mockito.any(CheckAndMutate.class), Mockito.anyLong(),
+      Mockito.anyLong());
+    Mockito.clearInvocations(rowCache);
+    region.checkAndMutate(checkAndMutate, 0, 0);
+    verifyRowLevelBarrierSequence(rowCache);
+
+    // RowMutations
+    Mockito.doAnswer(invocation -> {
+      List<Mutation> muts = invocation.getArgument(0);
+      rowCache.mutateWithRowCacheBarrier(region, muts, () -> null);
+      return null;
+    }).when(region).checkAndMutate(Mockito.anyList(), Mockito.any(CheckAndMutate.class),
+      Mockito.anyLong(), Mockito.anyLong());
+    Mockito.clearInvocations(rowCache);
+    region.checkAndMutate(mutations, checkAndMutate, 0, 0);
+    verifyRowLevelBarrierSequence(rowCache);
+
+    // Batch
+    Mockito.doAnswer(invocation -> {
+      Mutation[] muts = invocation.getArgument(0);
+      rowCache.mutateWithRowCacheBarrier(region, Arrays.asList(muts), () -> null);
+      return null;
+    }).when(region).batchMutate(Mockito.any(Mutation[].class), Mockito.anyBoolean(),
+      Mockito.anyLong(), Mockito.anyLong());
+    Mockito.clearInvocations(rowCache);
+    region.batchMutate(mutationArray, true, 0, 0);
+    verifyRowLevelBarrierSequence(rowCache);
+
+    // Bulkload
+    HBaseProtos.RegionSpecifier regionSpecifier = HBaseProtos.RegionSpecifier.newBuilder()
+      .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
+      .setValue(ByteString.copyFrom("region".getBytes())).build();
+    ClientProtos.BulkLoadHFileRequest bulkLoadRequest =
+      ClientProtos.BulkLoadHFileRequest.newBuilder().setRegion(regionSpecifier).build();
+    Mockito.doCallRealMethod().when(rsRpcServices).bulkLoadHFile(rpcController, bulkLoadRequest);
+    Mockito.clearInvocations(rowCache);
+    rsRpcServices.bulkLoadHFile(rpcController, bulkLoadRequest);
+    // Verify the sequence of method calls
+    InOrder inOrder = Mockito.inOrder(rowCache);
+    inOrder.verify(rowCache, Mockito.times(1)).createRegionLevelBarrier(Mockito.any());
+    inOrder.verify(rowCache, Mockito.times(1)).increaseRowCacheSeqNum(Mockito.any());
+    inOrder.verify(rowCache, Mockito.times(1)).removeRegionLevelBarrier(Mockito.any());
+  }
+
+  /**
+   * Asserts that, since the last {@code Mockito.clearInvocations(rowCache)}, the mocked row cache
+   * saw exactly one call each of createRowLevelBarrier, evictRow, execute and
+   * removeRowLevelBarrier, in that order. Declares the same checked exceptions as the calling
+   * test so the verified method stubs compile unchanged.
+   */
+  private static void verifyRowLevelBarrierSequence(RowCache rowCache)
+    throws IOException, ServiceException {
+    InOrder inOrder = Mockito.inOrder(rowCache);
+    inOrder.verify(rowCache, Mockito.times(1)).createRowLevelBarrier(Mockito.any());
+    inOrder.verify(rowCache, Mockito.times(1)).evictRow(Mockito.any());
+    inOrder.verify(rowCache, Mockito.times(1)).execute(Mockito.any());
+    inOrder.verify(rowCache, Mockito.times(1)).removeRowLevelBarrier(Mockito.any());
+  }
+
+  /**
+   * Checks how the Get's cacheBlocks flag affects the row cache: a read with cacheBlocks=false
+   * must leave the row cache empty, a read with cacheBlocks=true must populate it, and a lookup
+   * with caching=false returns nothing even when the row is cached.
+   */
+  @Test
+  public void testCaching() throws IOException {
+    // Mocking dependencies to create RowCache instance
+    RegionInfo regionInfo = Mockito.mock(RegionInfo.class);
+    Mockito.when(regionInfo.getEncodedName()).thenReturn("region1");
+    TableName tableName = TableName.valueOf("table1");
+    Mockito.when(regionInfo.getTable()).thenReturn(tableName);
+
+    List<HStore> stores = new ArrayList<>();
+    HStore hStore = Mockito.mock(HStore.class);
+    Mockito.when(hStore.getStorefilesCount()).thenReturn(2);
+    stores.add(hStore);
+
+    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder("CF1".getBytes()).build();
+    TableDescriptor td = Mockito.mock(TableDescriptor.class);
+    Mockito.when(td.getColumnFamilies()).thenReturn(new ColumnFamilyDescriptor[] { cfd });
+
+    RpcCallContext context = Mockito.mock(RpcCallContext.class);
+    Mockito.when(context.getBlockBytesScanned()).thenReturn(1L);
+
+    byte[] rowKey = "row".getBytes();
+    RegionScannerImpl regionScanner = Mockito.mock(RegionScannerImpl.class);
+
+    Get get = new Get(rowKey);
+    Scan scan = new Scan(get);
+
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f);
+    RowCache rowCache = new RowCache(conf);
+
+    HRegion region = Mockito.mock(HRegion.class);
+    Mockito.doCallRealMethod().when(region).setRowCache(Mockito.any());
+    region.setRowCache(rowCache);
+    Mockito.when(region.getRegionInfo()).thenReturn(regionInfo);
+    Mockito.when(region.getTableDescriptor()).thenReturn(td);
+    Mockito.when(region.getStores()).thenReturn(stores);
+    Mockito.when(region.getScanner(scan)).thenReturn(regionScanner);
+    Mockito.when(region.getReadOnlyConfiguration()).thenReturn(conf);
+    Mockito.when(region.isRowCacheEnabled()).thenReturn(true);
+    Mockito.when(region.getScannerWithResults(Mockito.any(Get.class), Mockito.any(Scan.class),
+      Mockito.anyList(), Mockito.any())).thenCallRealMethod();
+
+    RowCacheKey key = new RowCacheKey(region, rowKey);
+    List<Cell> results = new ArrayList<>();
+    results.add(KeyValueTestUtil.create("row", "CF", "q1", 1, "v1"));
+
+    // Verify that the row cache is NOT populated with caching=false.
+    // This must run first, while the row cache is still empty.
+    get.setCacheBlocks(false);
+    region.getScannerWithResults(get, scan, results, context);
+    // Same pair of lookups as the caching=true case below; both must miss here
+    assertNull(rowCache.getRow(key, true));
+    assertNull(rowCache.getRow(key, false));
+
+    // Verify that row cache populated with caching=true
+    get.setCacheBlocks(true);
+    region.getScannerWithResults(get, scan, results, context);
+    assertNotNull(rowCache.getRow(key, true));
+    assertNull(rowCache.getRow(key, false));
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestRowCacheBulkLoadHFiles.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestRowCacheBulkLoadHFiles.java
new file mode 100644
index 00000000000..c5a62935e5e
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestRowCacheBulkLoadHFiles.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.ROW_CACHE_SIZE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RowCache;
+import org.apache.hadoop.hbase.regionserver.RowCacheKey;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Checks the effect of a bulk load on the row cache of a row-cache-enabled table split into two
+ * regions: cached rows of the bulk-loaded region are no longer reachable through a freshly built
+ * cache key, while cached rows of the other region stay reachable.
+ */
+@Category({ MiscTests.class, MediumTests.class })
+public class TestRowCacheBulkLoadHFiles {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowCacheBulkLoadHFiles.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static Admin admin;
+
+  static final int NUM_CFS = 2;
+  static final byte[] QUAL = Bytes.toBytes("qual");
+  static final int ROWCOUNT = 10;
+
+  private TableName tableName;
+  private Table table;
+  private HRegion[] regions;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /** Returns the name of the i-th column family, e.g. {@code family_0003}. */
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  /**
+   * Creates one HFile per column family under {@code dir/<family>/hfile_<i>}, each written by
+   * {@code TestHRegionServerBulkLoad.createHFile} with {@link #ROWCOUNT} as the row count.
+   */
+  public static void buildHFiles(FileSystem fs, Path dir) throws IOException {
+    // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset
+    byte[] val = Bytes.toBytes("value");
+    for (int i = 0; i < NUM_CFS; i++) {
+      Path testIn = new Path(dir, family(i));
+
+      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+    }
+  }
+
+  /** Builds a row-cache-enabled descriptor with {@link #NUM_CFS} column families. */
+  private TableDescriptor createTableDesc(TableName name) {
+    TableDescriptorBuilder builder =
+      TableDescriptorBuilder.newBuilder(name).setRowCacheEnabled(true);
+    IntStream.range(0, NUM_CFS).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
+      .forEachOrdered(builder::setColumnFamily);
+    return builder.build();
+  }
+
+  /** Writes the HFiles to bulk load on the test file system and returns their directory. */
+  private Path buildBulkFiles(TableName table) throws Exception {
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS(table.getNameAsString());
+    Path bulk1 = new Path(dir, table.getNameAsString());
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    buildHFiles(fs, bulk1);
+    return bulk1;
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    // Enable row cache but reduce the block cache size to fit in 80% of the heap
+    conf.setFloat(ROW_CACHE_SIZE_KEY, 0.01f);
+    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f);
+
+    TEST_UTIL.startMiniCluster(1);
+    admin = TEST_UTIL.getAdmin();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    tableName = TableName.valueOf(testName.getMethodName());
+    // Split the table into 2 regions
+    byte[][] splitKeys = new byte[][] { TestHRegionServerBulkLoad.rowkey(ROWCOUNT) };
+    admin.createTable(createTableDesc(tableName), splitKeys);
+    table = TEST_UTIL.getConnection().getTable(tableName);
+    // Sorted by region name
+    regions = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegions().stream()
+      .filter(r -> r.getRegionInfo().getTable().equals(tableName))
+      .sorted(Comparator.comparing(r -> r.getRegionInfo().getRegionNameAsString()))
+      .toArray(HRegion[]::new);
+  }
+
+  @After
+  public void after() throws Exception {
+    // Release the Table opened in before(); it is null if before() failed before opening it
+    if (table != null) {
+      table.close();
+    }
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+  }
+
+  @Test
+  public void testRowCache() throws Exception {
+    RowCache rowCache =
+      TEST_UTIL.getHBaseCluster().getRegionServer(0).getRSRpcServices().getServer().getRowCache();
+
+    // The region to be bulk-loaded
+    byte[] rowKeyRegion0 = TestHRegionServerBulkLoad.rowkey(0);
+    // The region not to be bulk-loaded
+    byte[] rowKeyRegion1 = TestHRegionServerBulkLoad.rowkey(ROWCOUNT);
+
+    // Put a row into each region to populate the row cache
+    Put put0 = new Put(rowKeyRegion0);
+    put0.addColumn(Bytes.toBytes(family(0)), Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    table.put(put0);
+    Put put1 = new Put(rowKeyRegion1);
+    put1.addColumn(Bytes.toBytes(family(0)), Bytes.toBytes("q1"), Bytes.toBytes("value"));
+    table.put(put1);
+    admin.flush(tableName);
+
+    // Ensure each region has a row cache
+    Get get0 = new Get(rowKeyRegion0);
+    Result result0 = table.get(get0);
+    assertNotNull(result0);
+    RowCacheKey keyPrev0 = new RowCacheKey(regions[0], get0.getRow());
+    assertNotNull(rowCache.getRow(keyPrev0));
+    Get get1 = new Get(rowKeyRegion1);
+    Result result1 = table.get(get1);
+    assertNotNull(result1);
+    RowCacheKey keyPrev1 = new RowCacheKey(regions[1], get1.getRow());
+    assertNotNull(rowCache.getRow(keyPrev1));
+
+    // Do bulkload to region0 only
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
+    Path dir = buildBulkFiles(tableName);
+    loader.bulkLoad(tableName, dir);
+
+    // Ensure the row cache is removed after bulkload for region0
+    RowCacheKey keyCur0 = new RowCacheKey(regions[0], get0.getRow());
+    assertNotEquals(keyPrev0, keyCur0);
+    assertNull(rowCache.getRow(keyCur0));
+    // Ensure the row cache for keyPrev0 still exists, but it is not used anymore.
+    assertNotNull(rowCache.getRow(keyPrev0));
+
+    // Ensure the row cache for region1 is not affected
+    RowCacheKey keyCur1 = new RowCacheKey(regions[1], get1.getRow());
+    assertEquals(keyPrev1, keyCur1);
+    assertNotNull(rowCache.getRow(keyCur1));
+  }
+}

Reply via email to