This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new af59297f6f6 HBASE-29742 Compaction scan returns single cells instead of rows after 10MB (#7562)
af59297f6f6 is described below

commit af59297f6f67e96a6b64577e3777753038f12f40
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Jan 5 22:28:25 2026 +0800

    HBASE-29742 Compaction scan returns single cells instead of rows after 10MB (#7562)
    
    Signed-off-by: Istvan Toth <[email protected]>
    (cherry picked from commit 2ec2d750579c960a5829ce8e52efb08b803372af)
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  5 ++-
 .../hadoop/hbase/regionserver/ScannerContext.java  | 12 ++++++
 .../hbase/regionserver/compactions/Compactor.java  | 49 ++++++++++++++--------
 .../hadoop/hbase/mob/FaultyMobStoreCompactor.java  |  1 +
 4 files changed, 47 insertions(+), 20 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index f0beea64761..87248e7bab6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -351,8 +351,8 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
       .build();
     throughputController.start(compactionName);
     KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? 
(KeyValueScanner) scanner : null;
-    long shippedCallSizeLimit =
-      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+    long shippedCallSizeLimit = Math.min(compactScannerSizeLimit,
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize());
 
     ExtendedCell mobCell = null;
     List<String> committedMobWriterFileNames = new ArrayList<>();
@@ -557,6 +557,7 @@ public class DefaultMobStoreCompactor extends 
DefaultCompactor {
           if (kvs != null && bytesWrittenProgressForShippedCall > 
shippedCallSizeLimit) {
             ((ShipperListener) writer).beforeShipped();
             kvs.shipped();
+            scannerContext.clearBlockSizeProgress();
             bytesWrittenProgressForShippedCall = 0;
           }
         }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index c681e91c615..e8e7b6788f2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -266,6 +267,17 @@ public class ScannerContext {
     progress.setFields(0, 0, 0, getBlockSizeProgress());
   }
 
+  /**
+   * Reset the block size progress to zero. Intended for compaction, which reuses a single
+   * ScannerContext for its whole lifetime and calls Shipper.shipped to release the referenced
+   * blocks before clearing, so it is safe to clear the block size progress there.
+   */
+  @RestrictedApi(explanation = "Should only be called in Compactor", link = "",
+      allowedOnPath = 
".*/org/apache/hadoop/hbase/.*/*Compactor.java|.*/src/test/.*")
+  public void clearBlockSizeProgress() {
+    progress.setBlockSize(0);
+  }
+
   /**
    * Note that this is not a typical setter. This setter returns the {@link 
NextState} that was
    * passed in so that methods can be invoked against the new state. 
Furthermore, this pattern
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 069968294b8..22b2a54693c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -445,8 +445,14 @@ public abstract class Compactor<T extends CellSink> {
 
     throughputController.start(compactionName);
     Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
-    long shippedCallSizeLimit =
-      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize();
+    // When the block size limit (compactScannerSizeLimit) is hit, we must call shipped to
+    // release the referenced blocks and reset the block size progress; otherwise every later
+    // next() call returns only one cell, because all the scanned blocks are still referenced.
+    // compactScannerSizeLimit is usually much larger than blockSize * number of files, but use
+    // Math.min anyway so that the shipped threshold never exceeds the scanner size limit.
+    // See HBASE-29742
+    long shippedCallSizeLimit = Math.min(compactScannerSizeLimit,
+      (long) request.getFiles().size() * 
this.store.getColumnFamilyDescriptor().getBlocksize());
     try {
       do {
         // InternalScanner is for CPs so we do not want to leak ExtendedCell 
to the interface, but
@@ -488,25 +494,32 @@ public abstract class Compactor<T extends CellSink> {
           }
         }
         writer.appendAll(cells);
-        if (shipper != null && bytesWrittenProgressForShippedCall > 
shippedCallSizeLimit) {
-          if (lastCleanCell != null) {
-            // HBASE-16931, set back sequence id to avoid affecting scan order 
unexpectedly.
-            // ShipperListener will do a clone of the last cells it refer, so 
need to set back
-            // sequence id before ShipperListener.beforeShipped
-            PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
+        if (bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
+          if (shipper != null) {
+            if (lastCleanCell != null) {
+              // HBASE-16931: restore the sequence id to avoid unexpectedly affecting scan order.
+              // ShipperListener clones the last cells it refers to, so the sequence id must be
+              // restored before ShipperListener.beforeShipped is called.
+              PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
+            }
+            // Clone the cells that are in the writer so that they are freed 
of references,
+            // if they are holding any.
+            ((ShipperListener) writer).beforeShipped();
+            // The SHARED block references read during compaction are kept in the prevBlocks
+            // list (see HFileScannerImpl#prevBlocks). In the scan flow, shipped() is called
+            // after each batch of cells is returned to the client, which can clear this list.
+            // We do the same here: during compaction, after every N cells whose total size
+            // exceeds 'shippedCallSizeLimit' have been written, we call shipped(), which may
+            // clear the prevBlocks list.
+            shipper.shipped();
           }
-          // Clone the cells that are in the writer so that they are freed of 
references,
-          // if they are holding any.
-          ((ShipperListener) writer).beforeShipped();
-          // The SHARED block references, being read for compaction, will be 
kept in prevBlocks
-          // list(See HFileScannerImpl#prevBlocks). In case of scan flow, 
after each set of cells
-          // being returned to client, we will call shipped() which can clear 
this list. Here by
-          // we are doing the similar thing. In between the compaction (after 
every N cells
-          // written with collective size of 'shippedCallSizeLimit') we will 
call shipped which
-          // may clear prevBlocks list.
-          shipper.shipped();
+          // Clear the block size progress in ScannerContext so it can be reused. This happens even
+          // when the scanner is not a Shipper. In normal RPC calls the ScannerContext is discarded
+          // after shipping, so clearing is not needed there.
+          scannerContext.clearBlockSizeProgress();
           bytesWrittenProgressForShippedCall = 0;
         }
+
         if (lastCleanCell != null) {
           // HBASE-16931, set back sequence id to avoid affecting scan order 
unexpectedly
           PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 4dd5ad1156a..b08a44c04e7 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -331,6 +331,7 @@ public class FaultyMobStoreCompactor extends 
DefaultMobStoreCompactor {
           if (kvs != null && bytesWrittenProgressForShippedCall > 
shippedCallSizeLimit) {
             ((ShipperListener) writer).beforeShipped();
             kvs.shipped();
+            scannerContext.clearBlockSizeProgress();
             bytesWrittenProgressForShippedCall = 0;
           }
         }

Reply via email to