This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 61e4e36c949 HBASE-29254 StoreScanner returns incorrect row after flush due to topChanged behavior (#6900)
61e4e36c949 is described below

commit 61e4e36c949863f30347a44df27bcb530b028861
Author: Minwoo Kang <[email protected]>
AuthorDate: Wed Apr 30 18:22:08 2025 +0900

    HBASE-29254 StoreScanner returns incorrect row after flush due to topChanged behavior (#6900)
    
    Signed-off-by: Duo Zhang <[email protected]>
    (cherry picked from commit 99bd5b57c1edc0800d0c8cd54e1917f6a3aa5c6b)
    (cherry picked from commit 524825e28be4ab64dbd8330da22dfe5e0de464d8)
---
 .../hadoop/hbase/regionserver/StoreScanner.java    | 11 ++-
 .../hadoop/hbase/regionserver/TestHStore.java      | 98 ++++++++++++++++++++++
 2 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index bc67b69b850..cbc617d2a0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -736,7 +736,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
             }
             matcher.clearCurrentRow();
             seekOrSkipToNextRow(cell);
-            NextState stateAfterSeekNextRow = needToReturn(outResult);
+            NextState stateAfterSeekNextRow = needToReturn();
             if (stateAfterSeekNextRow != null) {
               return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
             }
@@ -744,7 +744,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
           case SEEK_NEXT_COL:
             seekOrSkipToNextColumn(cell);
-            NextState stateAfterSeekNextColumn = needToReturn(outResult);
+            NextState stateAfterSeekNextColumn = needToReturn();
             if (stateAfterSeekNextColumn != null) {
               return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
             }
@@ -762,7 +762,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
                 ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
               ) {
                 seekAsDirection(nextKV);
-                NextState stateAfterSeekByHint = needToReturn(outResult);
+                NextState stateAfterSeekByHint = needToReturn();
                 if (stateAfterSeekByHint != null) {
                   return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
                 }
@@ -819,11 +819,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell
    * is changed, we should return the current cells. Otherwise, we may return the cells across
    * different rows.
-   * @param outResult the cells which are visible for user scan
    * @return null is the top cell doesn't change. Otherwise, the NextState to return
    */
-  private NextState needToReturn(List<Cell> outResult) {
-    if (!outResult.isEmpty() && topChanged) {
+  private NextState needToReturn() {
+    if (topChanged) {
       return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
     }
     return null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 83856cda513..eca38d0bc23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.IntBinaryOperator;
+import java.util.function.IntConsumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -1273,6 +1275,12 @@ public class TestHStore {
     return c;
   }
 
+  private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) {
+    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
+      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn)
+      .setSequenceId(sequenceId).build();
+  }
+
   @Test
   public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
     final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
@@ -1417,6 +1425,74 @@ public class TestHStore {
     }
   }
 
+  @Test
+  public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException {
+    final Configuration conf = HBaseConfiguration.create();
+
+    byte[] r1 = Bytes.toBytes("row1");
+    byte[] r2 = Bytes.toBytes("row2");
+
+    byte[] value1 = Bytes.toBytes("value1");
+    byte[] value2 = Bytes.toBytes("value2");
+
+    final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    final long ts = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+
+    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
+      ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
+      new MyStoreHook() {
+        @Override
+        long getSmallestReadPoint(HStore store) {
+          return seqId + 3;
+        }
+      });
+
+    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing);
+    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing);
+    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing);
+
+    store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing);
+    store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing);
+    store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing);
+
+    store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
+    store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
+    store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
+
+    Scan scan = new Scan().withStartRow(r1);
+
+    try (final InternalScanner scanner =
+      new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) {
+        @Override
+        protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
+          CellComparator comparator) throws IOException {
+          return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> {
+            if (recordBlockSizeCallCount == 6) {
+              try {
+                flushStore(store, id++);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+        }
+      }) {
+      List<Cell> cellResult = new ArrayList<>();
+
+      scanner.next(cellResult);
+      assertEquals(0, cellResult.size());
+
+      cellResult.clear();
+
+      scanner.next(cellResult);
+      assertEquals(3, cellResult.size());
+      for (Cell cell : cellResult) {
+        assertArrayEquals(r2, CellUtil.cloneRow(cell));
+      }
+    }
+  }
+
   @Test
   public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
     Configuration conf = HBaseConfiguration.create();
@@ -3134,6 +3210,28 @@ public class TestHStore {
     }
   }
 
+  private interface MyKeyValueHeapHook {
+    void onRecordBlockSize(int recordBlockSizeCallCount);
+  }
+
+  private static class MyKeyValueHeap extends KeyValueHeap {
+    private final MyKeyValueHeapHook hook;
+    private int recordBlockSizeCallCount;
+
+    public MyKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator,
+      MyKeyValueHeapHook hook) throws IOException {
+      super(scanners, comparator);
+      this.hook = hook;
+    }
+
+    @Override
+    public void recordBlockSize(IntConsumer blockSizeConsumer) {
+      recordBlockSizeCallCount++;
+      hook.onRecordBlockSize(recordBlockSizeCallCount);
+      super.recordBlockSize(blockSizeConsumer);
+    }
+  }
+
   public static class MyCompactingMemStore2 extends CompactingMemStore {
     private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
     private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";

Reply via email to