This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 61e4e36c949 HBASE-29254 StoreScanner returns incorrect row after flush
due to topChanged behavior (#6900)
61e4e36c949 is described below
commit 61e4e36c949863f30347a44df27bcb530b028861
Author: Minwoo Kang <[email protected]>
AuthorDate: Wed Apr 30 18:22:08 2025 +0900
HBASE-29254 StoreScanner returns incorrect row after flush due to
topChanged behavior (#6900)
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit 99bd5b57c1edc0800d0c8cd54e1917f6a3aa5c6b)
(cherry picked from commit 524825e28be4ab64dbd8330da22dfe5e0de464d8)
---
.../hadoop/hbase/regionserver/StoreScanner.java | 11 ++-
.../hadoop/hbase/regionserver/TestHStore.java | 98 ++++++++++++++++++++++
2 files changed, 103 insertions(+), 6 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index bc67b69b850..cbc617d2a0b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -736,7 +736,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
}
matcher.clearCurrentRow();
seekOrSkipToNextRow(cell);
- NextState stateAfterSeekNextRow = needToReturn(outResult);
+ NextState stateAfterSeekNextRow = needToReturn();
if (stateAfterSeekNextRow != null) {
return
scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
}
@@ -744,7 +744,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
case SEEK_NEXT_COL:
seekOrSkipToNextColumn(cell);
- NextState stateAfterSeekNextColumn = needToReturn(outResult);
+ NextState stateAfterSeekNextColumn = needToReturn();
if (stateAfterSeekNextColumn != null) {
return
scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
}
@@ -762,7 +762,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
((!scan.isReversed() && difference > 0) || (scan.isReversed()
&& difference < 0))
) {
seekAsDirection(nextKV);
- NextState stateAfterSeekByHint = needToReturn(outResult);
+ NextState stateAfterSeekByHint = needToReturn();
if (stateAfterSeekByHint != null) {
return
scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
}
@@ -819,11 +819,10 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
* memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If
the row of top cell
* is changed, we should return the current cells. Otherwise, we may return
the cells across
* different rows.
- * @param outResult the cells which are visible for user scan
* @return null if the top cell doesn't change. Otherwise, the NextState to
return
*/
- private NextState needToReturn(List<Cell> outResult) {
- if (!outResult.isEmpty() && topChanged) {
+ private NextState needToReturn() {
+ if (topChanged) {
return heap.peek() == null ? NextState.NO_MORE_VALUES :
NextState.MORE_VALUES;
}
return null;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 83856cda513..eca38d0bc23 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
+import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -1273,6 +1275,12 @@ public class TestHStore {
return c;
}
+ private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts,
long sequenceId) {
+ return
ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
+
.setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn)
+ .setSequenceId(sequenceId).build();
+ }
+
@Test
public void testFlushBeforeCompletingScanWoFilter() throws IOException,
InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
@@ -1417,6 +1425,74 @@ public class TestHStore {
}
}
+ @Test
+ public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException
{
+ final Configuration conf = HBaseConfiguration.create();
+
+ byte[] r1 = Bytes.toBytes("row1");
+ byte[] r2 = Bytes.toBytes("row2");
+
+ byte[] value1 = Bytes.toBytes("value1");
+ byte[] value2 = Bytes.toBytes("value2");
+
+ final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+ final long ts = EnvironmentEdgeManager.currentTime();
+ final long seqId = 100;
+
+ init(name.getMethodName(), conf,
TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
+
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
+ new MyStoreHook() {
+ @Override
+ long getSmallestReadPoint(HStore store) {
+ return seqId + 3;
+ }
+ });
+
+ store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing);
+ store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing);
+ store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing);
+
+ store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing);
+ store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing);
+ store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing);
+
+ store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
+ store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
+ store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
+
+ Scan scan = new Scan().withStartRow(r1);
+
+ try (final InternalScanner scanner =
+ new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) {
+ @Override
+ protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner>
scanners,
+ CellComparator comparator) throws IOException {
+ return new MyKeyValueHeap(scanners, comparator,
recordBlockSizeCallCount -> {
+ if (recordBlockSizeCallCount == 6) {
+ try {
+ flushStore(store, id++);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ }) {
+ List<Cell> cellResult = new ArrayList<>();
+
+ scanner.next(cellResult);
+ assertEquals(0, cellResult.size());
+
+ cellResult.clear();
+
+ scanner.next(cellResult);
+ assertEquals(3, cellResult.size());
+ for (Cell cell : cellResult) {
+ assertArrayEquals(r2, CellUtil.cloneRow(cell));
+ }
+ }
+ }
+
@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException,
InterruptedException {
Configuration conf = HBaseConfiguration.create();
@@ -3134,6 +3210,28 @@ public class TestHStore {
}
}
+ private interface MyKeyValueHeapHook {
+ void onRecordBlockSize(int recordBlockSizeCallCount);
+ }
+
+ private static class MyKeyValueHeap extends KeyValueHeap {
+ private final MyKeyValueHeapHook hook;
+ private int recordBlockSizeCallCount;
+
+ public MyKeyValueHeap(List<? extends KeyValueScanner> scanners,
CellComparator comparator,
+ MyKeyValueHeapHook hook) throws IOException {
+ super(scanners, comparator);
+ this.hook = hook;
+ }
+
+ @Override
+ public void recordBlockSize(IntConsumer blockSizeConsumer) {
+ recordBlockSizeCallCount++;
+ hook.onRecordBlockSize(recordBlockSizeCallCount);
+ super.recordBlockSize(blockSizeConsumer);
+ }
+ }
+
public static class MyCompactingMemStore2 extends CompactingMemStore {
private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";