This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new f446a96b9e7 HBASE-29647 Restore preWALRestore and postWALRestore 
coprocessor hooks (#7369)
f446a96b9e7 is described below

commit f446a96b9e73ebf54e95dde391dfce7480d76131
Author: Istvan Toth <[email protected]>
AuthorDate: Thu Oct 9 06:18:18 2025 +0200

    HBASE-29647 Restore preWALRestore and postWALRestore coprocessor hooks 
(#7369)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../hadoop/hbase/coprocessor/RegionObserver.java   | 16 ++++++++
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 13 +++++++
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 20 ++++++++++
 .../hbase/regionserver/RegionCoprocessorHost.java  | 25 ++++++++++++
 .../coprocessor/SampleRegionWALCoprocessor.java    | 32 +++++++++++++++
 .../hbase/coprocessor/SimpleRegionObserver.java    | 36 +++++++++++++++++
 .../coprocessor/TestRegionObserverInterface.java   | 45 ++++++++++++++++++++--
 .../hadoop/hbase/coprocessor/TestWALObserver.java  |  2 +
 8 files changed, 185 insertions(+), 4 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 65fe524d0a4..7b7f7e208f5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1412,6 +1412,22 @@ public interface RegionObserver {
     RegionInfo info, Path edits) throws IOException {
   }
 
+  /**
+   * Called before a {@link WALEdit} is replayed for this region.
+   * @param ctx the environment provided by the region server
+   */
+  default void preWALRestore(ObserverContext<? extends 
RegionCoprocessorEnvironment> ctx,
+    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+  }
+
+  /**
+   * Called after a {@link WALEdit} is replayed for this region.
+   * @param ctx the environment provided by the region server
+   */
+  default void postWALRestore(ObserverContext<? extends 
RegionCoprocessorEnvironment> ctx,
+    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+  }
+
   /**
    * Called before bulkLoadHFile. Users can create a StoreFile instance to 
access the contents of a
    * HFile.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7936197ff8d..9b7daee0f66 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5752,6 +5752,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           currentReplaySeqId =
             (key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() : 
currentEditSeqId;
 
+          // Start coprocessor replay here. The hook is invoked once per WALEdit
+          // rather than once per KeyValue.
+          if (coprocessorHost != null) {
+            status.setStatus("Running pre-WAL-restore hook in coprocessors");
+            if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) 
{
+              // if a coprocessor requested bypass for this WAL entry, skip it ...
+              continue;
+            }
+          }
           boolean checkRowWithinBoundary = false;
           // Check this edit is for this region.
           if (
@@ -5822,6 +5831,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
             internalFlushcache(null, currentEditSeqId, stores.values(), 
status, false,
               FlushLifeCycleTracker.DUMMY);
           }
+
+          if (coprocessorHost != null) {
+            coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
+          }
         }
 
         if (coprocessorHost != null) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 11d5917dda6..fdfea375e09 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2114,6 +2114,7 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
           ? region.getCoprocessorHost()
           : null; // do not invoke coprocessors if this is a secondary region 
replica
+      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
 
       // Skip adding the edits to WAL if this is a secondary region replica
       boolean isPrimary = 
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@@ -2135,6 +2136,18 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : 
new Pair<>();
         List<MutationReplay> edits =
           WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, 
durability);
+        if (coprocessorHost != null) {
+          // Start coprocessor replay here. The coprocessor is for each 
WALEdit instead of a
+          // KeyValue.
+          if (
+            coprocessorHost.preWALRestore(region.getRegionInfo(), 
walEntry.getFirst(),
+              walEntry.getSecond())
+          ) {
+            // if a coprocessor requested bypass for this log entry, skip it ...
+            continue;
+          }
+          walEntries.add(walEntry);
+        }
         if (edits != null && !edits.isEmpty()) {
           // HBASE-17924
           // sort to improve lock efficiency
@@ -2157,6 +2170,13 @@ public class RSRpcServices extends 
HBaseRpcServicesBase<HRegionServer>
       if (wal != null) {
         wal.sync();
       }
+
+      if (coprocessorHost != null) {
+        for (Pair<WALKey, WALEdit> entry : walEntries) {
+          coprocessorHost.postWALRestore(region.getRegionInfo(), 
entry.getFirst(),
+            entry.getSecond());
+        }
+      }
       return ReplicateWALEntryResponse.newBuilder().build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 703f06141bf..b300496e1d7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1426,6 +1426,31 @@ public class RegionCoprocessorHost
     });
   }
 
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @return true if default behavior should be bypassed, false otherwise
+   */
+  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, 
final WALEdit logEdit)
+    throws IOException {
+    return execOperation(
+      coprocEnvironments.isEmpty() ? null : new 
RegionObserverOperationWithoutResult(true) {
+        @Override
+        public void call(RegionObserver observer) throws IOException {
+          observer.preWALRestore(this, info, logKey, logEdit);
+        }
+      });
+  }
+
+  public void postWALRestore(final RegionInfo info, final WALKey logKey, final 
WALEdit logEdit)
+    throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new 
RegionObserverOperationWithoutResult() {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.postWALRestore(this, info, logKey, logEdit);
+      }
+    });
+  }
+
   /**
    * @param familyPaths pairs of { CF, file path } submitted for bulk load
    */
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
index 17ab26c6a58..8d6d363daa6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
@@ -53,6 +53,8 @@ public class SampleRegionWALCoprocessor
 
   private boolean preWALWriteCalled = false;
   private boolean postWALWriteCalled = false;
+  private boolean preWALRestoreCalled = false;
+  private boolean postWALRestoreCalled = false;
   private boolean preWALRollCalled = false;
   private boolean postWALRollCalled = false;
   private boolean preReplayWALsCalled = false;
@@ -74,6 +76,8 @@ public class SampleRegionWALCoprocessor
     this.changedQualifier = chq;
     preWALWriteCalled = false;
     postWALWriteCalled = false;
+    preWALRestoreCalled = false;
+    postWALRestoreCalled = false;
     preWALRollCalled = false;
     postWALRollCalled = false;
   }
@@ -130,6 +134,15 @@ public class SampleRegionWALCoprocessor
     }
   }
 
+  /**
+   * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} 
when WAL is restored.
+   */
+  @Override
+  public void preWALRestore(ObserverContext<? extends 
RegionCoprocessorEnvironment> env,
+    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+    preWALRestoreCalled = true;
+  }
+
   @Override
   public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> 
ctx, Path oldPath,
     Path newPath) throws IOException {
@@ -142,6 +155,15 @@ public class SampleRegionWALCoprocessor
     postWALRollCalled = true;
   }
 
+  /**
+   * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when 
WAL is restored.
+   */
+  @Override
+  public void postWALRestore(ObserverContext<? extends 
RegionCoprocessorEnvironment> env,
+    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+    postWALRestoreCalled = true;
+  }
+
   @Override
   public void preReplayWALs(ObserverContext<? extends 
RegionCoprocessorEnvironment> ctx,
     RegionInfo info, Path edits) throws IOException {
@@ -162,6 +184,16 @@ public class SampleRegionWALCoprocessor
     return postWALWriteCalled;
   }
 
+  public boolean isPreWALRestoreCalled() {
+    LOG.debug(SampleRegionWALCoprocessor.class.getName() + 
".isPreWALRestoreCalled is called.");
+    return preWALRestoreCalled;
+  }
+
+  public boolean isPostWALRestoreCalled() {
+    LOG.debug(SampleRegionWALCoprocessor.class.getName() + 
".isPostWALRestoreCalled is called.");
+    return postWALRestoreCalled;
+  }
+
   public boolean isPreWALRollCalled() {
     return preWALRollCalled;
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 42ddf84b877..ec32a1d2c4d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -128,6 +128,8 @@ public class SimpleRegionObserver implements 
RegionCoprocessor, RegionObserver {
   final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
   final AtomicInteger ctPreReplayWALs = new AtomicInteger(0);
   final AtomicInteger ctPostReplayWALs = new AtomicInteger(0);
+  final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
+  final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
   final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
   final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
   final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
@@ -696,6 +698,24 @@ public class SimpleRegionObserver implements 
RegionCoprocessor, RegionObserver {
     ctPostReplayWALs.incrementAndGet();
   }
 
+  @Override
+  public void preWALRestore(ObserverContext<? extends 
RegionCoprocessorEnvironment> env,
+    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+    String tableName = logKey.getTableName().getNameAsString();
+    if (tableName.equals(TABLE_SKIPPED)) {
+      // skip recovery of TABLE_SKIPPED for testing purpose
+      env.bypass();
+      return;
+    }
+    ctPreWALRestore.incrementAndGet();
+  }
+
+  @Override
+  public void postWALRestore(ObserverContext<? extends 
RegionCoprocessorEnvironment> env,
+    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+    ctPostWALRestore.incrementAndGet();
+  }
+
   @Override
   public StoreFileReader preStoreFileReaderOpen(
     ObserverContext<? extends RegionCoprocessorEnvironment> ctx, FileSystem 
fs, Path p,
@@ -912,6 +932,14 @@ public class SimpleRegionObserver implements 
RegionCoprocessor, RegionObserver {
     return ctPostReplayWALs.get() > 0;
   }
 
+  public boolean hadPreWALRestore() {
+    return ctPreWALRestore.get() > 0;
+  }
+
+  public boolean hadPostWALRestore() {
+    return ctPostWALRestore.get() > 0;
+  }
+
   public boolean wasScannerNextCalled() {
     return ctPreScannerNext.get() > 0 && ctPostScannerNext.get() > 0;
   }
@@ -1024,6 +1052,14 @@ public class SimpleRegionObserver implements 
RegionCoprocessor, RegionObserver {
     return ctPostReplayWALs.get();
   }
 
+  public int getCtPreWALRestore() {
+    return ctPreWALRestore.get();
+  }
+
+  public int getCtPostWALRestore() {
+    return ctPostWALRestore.get();
+  }
+
   public int getCtPreWALAppend() {
     return ctPreWALAppend.get();
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 81b51659571..df57add1708 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -768,8 +768,9 @@ public class TestRegionObserverInterface {
         tableName, new Boolean[] { false, false, true, true, true, true, false 
});
 
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", 
"getCtPrePut", "getCtPostPut" },
-        tableName, new Integer[] { 0, 0, 2, 2 });
+        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", 
"getCtPreWALRestore",
+          "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
+        tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });
 
       cluster.killRegionServer(rs1.getRegionServer().getServerName());
       Threads.sleep(1000); // Let the kill soak in.
@@ -777,14 +778,50 @@ public class TestRegionObserverInterface {
       LOG.info("All regions assigned");
 
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", 
"getCtPrePut", "getCtPostPut" },
-        tableName, new Integer[] { 1, 1, 0, 0 });
+        new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", 
"getCtPreWALRestore",
+          "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
+        tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
     } finally {
       util.deleteTable(tableName);
       table.close();
     }
   }
 
+  @Test
+  public void testPreWALRestoreSkip() throws Exception {
+    LOG.info(TestRegionObserverInterface.class.getName() + "." + 
name.getMethodName());
+    TableName tableName = 
TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
+    Table table = util.createTable(tableName, new byte[][] { A, B, C });
+
+    try (RegionLocator locator = 
util.getConnection().getRegionLocator(tableName)) {
+      JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
+      ServerName sn2 = rs1.getRegionServer().getServerName();
+      String regEN = 
locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
+
+      util.getAdmin().move(Bytes.toBytes(regEN), sn2);
+      while 
(!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
+        Thread.sleep(100);
+      }
+
+      Put put = new Put(ROW);
+      put.addColumn(A, A, A);
+      put.addColumn(B, B, B);
+      put.addColumn(C, C, C);
+      table.put(put);
+
+      cluster.killRegionServer(rs1.getRegionServer().getServerName());
+      Threads.sleep(20000); // just to be sure that the kill has fully started.
+      util.waitUntilAllRegionsAssigned(tableName);
+    }
+
+    verifyMethodResult(SimpleRegionObserver.class,
+      new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
+      new Integer[] { 0, 0 });
+
+    util.deleteTable(tableName);
+    table.close();
+  }
+
   // called from testPreWALAppendIsWrittenToWAL
   private void testPreWALAppendHook(Table table, TableName tableName) throws 
IOException {
     int expectedCalls = 0;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 989110e41d9..6e7c6ff400a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -357,6 +357,8 @@ public class TestWALObserver {
         SampleRegionWALCoprocessor cp2 =
           
region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class);
         assertNotNull(cp2);
+        assertTrue(cp2.isPreWALRestoreCalled());
+        assertTrue(cp2.isPostWALRestoreCalled());
         assertTrue(cp2.isPreReplayWALsCalled());
         assertTrue(cp2.isPostReplayWALsCalled());
         region.close();

Reply via email to