This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new af59297f6f6 HBASE-29742 Compaction scan returns single cells instead
of rows after 10MB (#7562)
af59297f6f6 is described below
commit af59297f6f67e96a6b64577e3777753038f12f40
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Jan 5 22:28:25 2026 +0800
HBASE-29742 Compaction scan returns single cells instead of rows after 10MB
(#7562)
Signed-off-by: Istvan Toth <[email protected]>
(cherry picked from commit 2ec2d750579c960a5829ce8e52efb08b803372af)
---
.../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 5 ++-
.../hadoop/hbase/regionserver/ScannerContext.java | 12 ++++++
.../hbase/regionserver/compactions/Compactor.java | 49 ++++++++++++++--------
.../hadoop/hbase/mob/FaultyMobStoreCompactor.java | 1 +
4 files changed, 47 insertions(+), 20 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index f0beea64761..87248e7bab6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -351,8 +351,8 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
.build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ?
(KeyValueScanner) scanner : null;
- long shippedCallSizeLimit =
- (long) request.getFiles().size() *
this.store.getColumnFamilyDescriptor().getBlocksize();
+ long shippedCallSizeLimit = Math.min(compactScannerSizeLimit,
+ (long) request.getFiles().size() *
this.store.getColumnFamilyDescriptor().getBlocksize());
ExtendedCell mobCell = null;
List<String> committedMobWriterFileNames = new ArrayList<>();
@@ -557,6 +557,7 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
if (kvs != null && bytesWrittenProgressForShippedCall >
shippedCallSizeLimit) {
((ShipperListener) writer).beforeShipped();
kvs.shipped();
+ scannerContext.clearBlockSizeProgress();
bytesWrittenProgressForShippedCall = 0;
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index c681e91c615..e8e7b6788f2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.errorprone.annotations.RestrictedApi;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -266,6 +267,17 @@ public class ScannerContext {
progress.setFields(0, 0, 0, getBlockSizeProgress());
}
+ /**
+ * Clear the block size progress. Mainly used in compaction, where a single ScannerContext is
+ * reused across the whole compaction lifetime and Shipper.shipped is called to release the
+ * block references, so it is safe to clear the block size progress in compaction.
+ */
+ @RestrictedApi(explanation = "Should only be called in Compactor", link = "",
+ allowedOnPath =
".*/org/apache/hadoop/hbase/.*/*Compactor.java|.*/src/test/.*")
+ public void clearBlockSizeProgress() {
+ progress.setBlockSize(0);
+ }
+
/**
* Note that this is not a typical setter. This setter returns the {@link
NextState} that was
* passed in so that methods can be invoked against the new state.
Furthermore, this pattern
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 069968294b8..22b2a54693c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -445,8 +445,14 @@ public abstract class Compactor<T extends CellSink> {
throughputController.start(compactionName);
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
- long shippedCallSizeLimit =
- (long) request.getFiles().size() *
this.store.getColumnFamilyDescriptor().getBlocksize();
+ // When hitting the block size limit, i.e., compactScannerSizeLimit, we need to reset the block
+ // size progress, otherwise the scanner will only return 1 cell from the next method because
+ // all the blocks scanned are still referenced, so we need to call shipped to release them.
+ // Usually compactScannerSizeLimit will be much greater than blockSize * number of files, but
+ // anyway, let's use Math.min for safety.
+ // See HBASE-29742
+ long shippedCallSizeLimit = Math.min(compactScannerSizeLimit,
+ (long) request.getFiles().size() *
this.store.getColumnFamilyDescriptor().getBlocksize());
try {
do {
// InternalScanner is for CPs so we do not want to leak ExtendedCell
to the interface, but
@@ -488,25 +494,32 @@ public abstract class Compactor<T extends CellSink> {
}
}
writer.appendAll(cells);
- if (shipper != null && bytesWrittenProgressForShippedCall >
shippedCallSizeLimit) {
- if (lastCleanCell != null) {
- // HBASE-16931, set back sequence id to avoid affecting scan order
unexpectedly.
- // ShipperListener will do a clone of the last cells it refer, so
need to set back
- // sequence id before ShipperListener.beforeShipped
- PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
+ if (bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
+ if (shipper != null) {
+ if (lastCleanCell != null) {
+ // HBASE-16931, set back sequence id to avoid affecting scan
order unexpectedly.
+ // ShipperListener will do a clone of the last cells it refer,
so need to set back
+ // sequence id before ShipperListener.beforeShipped
+ PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
+ }
+ // Clone the cells that are in the writer so that they are freed
of references,
+ // if they are holding any.
+ ((ShipperListener) writer).beforeShipped();
+ // The SHARED block references, being read for compaction, will be kept in the prevBlocks
+ // list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells is
+ // returned to the client, we call shipped() which can clear this list. Here we are doing
+ // a similar thing: during the compaction (after every N cells written with a collective
+ // size of 'shippedCallSizeLimit') we call shipped, which may clear the prevBlocks list.
+ shipper.shipped();
}
- // Clone the cells that are in the writer so that they are freed of
references,
- // if they are holding any.
- ((ShipperListener) writer).beforeShipped();
- // The SHARED block references, being read for compaction, will be
kept in prevBlocks
- // list(See HFileScannerImpl#prevBlocks). In case of scan flow,
after each set of cells
- // being returned to client, we will call shipped() which can clear
this list. Here by
- // we are doing the similar thing. In between the compaction (after
every N cells
- // written with collective size of 'shippedCallSizeLimit') we will
call shipped which
- // may clear prevBlocks list.
- shipper.shipped();
+ // Clear the block progress in ScannerContext so that we can reuse it. In a normal rpc call
+ // the ScannerContext is dropped after shipping, so there is no need to clear the block
+ // progress there.
+ scannerContext.clearBlockSizeProgress();
bytesWrittenProgressForShippedCall = 0;
}
+
if (lastCleanCell != null) {
// HBASE-16931, set back sequence id to avoid affecting scan order
unexpectedly
PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 4dd5ad1156a..b08a44c04e7 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -331,6 +331,7 @@ public class FaultyMobStoreCompactor extends
DefaultMobStoreCompactor {
if (kvs != null && bytesWrittenProgressForShippedCall >
shippedCallSizeLimit) {
((ShipperListener) writer).beforeShipped();
kvs.shipped();
+ scannerContext.clearBlockSizeProgress();
bytesWrittenProgressForShippedCall = 0;
}
}