This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebase
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c8cf1d6b6a90a1448a12a53bb2d7e0e845377d4c
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Wed Jul 16 23:11:35 2025 +0530

    HBASE-29441 ReplicationSourceShipper should delegate the empty wal entries handling to ReplicationEndpoint (#7145)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
---
 .../ContinuousBackupReplicationEndpoint.java       |  9 ++++
 .../hbase/replication/EmptyEntriesPolicy.java      | 34 ++++++++++++
 .../hbase/replication/ReplicationEndpoint.java     | 18 +++++++
 .../regionserver/ReplicationSourceShipper.java     | 33 ++++++++++--
 .../regionserver/TestReplicationSource.java        | 63 ++++++++++++++++++++++
 5 files changed, 153 insertions(+), 4 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index bf3fbd531bf..2442e0789a8 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
 import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -205,6 +206,14 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     notifyStarted();
   }
 
+  @Override
+  public EmptyEntriesPolicy getEmptyEntriesPolicy() {
+    // Since this endpoint writes to S3 asynchronously, an empty entry batch
+    // does not guarantee that all previously submitted entries were persisted.
+    // Hence, avoid committing the WAL position.
+    return EmptyEntriesPolicy.SUBMIT;
+  }
+
   @Override
   public ReplicationResult replicate(ReplicateContext replicateContext) {
     final List<WAL.Entry> entries = replicateContext.getEntries();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java
new file mode 100644
index 00000000000..5a5d8ab754c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Policy that defines what a replication endpoint should do when the entry batch is empty. This is
+ * used to determine whether the replication source should consider an empty batch as: -
+ * {@code COMMIT}: Consider the position as fully committed, and update the WAL position. -
+ * {@code SUBMIT}: Treat it as submitted but not committed, i.e., do not advance the WAL position.
+ * Some endpoints may buffer entries (e.g., in open files on S3) and delay actual persistence. In
+ * such cases, an empty batch should not result in WAL position commit.
+ */
+@InterfaceAudience.Private
+public enum EmptyEntriesPolicy {
+  COMMIT,
+  SUBMIT
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index fc5c2bf6265..fbb6b6b9ef1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -291,4 +291,22 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  /**
+   * Defines the behavior when the replication source encounters an empty entry batch.
+   * <p>
+   * By default, this method returns {@link EmptyEntriesPolicy#COMMIT}, meaning the replication
+   * source can safely consider the WAL position as committed and move on.
+   * </p>
+   * <p>
+   * However, certain endpoints like backup or asynchronous S3 writers may delay persistence (e.g.,
+   * writing to temporary files or buffers). In those cases, returning
+   * {@link EmptyEntriesPolicy#SUBMIT} avoids incorrectly advancing WAL position and risking data
+   * loss.
+   * </p>
+   * @return the {@link EmptyEntriesPolicy} to apply for empty entry batches.
+   */
+  default EmptyEntriesPolicy getEmptyEntriesPolicy() {
+    return EmptyEntriesPolicy.COMMIT;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index ee819faa77b..f45c8762683 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -150,13 +152,25 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   /**
-   * Do the shipping logic
+   * Do the shipping logic.
    */
-  private void shipEdits(WALEntryBatch entryBatch) {
+  @RestrictedApi(
+      explanation = "Package-private for test visibility only. Do not use outside tests.",
+      link = "",
+      allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)")
+  void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      updateLogPosition(entryBatch, ReplicationResult.COMMITTED);
+      /*
+       * Delegate to the endpoint to decide how to treat empty entry batches. In most replication
+       * flows, receiving an empty entry batch means that everything so far has been successfully
+       * replicated and committed — so it's safe to mark the WAL position as committed (COMMIT).
+       * However, some endpoints (e.g., asynchronous S3 backups) may buffer writes and delay actual
+       * persistence. In such cases, we must avoid committing the WAL position prematurely.
+       */
+      final ReplicationResult result = getReplicationResult();
+      updateLogPosition(entryBatch, result);
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
@@ -232,6 +246,13 @@ public class ReplicationSourceShipper extends Thread {
     }
   }
 
+  private ReplicationResult getReplicationResult() {
+    EmptyEntriesPolicy policy = source.getReplicationEndpoint().getEmptyEntriesPolicy();
+    return (policy == EmptyEntriesPolicy.COMMIT)
+      ? ReplicationResult.COMMITTED
+      : ReplicationResult.SUBMITTED;
+  }
+
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
     String peerId = source.getPeerId();
     if (peerId.contains("-")) {
@@ -256,7 +277,11 @@ public class ReplicationSourceShipper extends Thread {
     }
   }
 
-  private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) {
+  @RestrictedApi(
+      explanation = "Package-private for test visibility only. Do not use outside tests.",
+      link = "",
+      allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)")
+  boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) {
     boolean updated = false;
     // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
     // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 37af52eb93b..25eef51ff68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -53,11 +53,13 @@ import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueData;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -492,6 +494,67 @@ public class TestReplicationSource {
 
   }
 
+  /**
+   * Custom ReplicationEndpoint that simulates an asynchronous target like S3 or cloud storage. In
+   * this case, empty entry batches should not cause WAL position to be committed immediately.
+   */
+  public static class AsyncReplicationEndpoint extends DoNothingReplicationEndpoint {
+    @Override
+    public EmptyEntriesPolicy getEmptyEntriesPolicy() {
+      return EmptyEntriesPolicy.SUBMIT;
+    }
+  }
+
+  /**
+   * Default synchronous ReplicationEndpoint that treats empty entry batches as a signal to commit
+   * WAL position, assuming all entries pushed before were safely replicated.
+   */
+  public static class SyncReplicationEndpoint extends DoNothingReplicationEndpoint {
+    // Inherits default COMMIT behavior
+  }
+
+  /**
+   * Verifies that ReplicationSourceShipper commits the WAL position when using a synchronous
+   * endpoint and the entry batch is empty.
+   */
+  @Test
+  public void testEmptyBatchCommitsPositionForCommitEndpoint() {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    Mockito.when(source.getReplicationEndpoint()).thenReturn(new SyncReplicationEndpoint());
+
+    ReplicationSourceShipper shipper =
+      Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source, null));
+
+    WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal"));
+
+    shipper.shipEdits(emptyBatch);
+
+    // With default (COMMIT) policy, empty entry batch should advance WAL position
+    Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.COMMITTED);
+  }
+
+  /**
+   * Verifies that ReplicationSourceShipper does NOT commit the WAL position when using an
+   * asynchronous endpoint and the entry batch is empty.
+   */
+  @Test
+  public void testEmptyBatchSubmitsPositionForSubmitEndpoint() {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    Mockito.when(source.getReplicationEndpoint()).thenReturn(new AsyncReplicationEndpoint());
+
+    ReplicationSourceShipper shipper =
+      Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source, null));
+
+    WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal"));
+
+    shipper.shipEdits(emptyBatch);
+
+    // With SUBMIT policy, empty entry batch should NOT advance WAL position
+    Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.SUBMITTED);
+  }
+
   private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
     String endpointName) throws IOException {
     conf.setInt("replication.source.maxretriesmultiplier", 1);

Reply via email to