This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ce077a87728 HBASE-29987 Replication position corruption when WAL file 
switch detected in ReplicationSourceWALReader run loop (#7909)
ce077a87728 is described below

commit ce077a87728789108dd8acdd8a6ad9f38b5cbf5b
Author: Siddharth Khillon <[email protected]>
AuthorDate: Thu Mar 12 20:38:47 2026 -0700

    HBASE-29987 Replication position corruption when WAL file switch detected 
in ReplicationSourceWALReader run loop (#7909)
    
    When ReplicationSourceWALReader.run() detects a WAL file switch via the
    switched() check, it enqueues an EOF batch but does not update
    currentPosition. If the outer loop restarts (e.g., due to
    WALEntryFilterRetryableException), the new WALEntryStream is created
    with the stale position from the old file, applied to the new file.
    This causes an infinite retry loop (EOFException: Cannot seek after EOF)
    and the corrupted position may be persisted to ZK, surviving restarts.
    
    The fix resets currentPosition to entryStream.getPosition() (which
    returns 0 after dequeueCurrentLog()) before enqueuing the EOF batch.
    
    Includes a regression test that reproduces the bug by using
    nb.capacity=1 to force EOF detection at line 153 (not inside
    readWALEntries), combined with a WALEntryFilterRetryableException on
    the first entry of the new file to trigger the outer loop restart.
    
    Co-authored-by: skhillon <[email protected]>
    Signed-off-by: Duo Zhang <[email protected]>
    (cherry picked from commit e4f9c65e80dd7d5ffb103f782f054e0cc18f5878)
---
 .../regionserver/ReplicationSourceWALReader.java   |  1 +
 .../regionserver/TestBasicWALEntryStream.java      | 52 ++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index e617fe6d016..89635ad000c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -158,6 +158,7 @@ class ReplicationSourceWALReader extends Thread {
           // first, check if we have switched a file, if so, we need to 
manually add an EOF entry
           // batch to the queue
           if (currentPath != null && switched(entryStream, currentPath)) {
+            currentPosition = entryStream.getPosition();
             entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
             continue;
           }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index b1e2f2c1634..b3265da3887 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -881,6 +881,58 @@ public abstract class TestBasicWALEntryStream extends 
WALEntryStreamTestBase {
     }
   }
 
+  /**
+   * Verify that when a WAL file switch is detected via the switched() check in
+   * ReplicationSourceWALReader.run(), currentPosition is reset so that a 
subsequent
+   * WALEntryFilterRetryableException does not cause the new file to be opened 
at the old file's
+   * position.
+   */
+  @Test
+  public void testPositionResetOnFileSwitchWithRetryableFilter() throws 
Exception {
+    appendEntriesToLogAndSync(3);
+    log.rollWriter();
+    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
+    Waiter.waitFor(CONF, 5000,
+      (Waiter.Predicate<Exception>) () -> 
abstractWAL.getInflightWALCloseCount() == 0);
+    appendEntriesToLogAndSync(3);
+
+    // Batch capacity of 1 ensures EOF on WAL A is detected by hasNext() in 
the run loop
+    // (not inside readWALEntries), which triggers the switched() path.
+    Configuration conf = new Configuration(CONF);
+    conf.setInt("replication.source.nb.capacity", 1);
+
+    AtomicInteger totalFilterCalls = new AtomicInteger(0);
+    AtomicBoolean threwOnce = new AtomicBoolean(false);
+    WALEntryFilter filter = entry -> {
+      int callNum = totalFilterCalls.incrementAndGet();
+      if (callNum > 3 && !threwOnce.get()) {
+        threwOnce.set(true);
+        throw new WALEntryFilterRetryableException("simulated filter failure 
after file switch");
+      }
+      return entry;
+    };
+
+    ReplicationSource source = mockReplicationSource(false, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, logQueue, 0, filter, source, 
fakeWalGroupId);
+    reader.start();
+
+    int totalEntries = 0;
+    long deadline = System.currentTimeMillis() + 30000;
+    while (totalEntries < 6) {
+      long remaining = deadline - System.currentTimeMillis();
+      assertTrue("Reader appears stuck - likely position corruption. Only got 
" + totalEntries
+        + " of 6 entries", remaining > 0);
+      WALEntryBatch batch = reader.poll(1);
+      if (batch != null && batch != WALEntryBatch.NO_MORE_DATA) {
+        totalEntries += batch.getNbEntries();
+      }
+    }
+    assertEquals(6, totalEntries);
+    assertTrue("Filter should have thrown at least once", threwOnce.get());
+  }
+
   private static class PartialWALEntryFailingWALEntryFilter implements 
WALEntryFilter {
     private int filteredWALEntryCount = -1;
     private int walEntryCount = 0;

Reply via email to