This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 006529af18aee5dfa179a19c00040df9a6c19096
Author: vinayak hegde <vinayakph...@gmail.com>
AuthorDate: Tue Feb 18 23:15:34 2025 +0530

    HBASE-28996: Implement Custom ReplicationEndpoint to Enable WAL Backup to 
External Storage (#6633)
    
    * HBASE-28996: Implement Custom ReplicationEndpoint to Enable WAL Backup to 
External Storage
    
    * fix spotless error
---
 .../replication/BackupFileSystemManager.java       |  71 +++
 .../backup/replication/BulkLoadProcessor.java      |  96 ++++
 .../ContinuousBackupReplicationEndpoint.java       | 440 ++++++++++++++++++
 .../replication/ObjectStoreProtobufWalWriter.java  |  73 +++
 .../hadoop/hbase/backup/replication/Utils.java     |  48 +-
 .../TestContinuousBackupReplicationEndpoint.java   | 513 +++++++++++++++++++++
 .../hbase/replication/ReplicationEndpoint.java     |  18 +-
 .../hbase/replication/ReplicationResult.java}      |  53 +--
 .../VerifyWALEntriesReplicationEndpoint.java       |   4 +-
 .../HBaseInterClusterReplicationEndpoint.java      |  13 +-
 .../regionserver/ReplicationSource.java            |  35 +-
 .../regionserver/ReplicationSourceInterface.java   |   9 +-
 .../regionserver/ReplicationSourceShipper.java     |  23 +-
 .../visibility/VisibilityReplicationEndpoint.java  |   3 +-
 .../replication/DummyReplicationEndpoint.java      |   4 +-
 .../replication/SerialReplicationTestBase.java     |   4 +-
 .../replication/TestHBaseReplicationEndpoint.java  |   8 +-
 .../TestNonHBaseReplicationEndpoint.java           |   4 +-
 .../hbase/replication/TestReplicationBase.java     |   2 +-
 .../hbase/replication/TestReplicationEndpoint.java |  22 +-
 .../TestVerifyCellsReplicationEndpoint.java        |   2 +-
 ...InterClusterReplicationEndpointFilterEdits.java |   4 +-
 .../TestRaceWhenCreatingReplicationSource.java     |   5 +-
 .../regionserver/TestReplicationSourceManager.java |   5 +-
 .../replication/regionserver/TestReplicator.java   |   3 +-
 .../TestVisibilityLabelsReplication.java           |   5 +-
 26 files changed, 1320 insertions(+), 147 deletions(-)

diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
new file mode 100644
index 00000000000..225d3217276
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initializes and organizes backup directories for continuous Write-Ahead 
Logs (WALs) and
+ * bulk-loaded files within the specified backup root directory.
+ */
+@InterfaceAudience.Private
+public class BackupFileSystemManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupFileSystemManager.class);
+
+  public static final String WALS_DIR = "WALs";
+  public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
+  private final String peerId;
+  private final FileSystem backupFs;
+  private final Path backupRootDir;
+  private final Path walsDir;
+  private final Path bulkLoadFilesDir;
+
+  public BackupFileSystemManager(String peerId, Configuration conf, String 
backupRootDirStr)
+    throws IOException {
+    this.peerId = peerId;
+    this.backupRootDir = new Path(backupRootDirStr);
+    this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
+    this.walsDir = createDirectory(WALS_DIR);
+    this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
+  }
+
+  private Path createDirectory(String dirName) throws IOException {
+    Path dirPath = new Path(backupRootDir, dirName);
+    backupFs.mkdirs(dirPath);
+    LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
+    return dirPath;
+  }
+
+  public Path getWalsDir() {
+    return walsDir;
+  }
+
+  public Path getBulkLoadFilesDir() {
+    return bulkLoadFilesDir;
+  }
+
+  public FileSystem getBackupFs() {
+    return backupFs;
+  }
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
new file mode 100644
index 00000000000..6e1271313bc
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase 
replication.
+ * <p>
+ * This utility class extracts and constructs the file paths of bulk-loaded 
files based on WAL
+ * entries. It processes bulk load descriptors and their associated store 
descriptors to generate
+ * the paths for each bulk-loaded file.
+ * <p>
+ * The class is designed for scenarios where replicable bulk load operations 
need to be parsed and
+ * their file paths need to be determined programmatically.
+ * </p>
+ */
+@InterfaceAudience.Private
+public final class BulkLoadProcessor {
+  private BulkLoadProcessor() {
+  }
+
+  public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) 
throws IOException {
+    List<Path> bulkLoadFilePaths = new ArrayList<>();
+
+    for (WAL.Entry entry : walEntries) {
+      WALEdit edit = entry.getEdit();
+      for (Cell cell : edit.getCells()) {
+        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+          TableName tableName = entry.getKey().getTableName();
+          String namespace = tableName.getNamespaceAsString();
+          String table = tableName.getQualifierAsString();
+          bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, 
table));
+        }
+      }
+    }
+    return bulkLoadFilePaths;
+  }
+
+  private static List<Path> processBulkLoadDescriptor(Cell cell, String 
namespace, String table)
+    throws IOException {
+    List<Path> bulkLoadFilePaths = new ArrayList<>();
+    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+
+    if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == 
null) {
+      return bulkLoadFilePaths; // Skip if not replicable
+    }
+
+    String regionName = bld.getEncodedRegionName().toStringUtf8();
+    for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
+      bulkLoadFilePaths
+        .addAll(processStoreDescriptor(storeDescriptor, namespace, table, 
regionName));
+    }
+
+    return bulkLoadFilePaths;
+  }
+
+  private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor 
storeDescriptor,
+    String namespace, String table, String regionName) {
+    List<Path> paths = new ArrayList<>();
+    String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
+
+    for (String storeFile : storeDescriptor.getStoreFileList()) {
+      paths.add(new Path(namespace,
+        new Path(table, new Path(regionName, new Path(columnFamily, 
storeFile)))));
+    }
+
+    return paths;
+  }
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
new file mode 100644
index 00000000000..c973af8102e
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ContinuousBackupReplicationEndpoint is responsible for replicating WAL 
entries to a backup
+ * storage. It organizes WAL entries by day and periodically flushes the data, 
ensuring that WAL
+ * files do not exceed the configured size. The class includes mechanisms for 
handling the WAL
+ * files, performing bulk load backups, and ensuring that the replication 
process is safe.
+ */
+@InterfaceAudience.Private
+public class ContinuousBackupReplicationEndpoint extends 
BaseReplicationEndpoint {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(ContinuousBackupReplicationEndpoint.class);
+  public static final String CONF_PEER_UUID = 
"hbase.backup.wal.replication.peerUUID";
+  public static final String CONF_BACKUP_ROOT_DIR = "hbase.backup.root.dir";
+  public static final String CONF_BACKUP_MAX_WAL_SIZE = 
"hbase.backup.max.wal.size";
+  public static final long DEFAULT_MAX_WAL_SIZE = 128 * 1024 * 1024;
+
+  public static final String CONF_STAGED_WAL_FLUSH_INITIAL_DELAY =
+    "hbase.backup.staged.wal.flush.initial.delay.seconds";
+  public static final int DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS = 5 * 
60; // 5 minutes
+  public static final String CONF_STAGED_WAL_FLUSH_INTERVAL =
+    "hbase.backup.staged.wal.flush.interval.seconds";
+  public static final int DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS = 5 * 60; 
// 5 minutes
+  public static final int EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 60; // TODO: 
configurable??
+
+  private final Map<Long, FSHLogProvider.Writer> walWriters = new 
ConcurrentHashMap<>();
+  private final ReentrantLock lock = new ReentrantLock();
+
+  private ReplicationSourceInterface replicationSource;
+  private Configuration conf;
+  private BackupFileSystemManager backupFileSystemManager;
+  private UUID peerUUID;
+  private String peerId;
+  private ScheduledExecutorService flushExecutor;
+
+  public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);
+  public static final String WAL_FILE_PREFIX = "wal_file.";
+  public static final String DATE_FORMAT = "yyyy-MM-dd";
+
+  @Override
+  public void init(Context context) throws IOException {
+    super.init(context);
+    this.replicationSource = context.getReplicationSource();
+    this.peerId = context.getPeerId();
+    this.conf = HBaseConfiguration.create(context.getConfiguration());
+
+    initializePeerUUID();
+    initializeBackupFileSystemManager();
+    startWalFlushExecutor();
+    LOG.info("{} Initialization complete", Utils.logPeerId(peerId));
+  }
+
+  private void initializePeerUUID() throws IOException {
+    String peerUUIDStr = conf.get(CONF_PEER_UUID);
+    if (peerUUIDStr == null || peerUUIDStr.isEmpty()) {
+      throw new IOException("Peer UUID is not specified. Please configure " + 
CONF_PEER_UUID);
+    }
+    try {
+      this.peerUUID = UUID.fromString(peerUUIDStr);
+      LOG.info("{} Peer UUID initialized to {}", Utils.logPeerId(peerId), 
peerUUID);
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Invalid Peer UUID format: " + peerUUIDStr, e);
+    }
+  }
+
+  private void initializeBackupFileSystemManager() throws IOException {
+    String backupRootDir = conf.get(CONF_BACKUP_ROOT_DIR);
+    if (backupRootDir == null || backupRootDir.isEmpty()) {
+      throw new IOException(
+        "Backup root directory is not specified. Configure " + 
CONF_BACKUP_ROOT_DIR);
+    }
+
+    try {
+      this.backupFileSystemManager = new BackupFileSystemManager(peerId, conf, 
backupRootDir);
+      LOG.info("{} BackupFileSystemManager initialized successfully for {}",
+        Utils.logPeerId(peerId), backupRootDir);
+    } catch (IOException e) {
+      throw new IOException("Failed to initialize BackupFileSystemManager", e);
+    }
+  }
+
+  private void startWalFlushExecutor() {
+    int initialDelay = conf.getInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY,
+      DEFAULT_STAGED_WAL_FLUSH_INITIAL_DELAY_SECONDS);
+    int flushInterval =
+      conf.getInt(CONF_STAGED_WAL_FLUSH_INTERVAL, 
DEFAULT_STAGED_WAL_FLUSH_INTERVAL_SECONDS);
+
+    flushExecutor = Executors.newSingleThreadScheduledExecutor();
+    flushExecutor.scheduleAtFixedRate(this::flushAndBackupSafely, 
initialDelay, flushInterval,
+      TimeUnit.SECONDS);
+    LOG.info("{} Scheduled WAL flush executor started with initial delay {}s 
and interval {}s",
+      Utils.logPeerId(peerId), initialDelay, flushInterval);
+  }
+
+  private void flushAndBackupSafely() {
+    lock.lock();
+    try {
+      LOG.info("{} Periodic WAL flush triggered", Utils.logPeerId(peerId));
+      flushWriters();
+      replicationSource.persistOffsets();
+      LOG.info("{} Periodic WAL flush and offset persistence completed 
successfully",
+        Utils.logPeerId(peerId));
+    } catch (IOException e) {
+      LOG.error("{} Error during WAL flush: {}", Utils.logPeerId(peerId), 
e.getMessage(), e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void flushWriters() throws IOException {
+    LOG.info("{} Flushing {} WAL writers", Utils.logPeerId(peerId), 
walWriters.size());
+    for (Map.Entry<Long, FSHLogProvider.Writer> entry : walWriters.entrySet()) 
{
+      FSHLogProvider.Writer writer = entry.getValue();
+      if (writer != null) {
+        LOG.debug("{} Closing WAL writer for day: {}", 
Utils.logPeerId(peerId), entry.getKey());
+        try {
+          writer.close();
+          LOG.debug("{} Successfully closed WAL writer for day: {}", 
Utils.logPeerId(peerId),
+            entry.getKey());
+        } catch (IOException e) {
+          LOG.error("{} Failed to close WAL writer for day: {}. Error: {}", 
Utils.logPeerId(peerId),
+            entry.getKey(), e.getMessage(), e);
+          throw e;
+        }
+      }
+    }
+    walWriters.clear();
+    LOG.info("{} WAL writers flushed and cleared", Utils.logPeerId(peerId));
+  }
+
+  @Override
+  public UUID getPeerUUID() {
+    return peerUUID;
+  }
+
+  @Override
+  public void start() {
+    LOG.info("{} Starting ContinuousBackupReplicationEndpoint", 
Utils.logPeerId(peerId));
+    startAsync();
+  }
+
+  @Override
+  protected void doStart() {
+    LOG.info("{} ContinuousBackupReplicationEndpoint started successfully.",
+      Utils.logPeerId(peerId));
+    notifyStarted();
+  }
+
+  @Override
+  public ReplicationResult replicate(ReplicateContext replicateContext) {
+    final List<WAL.Entry> entries = replicateContext.getEntries();
+    if (entries.isEmpty()) {
+      LOG.debug("{} No WAL entries to replicate", Utils.logPeerId(peerId));
+      return ReplicationResult.SUBMITTED;
+    }
+
+    LOG.debug("{} Received {} WAL entries for replication", 
Utils.logPeerId(peerId),
+      entries.size());
+
+    Map<Long, List<WAL.Entry>> groupedEntries = groupEntriesByDay(entries);
+    LOG.debug("{} Grouped WAL entries by day: {}", Utils.logPeerId(peerId),
+      groupedEntries.keySet());
+
+    lock.lock();
+    try {
+      for (Map.Entry<Long, List<WAL.Entry>> entry : groupedEntries.entrySet()) 
{
+        LOG.debug("{} Backing up {} WAL entries for day {}", 
Utils.logPeerId(peerId),
+          entry.getValue().size(), entry.getKey());
+        backupWalEntries(entry.getKey(), entry.getValue());
+      }
+
+      if (isAnyWriterFull()) {
+        LOG.debug("{} Some WAL writers reached max size, triggering flush",
+          Utils.logPeerId(peerId));
+        flushWriters();
+        LOG.debug("{} Replication committed after WAL flush", 
Utils.logPeerId(peerId));
+        return ReplicationResult.COMMITTED;
+      }
+
+      LOG.debug("{} Replication submitted successfully", 
Utils.logPeerId(peerId));
+      return ReplicationResult.SUBMITTED;
+    } catch (IOException e) {
+      LOG.error("{} Replication failed. Error details: {}", 
Utils.logPeerId(peerId), e.getMessage(),
+        e);
+      return ReplicationResult.FAILED;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private Map<Long, List<WAL.Entry>> groupEntriesByDay(List<WAL.Entry> 
entries) {
+    return entries.stream().collect(
+      Collectors.groupingBy(entry -> (entry.getKey().getWriteTime() / 
ONE_DAY_IN_MILLISECONDS)
+        * ONE_DAY_IN_MILLISECONDS));
+  }
+
+  private boolean isAnyWriterFull() {
+    return walWriters.values().stream().anyMatch(this::isWriterFull);
+  }
+
+  private boolean isWriterFull(FSHLogProvider.Writer writer) {
+    long maxWalSize = conf.getLong(CONF_BACKUP_MAX_WAL_SIZE, 
DEFAULT_MAX_WAL_SIZE);
+    return writer.getLength() >= maxWalSize;
+  }
+
+  private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws 
IOException {
+    LOG.debug("{} Starting backup of {} WAL entries for day {}", 
Utils.logPeerId(peerId),
+      walEntries.size(), day);
+
+    try {
+      FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, 
this::createWalWriter);
+      List<Path> bulkLoadFiles = 
BulkLoadProcessor.processBulkLoadFiles(walEntries);
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} Processed {} bulk load files for WAL entries", 
Utils.logPeerId(peerId),
+          bulkLoadFiles.size());
+        LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
+          
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
+      }
+
+      for (WAL.Entry entry : walEntries) {
+        walWriter.append(entry);
+      }
+      walWriter.sync(true);
+      uploadBulkLoadFiles(bulkLoadFiles);
+    } catch (UncheckedIOException e) {
+      String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create 
WAL Writer for " + day;
+      LOG.error("{} Backup failed for day {}. Error: {}", 
Utils.logPeerId(peerId), day,
+        e.getMessage(), e);
+      throw new IOException(errorMsg, e);
+    }
+  }
+
+  private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
+    // Convert dayInMillis to "yyyy-MM-dd" format
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
+
+    FileSystem fs = backupFileSystemManager.getBackupFs();
+    Path walsDir = backupFileSystemManager.getWalsDir();
+
+    try {
+      // Create a directory for the day
+      Path dayDir = new Path(walsDir, dayDirectoryName);
+      fs.mkdirs(dayDir);
+
+      // Generate a unique WAL file name
+      long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+      String walFileName = WAL_FILE_PREFIX + currentTime + "." + 
UUID.randomUUID();
+      Path walFilePath = new Path(dayDir, walFileName);
+
+      // Initialize the WAL writer
+      FSHLogProvider.Writer writer =
+        
ObjectStoreProtobufWalWriter.class.getDeclaredConstructor().newInstance();
+      writer.init(fs, walFilePath, conf, true, WALUtil.getWALBlockSize(conf, 
fs, walFilePath),
+        StreamSlowMonitor.create(conf, walFileName));
+
+      LOG.info("{} WAL writer created: {}", Utils.logPeerId(peerId), 
walFilePath);
+      return writer;
+    } catch (Exception e) {
+      throw new UncheckedIOException(
+        Utils.logPeerId(peerId) + " Failed to initialize WAL Writer for day: " 
+ dayDirectoryName,
+        new IOException(e));
+    }
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("{} Stopping ContinuousBackupReplicationEndpoint...", 
Utils.logPeerId(peerId));
+    stopAsync();
+  }
+
+  @Override
+  protected void doStop() {
+    close();
+    LOG.info("{} ContinuousBackupReplicationEndpoint stopped successfully.",
+      Utils.logPeerId(peerId));
+    notifyStopped();
+  }
+
+  private void close() {
+    LOG.info("{} Closing WAL replication component...", 
Utils.logPeerId(peerId));
+    shutdownFlushExecutor();
+    lock.lock();
+    try {
+      flushWriters();
+      replicationSource.persistOffsets();
+    } catch (IOException e) {
+      LOG.error("{} Failed to Flush Open Wal Writers: {}", 
Utils.logPeerId(peerId), e.getMessage(),
+        e);
+    } finally {
+      lock.unlock();
+      LOG.info("{} WAL replication component closed.", 
Utils.logPeerId(peerId));
+    }
+  }
+
+  private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws 
IOException {
+    LOG.debug("{} Starting upload of {} bulk load files", 
Utils.logPeerId(peerId),
+      bulkLoadFiles.size());
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
+        
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
+    }
+    for (Path file : bulkLoadFiles) {
+      Path sourcePath = getBulkLoadFileStagingPath(file);
+      Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), 
file);
+
+      try {
+        LOG.debug("{} Copying bulk load file from {} to {}", 
Utils.logPeerId(peerId), sourcePath,
+          destPath);
+
+        FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath,
+          backupFileSystemManager.getBackupFs(), destPath, false, conf);
+
+        LOG.info("{} Bulk load file {} successfully backed up to {}", 
Utils.logPeerId(peerId), file,
+          destPath);
+      } catch (IOException e) {
+        LOG.error("{} Failed to back up bulk load file {}: {}", 
Utils.logPeerId(peerId), file,
+          e.getMessage(), e);
+        throw e;
+      }
+    }
+
+    LOG.debug("{} Completed upload of bulk load files", 
Utils.logPeerId(peerId));
+  }
+
+  private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) 
throws IOException {
+    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+    Path baseNamespaceDir = new Path(rootDir, baseNSDir);
+    Path hFileArchiveDir =
+      new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, 
baseNSDir));
+
+    LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", 
Utils.logPeerId(peerId),
+      relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir);
+
+    Path result =
+      findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, 
relativePathFromNamespace);
+
+    if (result == null) {
+      LOG.error("{} No bulk loaded file found in relative path: {}", 
Utils.logPeerId(peerId),
+        relativePathFromNamespace);
+      throw new IOException(
+        "No Bulk loaded file found in relative path: " + 
relativePathFromNamespace);
+    }
+
+    LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), 
result);
+    return result;
+  }
+
+  private static Path findExistingPath(FileSystem rootFs, Path 
baseNamespaceDir,
+    Path hFileArchiveDir, Path filePath) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Checking for bulk load file at: {} and {}", new 
Path(baseNamespaceDir, filePath),
+        new Path(hFileArchiveDir, filePath));
+    }
+
+    for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
+      new Path(hFileArchiveDir, filePath) }) {
+      if (rootFs.exists(candidate)) {
+        LOG.debug("Found bulk load file at: {}", candidate);
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  private void shutdownFlushExecutor() {
+    if (flushExecutor != null) {
+      LOG.info("{} Initiating WAL flush executor shutdown.", 
Utils.logPeerId(peerId));
+
+      flushExecutor.shutdown();
+      try {
+        if (
+          
!flushExecutor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)
+        ) {
+          LOG.warn("{} Flush executor did not terminate within timeout, 
forcing shutdown.",
+            Utils.logPeerId(peerId));
+          flushExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        flushExecutor.shutdownNow();
+        LOG.warn("{} Flush executor shutdown was interrupted.", 
Utils.logPeerId(peerId), e);
+      }
+      LOG.info("{} WAL flush thread stopped.", Utils.logPeerId(peerId));
+    }
+  }
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java
new file mode 100644
index 00000000000..27f4fbdc027
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.util.AtomicUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A custom implementation of {@link ProtobufLogWriter} that provides support 
for writing
+ * protobuf-based WAL (Write-Ahead Log) entries to object store-backed files.
+ * <p>
+ * This class overrides the {@link ProtobufLogWriter#sync(boolean)} and
+ * {@link ProtobufLogWriter#initOutput(FileSystem, Path, boolean, int, short, 
long, StreamSlowMonitor, boolean)}
+ * methods to ensure compatibility with object stores, while ignoring specific 
capability checks
+ * such as HFLUSH and HSYNC. These checks are often not supported by some 
object stores, and
+ * bypassing them ensures smooth operation in such environments.
+ * </p>
+ */
+@InterfaceAudience.Private
+public class ObjectStoreProtobufWalWriter extends ProtobufLogWriter {
+  private final AtomicLong syncedLength = new AtomicLong(0);
+
+  @Override
+  public void sync(boolean forceSync) throws IOException {
+    FSDataOutputStream fsDataOutputstream = this.output;
+    if (fsDataOutputstream == null) {
+      return; // Presume closed
+    }
+    // Special case for Hadoop S3: Unlike traditional file systems, where 
flush() ensures data is
+    // durably written, in Hadoop S3, flush() only writes data to the internal 
buffer and does not
+    // immediately persist it to S3. The actual upload to S3 happens 
asynchronously, typically when
+    // a block is full or when close() is called, which finalizes the upload 
process.
+    fsDataOutputstream.flush();
+    AtomicUtils.updateMax(this.syncedLength, fsDataOutputstream.getPos());
+  }
+
+  @Override
+  protected void initOutput(FileSystem fs, Path path, boolean overwritable, 
int bufferSize,
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean 
noLocalWrite)
+    throws IOException {
+    try {
+      super.initOutput(fs, path, overwritable, bufferSize, replication, 
blockSize, monitor,
+        noLocalWrite);
+    } catch (CommonFSUtils.StreamLacksCapabilityException e) {
+      // Ignore capability check for HFLUSH and HSYNC capabilities
+      // Some object stores may not support these capabilities, so we bypass 
the exception handling
+      // to ensure compatibility with such stores.
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java
similarity index 53%
copy from 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
copy to 
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java
index e6a39e7fede..69365674acc 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/Utils.java
@@ -15,54 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.replication;
+package org.apache.hadoop.hbase.backup.replication;
 
-import java.util.UUID;
 import org.apache.yetus.audience.InterfaceAudience;
 
-/**
- * A dummy replication endpoint that does nothing, for test use only.
- */
 @InterfaceAudience.Private
-public class DummyReplicationEndpoint extends BaseReplicationEndpoint {
-
-  @Override
-  public boolean canReplicateToSameCluster() {
-    return true;
-  }
-
-  @Override
-  public UUID getPeerUUID() {
-    return ctx.getClusterId();
-  }
-
-  @Override
-  public WALEntryFilter getWALEntryfilter() {
-    return null;
-  }
-
-  @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    return true;
-  }
-
-  @Override
-  public void start() {
-    startAsync();
-  }
-
-  @Override
-  public void stop() {
-    stopAsync();
-  }
-
-  @Override
-  protected void doStart() {
-    notifyStarted();
+public final class Utils {
+  private Utils() {
   }
 
-  @Override
-  protected void doStop() {
-    notifyStopped();
+  public static String logPeerId(String peerId) {
+    return "[Source for peer " + peerId + "]:";
   }
 }
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
new file mode 100644
index 00000000000..cd1f758f760
--- /dev/null
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static 
org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestContinuousBackupReplicationEndpoint {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestContinuousBackupReplicationEndpoint.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestContinuousBackupReplicationEndpoint.class);
+
+  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final Configuration conf = TEST_UTIL.getConfiguration();
+  private static Admin admin;
+
+  private final String replicationEndpoint = 
ContinuousBackupReplicationEndpoint.class.getName();
+  private static final String CF_NAME = "cf";
+  private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier");
+  static FileSystem fs = null;
+  static Path root;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set the configuration properties as required
+    conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    conf.set(REPLICATION_CLUSTER_ID, "clusterId1");
+
+    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniCluster(3);
+    fs = FileSystem.get(conf);
+    root = TEST_UTIL.getDataTestDirOnTestFS();
+    admin = TEST_UTIL.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testWALAndBulkLoadFileBackup() throws IOException {
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    String peerId = "peerId";
+
+    createTable(tableName);
+
+    Path backupRootDir = new Path(root, methodName);
+    fs.mkdirs(backupRootDir);
+
+    Map<TableName, List<String>> tableMap = new HashMap<>();
+    tableMap.put(tableName, new ArrayList<>());
+
+    addReplicationPeer(peerId, backupRootDir, tableMap);
+
+    loadRandomData(tableName, 100);
+    assertEquals(100, getRowCount(tableName));
+
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+    generateHFiles(dir);
+    bulkLoadHFiles(tableName, dir);
+    assertEquals(1100, getRowCount(tableName));
+
+    waitForReplication(15000);
+    deleteReplicationPeer(peerId);
+
+    verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
+
+    deleteTable(tableName);
+  }
+
+  @Test
+  public void testMultiTableWALBackup() throws IOException {
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName table1 = TableName.valueOf("table_" + methodName + "1");
+    TableName table2 = TableName.valueOf("table_" + methodName + "2");
+    TableName table3 = TableName.valueOf("table_" + methodName + "3");
+    String peerId = "peerMulti";
+
+    for (TableName table : List.of(table1, table2, table3)) {
+      createTable(table);
+    }
+
+    Path backupRootDir = new Path(root, methodName);
+    fs.mkdirs(backupRootDir);
+
+    Map<TableName, List<String>> initialTableMap = new HashMap<>();
+    initialTableMap.put(table1, new ArrayList<>());
+    initialTableMap.put(table2, new ArrayList<>());
+
+    addReplicationPeer(peerId, backupRootDir, initialTableMap);
+
+    for (TableName table : List.of(table1, table2, table3)) {
+      loadRandomData(table, 50);
+      assertEquals(50, getRowCount(table));
+    }
+
+    waitForReplication(15000);
+
+    // Update the Replication Peer to Include table3
+    admin.updateReplicationPeerConfig(peerId,
+      ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig(peerId))
+        .setTableCFsMap(
+          Map.of(table1, new ArrayList<>(), table2, new ArrayList<>(), table3, 
new ArrayList<>()))
+        .build());
+
+    for (TableName table : List.of(table1, table2, table3)) {
+      loadRandomData(table, 50);
+      assertEquals(100, getRowCount(table));
+    }
+
+    waitForReplication(15000);
+    deleteReplicationPeer(peerId);
+
+    verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 
100, table3, 50));
+
+    for (TableName table : List.of(table1, table2, table3)) {
+      deleteTable(table);
+    }
+  }
+
+  @Test
+  public void testWALBackupWithPeerRestart() throws IOException, 
InterruptedException {
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    String peerId = "peerId";
+
+    createTable(tableName);
+
+    Path backupRootDir = new Path(root, methodName);
+    fs.mkdirs(backupRootDir);
+
+    Map<TableName, List<String>> tableMap = new HashMap<>();
+    tableMap.put(tableName, new ArrayList<>());
+
+    addReplicationPeer(peerId, backupRootDir, tableMap);
+
+    AtomicBoolean stopLoading = new AtomicBoolean(false);
+
+    // Start a separate thread to load data continuously
+    Thread dataLoaderThread = new Thread(() -> {
+      try {
+        while (!stopLoading.get()) {
+          loadRandomData(tableName, 10);
+          Thread.sleep(1000); // Simulate delay
+        }
+      } catch (Exception e) {
+        LOG.error("Data loading thread encountered an error", e);
+      }
+    });
+
+    dataLoaderThread.start();
+
+    // Main thread enables and disables replication peer
+    try {
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Disabling replication peer...");
+        admin.disableReplicationPeer(peerId);
+        Thread.sleep(2000);
+
+        LOG.info("Enabling replication peer...");
+        admin.enableReplicationPeer(peerId);
+        Thread.sleep(2000);
+      }
+    } finally {
+      stopLoading.set(true); // Stop the data loader thread
+      dataLoaderThread.join();
+    }
+
+    waitForReplication(20000);
+    deleteReplicationPeer(peerId);
+
+    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 
getRowCount(tableName)));
+
+    deleteTable(tableName);
+  }
+
+  @Test
+  public void testDayWiseWALBackup() throws IOException {
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    String peerId = "peerId";
+
+    createTable(tableName);
+
+    Path backupRootDir = new Path(root, methodName);
+    fs.mkdirs(backupRootDir);
+
+    Map<TableName, List<String>> tableMap = new HashMap<>();
+    tableMap.put(tableName, new ArrayList<>());
+
+    addReplicationPeer(peerId, backupRootDir, tableMap);
+
+    // Mock system time using ManualEnvironmentEdge
+    ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManagerTestHelper.injectEdge(manualEdge);
+
+    long currentTime = System.currentTimeMillis();
+    long oneDayBackTime = currentTime - ONE_DAY_IN_MILLISECONDS;
+
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    String expectedPrevDayDir = dateFormat.format(new Date(oneDayBackTime));
+    String expectedCurrentDayDir = dateFormat.format(new Date(currentTime));
+
+    manualEdge.setValue(oneDayBackTime);
+    loadRandomData(tableName, 100);
+    assertEquals(100, getRowCount(tableName));
+
+    manualEdge.setValue(currentTime);
+    loadRandomData(tableName, 100);
+    assertEquals(200, getRowCount(tableName));
+
+    // Reset time mocking
+    EnvironmentEdgeManagerTestHelper.reset();
+
+    waitForReplication(15000);
+    deleteReplicationPeer(peerId);
+
+    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200));
+
+    // Verify that WALs are stored in two directories, one for each day
+    Path walDir = new Path(backupRootDir, WALS_DIR);
+    Set<String> walDirectories = new HashSet<>();
+
+    FileStatus[] fileStatuses = fs.listStatus(walDir);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        String dirName = fileStatus.getPath().getName();
+        walDirectories.add(dirName);
+      }
+    }
+
+    assertEquals("WALs should be stored in exactly two directories", 2, 
walDirectories.size());
+    assertTrue("Expected previous day's WAL directory missing",
+      walDirectories.contains(expectedPrevDayDir));
+    assertTrue("Expected current day's WAL directory missing",
+      walDirectories.contains(expectedCurrentDayDir));
+
+    deleteTable(tableName);
+  }
+
+  private void createTable(TableName tableName) throws IOException {
+    ColumnFamilyDescriptor columnFamilyDescriptor =
+      
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF_NAME)).setScope(1).build();
+    TableDescriptor tableDescriptor =
+      
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(columnFamilyDescriptor).build();
+
+    if (!admin.tableExists(tableName)) {
+      admin.createTable(tableDescriptor);
+    }
+  }
+
+  private void deleteTable(TableName tableName) throws IOException {
+    admin.disableTable(tableName);
+    admin.truncateTable(tableName, false);
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+  }
+
+  private void addReplicationPeer(String peerId, Path backupRootDir,
+    Map<TableName, List<String>> tableMap) throws IOException {
+    Map<String, String> additionalArgs = new HashMap<>();
+    additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString());
+    additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString());
+    additionalArgs.put(CONF_BACKUP_MAX_WAL_SIZE, "10240");
+    additionalArgs.put(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10");
+    additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10");
+
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      
.setReplicationEndpointImpl(replicationEndpoint).setReplicateAllUserTables(false)
+      .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build();
+
+    admin.addReplicationPeer(peerId, peerConfig);
+  }
+
+  private void deleteReplicationPeer(String peerId) throws IOException {
+    admin.disableReplicationPeer(peerId);
+    admin.removeReplicationPeer(peerId);
+  }
+
+  private void loadRandomData(TableName tableName, int totalRows) throws 
IOException {
+    int rowSize = 32;
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      TEST_UTIL.loadRandomRows(table, Bytes.toBytes(CF_NAME), rowSize, 
totalRows);
+    }
+  }
+
+  private void bulkLoadHFiles(TableName tableName, Path inputDir) throws 
IOException {
+    
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 true);
+
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      BulkLoadHFiles loader = new 
BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
+      loader.bulkLoad(table.getName(), inputDir);
+    } finally {
+      
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 false);
+    }
+  }
+
+  private void bulkLoadHFiles(TableName tableName, Map<byte[], List<Path>> 
family2Files)
+    throws IOException {
+    
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 true);
+
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      BulkLoadHFiles loader = new 
BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
+      loader.bulkLoad(table.getName(), family2Files);
+    } finally {
+      
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 false);
+    }
+  }
+
+  private void generateHFiles(Path outputDir) throws IOException {
+    String hFileName = "MyHFile";
+    int numRows = 1000;
+    outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+    byte[] from = Bytes.toBytes(CF_NAME + "begin");
+    byte[] to = Bytes.toBytes(CF_NAME + "end");
+
+    Path familyDir = new Path(outputDir, CF_NAME);
+    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new 
Path(familyDir, hFileName),
+      Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows);
+  }
+
+  private void waitForReplication(int durationInMillis) {
+    LOG.info("Waiting for replication to complete for {} ms", 
durationInMillis);
+    try {
+      Thread.sleep(durationInMillis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Thread was interrupted while waiting", e);
+    }
+  }
+
+  /**
+   * Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead 
Log) files were
+   * generated in the backup directory. 2. Checking whether any bulk-loaded 
files were generated in
+   * the backup directory. 3. Replaying the WAL and bulk-loaded files (if 
present) to restore data
+   * and check consistency by verifying that the restored data matches the 
expected row count for
+   * each table.
+   */
+  private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
+    Map<TableName, Integer> tablesWithExpectedRows) throws IOException {
+    verifyWALBackup(backupRootDir);
+    if (hasBulkLoadFiles) {
+      verifyBulkLoadBackup(backupRootDir);
+    }
+
+    for (Map.Entry<TableName, Integer> entry : 
tablesWithExpectedRows.entrySet()) {
+      TableName tableName = entry.getKey();
+      int expectedRows = entry.getValue();
+
+      admin.disableTable(tableName);
+      admin.truncateTable(tableName, false);
+      assertEquals(0, getRowCount(tableName));
+
+      replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
+      replayBulkLoadHFilesIfPresent(new Path(backupRootDir, 
BULKLOAD_FILES_DIR).toString(),
+        tableName);
+      assertEquals(expectedRows, getRowCount(tableName));
+    }
+  }
+
+  private void verifyWALBackup(String backupRootDir) throws IOException {
+    Path walDir = new Path(backupRootDir, WALS_DIR);
+    assertTrue("WAL directory does not exist!", fs.exists(walDir));
+
+    RemoteIterator<LocatedFileStatus> fileStatusIterator = 
fs.listFiles(walDir, true);
+    List<Path> walFiles = new ArrayList<>();
+
+    while (fileStatusIterator.hasNext()) {
+      LocatedFileStatus fileStatus = fileStatusIterator.next();
+      Path filePath = fileStatus.getPath();
+
+      // Check if the file starts with the expected WAL prefix
+      if (!fileStatus.isDirectory() && 
filePath.getName().startsWith(WAL_FILE_PREFIX)) {
+        walFiles.add(filePath);
+      }
+    }
+
+    assertNotNull("No WAL files found!", walFiles);
+    assertFalse("Expected some WAL files but found none!", walFiles.isEmpty());
+  }
+
+  private void verifyBulkLoadBackup(String backupRootDir) throws IOException {
+    Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
+    assertTrue("BulkLoad Files directory does not exist!", 
fs.exists(bulkLoadFilesDir));
+
+    FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir);
+    assertNotNull("No Bulk load files found!", bulkLoadFiles);
+    assertTrue("Expected some Bulk load files but found none!", 
bulkLoadFiles.length > 0);
+  }
+
+  private void replayWALs(String walDir, TableName tableName) {
+    WALPlayer player = new WALPlayer();
+    try {
+      assertEquals(0, ToolRunner.run(TEST_UTIL.getConfiguration(), player,
+        new String[] { walDir, tableName.getQualifierAsString() }));
+    } catch (Exception e) {
+      fail("Failed to replay WALs properly: " + e.getMessage());
+    }
+  }
+
+  private void replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName 
tableName) {
+    try {
+      Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName);
+      if (fs.exists(tableBulkLoadDir)) {
+        RemoteIterator<LocatedFileStatus> fileStatusIterator = 
fs.listFiles(tableBulkLoadDir, true);
+        List<Path> bulkLoadFiles = new ArrayList<>();
+
+        while (fileStatusIterator.hasNext()) {
+          LocatedFileStatus fileStatus = fileStatusIterator.next();
+          Path filePath = fileStatus.getPath();
+
+          if (!fileStatus.isDirectory()) {
+            bulkLoadFiles.add(filePath);
+          }
+        }
+        bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), 
bulkLoadFiles));
+      }
+    } catch (Exception e) {
+      fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
+    }
+  }
+
+  private int getRowCount(TableName tableName) throws IOException {
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      return HBaseTestingUtil.countRows(table);
+    }
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5edd5b3e8c9..fc5c2bf6265 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -51,6 +52,7 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
+    private final ReplicationSourceInterface replicationSource;
     private final Server server;
     private final Configuration localConf;
     private final Configuration conf;
@@ -63,10 +65,12 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
     private final Abortable abortable;
 
     @InterfaceAudience.Private
-    public Context(final Server server, final Configuration localConf, final 
Configuration conf,
-      final FileSystem fs, final String peerId, final UUID clusterId,
-      final ReplicationPeer replicationPeer, final MetricsSource metrics,
-      final TableDescriptors tableDescriptors, final Abortable abortable) {
+    public Context(final ReplicationSourceInterface replicationSource, final 
Server server,
+      final Configuration localConf, final Configuration conf, final 
FileSystem fs,
+      final String peerId, final UUID clusterId, final ReplicationPeer 
replicationPeer,
+      final MetricsSource metrics, final TableDescriptors tableDescriptors,
+      final Abortable abortable) {
+      this.replicationSource = replicationSource;
       this.server = server;
       this.localConf = localConf;
       this.conf = conf;
@@ -79,6 +83,10 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
       this.abortable = abortable;
     }
 
+    public ReplicationSourceInterface getReplicationSource() {
+      return replicationSource;
+    }
+
     public Server getServer() {
       return server;
     }
@@ -208,7 +216,7 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
    * the context are assumed to be persisted in the target cluster.
    * @param replicateContext a context where WAL entries and other parameters 
can be obtained.
    */
-  boolean replicate(ReplicateContext replicateContext);
+  ReplicationResult replicate(ReplicateContext replicateContext);
 
   // The below methods are inspired by Guava Service. See
   // https://github.com/google/guava/wiki/ServiceExplained for overview of 
Guava Service.
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java
similarity index 53%
copy from 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
copy to 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java
index e6a39e7fede..03ed0ce6799 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationResult.java
@@ -17,52 +17,17 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.util.UUID;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.yetus.audience.InterfaceAudience;
 
-/**
- * A dummy replication endpoint that does nothing, for test use only.
- */
-@InterfaceAudience.Private
-public class DummyReplicationEndpoint extends BaseReplicationEndpoint {
-
-  @Override
-  public boolean canReplicateToSameCluster() {
-    return true;
-  }
-
-  @Override
-  public UUID getPeerUUID() {
-    return ctx.getClusterId();
-  }
-
-  @Override
-  public WALEntryFilter getWALEntryfilter() {
-    return null;
-  }
-
-  @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    return true;
-  }
-
-  @Override
-  public void start() {
-    startAsync();
-  }
-
-  @Override
-  public void stop() {
-    stopAsync();
-  }
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public enum ReplicationResult {
+  /* Batch has been replicated and persisted successfully. */
+  COMMITTED,
 
-  @Override
-  protected void doStart() {
-    notifyStarted();
-  }
+  /* Batch has been submitted for replication, but not persisted yet. */
+  SUBMITTED,
 
-  @Override
-  protected void doStop() {
-    notifyStopped();
-  }
+  /* Batch replicaton failed, should be re-tried */
+  FAILED
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java
index 229cec57e97..a9674407bd2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/VerifyWALEntriesReplicationEndpoint.java
@@ -59,10 +59,10 @@ public class VerifyWALEntriesReplicationEndpoint extends 
BaseReplicationEndpoint
   }
 
   @Override
-  public boolean replicate(ReplicateContext replicateContext) {
+  public ReplicationResult replicate(ReplicateContext replicateContext) {
     replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> 
e.getCells().stream())
       .forEach(this::checkCell);
-    return true;
+    return ReplicationResult.COMMITTED;
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 6bdc9773264..4f9a4909d78 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -48,6 +48,7 @@ import 
org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -424,7 +425,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
    * Do the shipping logic
    */
   @Override
-  public boolean replicate(ReplicateContext replicateContext) {
+  public ReplicationResult replicate(ReplicateContext replicateContext) {
     int sleepMultiplier = 1;
     int initialTimeout = replicateContext.getTimeout();
 
@@ -444,7 +445,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
         lastSinkFetchTime = EnvironmentEdgeManager.currentTime();
       }
       sleepForRetries("No sinks available at peer", sleepMultiplier);
-      return false;
+      return ReplicationResult.FAILED;
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
@@ -458,7 +459,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
       try {
         // replicate the batches to sink side.
         parallelReplicate(replicateContext, batches);
-        return true;
+        return ReplicationResult.COMMITTED;
       } catch (IOException ioe) {
         if (ioe instanceof RemoteException) {
           if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
@@ -467,14 +468,14 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
             batches = filterNotExistTableEdits(batches);
             if (batches.isEmpty()) {
               LOG.warn("After filter not exist table's edits, 0 edits to 
replicate, just return");
-              return true;
+              return ReplicationResult.COMMITTED;
             }
           } else if (dropOnDeletedColumnFamilies && 
isNoSuchColumnFamilyException(ioe)) {
             batches = filterNotExistColumnFamilyEdits(batches);
             if (batches.isEmpty()) {
               LOG.warn("After filter not exist column family's edits, 0 edits 
to replicate, "
                 + "just return");
-              return true;
+              return ReplicationResult.COMMITTED;
             }
           } else {
             LOG.warn("{} Peer encountered RemoteException, rechecking all 
sinks: ", logPeerId(),
@@ -506,7 +507,7 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
         }
       }
     }
-    return false; // in case we exited before replicating
+    return ReplicationResult.FAILED; // in case we exited before replicating
   }
 
   protected boolean isPeerEnabled() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 094fa4aaa78..d6d59a39b52 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueData;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -166,6 +167,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
    */
   private final List<WALEntryFilter> baseFilterOutWALEntries;
 
+  private final Map<String, WALEntryBatch> lastEntryBatch = new 
ConcurrentHashMap<>();
+
   ReplicationSource() {
     // Default, filters *in* all WALs but meta WALs & filters *out* all 
WALEntries of System Tables.
     this(p -> !AbstractFSWALProvider.isMetaFile(p),
@@ -318,8 +321,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     if (server instanceof HRegionServer) {
       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
-    replicationEndpoint
-      .init(new ReplicationEndpoint.Context(server, conf, 
replicationPeer.getConfiguration(), fs,
+    replicationEndpoint.init(
+      new ReplicationEndpoint.Context(this, server, conf, 
replicationPeer.getConfiguration(), fs,
         replicationPeer.getId(), clusterId, replicationPeer, metrics, 
tableDescriptors, server));
     replicationEndpoint.start();
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
@@ -861,4 +864,32 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   public long getTotalReplicatedEdits() {
     return totalReplicatedEdits.get();
   }
+
+  @Override
+  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, 
ReplicationResult replicated) {
+    String walName = entryBatch.getLastWalPath().getName();
+    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
+
+    synchronized (lastEntryBatch) { // Synchronize addition and processing
+      lastEntryBatch.put(walPrefix, entryBatch);
+
+      if (replicated == ReplicationResult.COMMITTED) {
+        processAndClearEntries();
+      }
+    }
+  }
+
+  public void persistOffsets() {
+    synchronized (lastEntryBatch) {
+      processAndClearEntries();
+    }
+  }
+
+  private void processAndClearEntries() {
+    // Process all entries
+    lastEntryBatch
+      .forEach((prefix, batch) -> 
getSourceManager().logPositionAndCleanOldLogs(this, batch));
+    // Clear all processed entries
+    lastEntryBatch.clear();
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 69ad2887064..f482cc73e71 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueData;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -207,7 +208,11 @@ public interface ReplicationSourceInterface {
    * @param entryBatch the wal entry batch we just shipped
    * @return The instance of queueStorage used by this ReplicationSource.
    */
-  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
-    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
+  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, 
ReplicationResult replicated) {
+
+  }
+
+  default public void persistOffsets() {
+
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 6d0730d76b6..ee819faa77b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -155,7 +156,7 @@ public class ReplicationSourceShipper extends Thread {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
+      updateLogPosition(entryBatch, ReplicationResult.COMMITTED);
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
@@ -182,21 +183,23 @@ public class ReplicationSourceShipper extends Thread {
 
         long startTimeNs = System.nanoTime();
         // send the edits to the endpoint. Will block until the edits are 
shipped and acknowledged
-        boolean replicated = 
source.getReplicationEndpoint().replicate(replicateContext);
+        ReplicationResult replicated = 
source.getReplicationEndpoint().replicate(replicateContext);
         long endTimeNs = System.nanoTime();
 
-        if (!replicated) {
+        if (replicated == ReplicationResult.FAILED) {
           continue;
         } else {
           sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
         }
-        // Clean up hfile references
-        for (Entry entry : entries) {
-          cleanUpHFileRefs(entry.getEdit());
-          LOG.trace("shipped entry {}: ", entry);
+        if (replicated == ReplicationResult.COMMITTED) {
+          // Clean up hfile references
+          for (Entry entry : entries) {
+            cleanUpHFileRefs(entry.getEdit());
+            LOG.trace("shipped entry {}: ", entry);
+          }
         }
         // Log and clean up WAL logs
-        updateLogPosition(entryBatch);
+        updateLogPosition(entryBatch, replicated);
 
         // offsets totalBufferUsed by deducting shipped batchSize (excludes 
bulk load size)
         // this sizeExcludeBulkLoad has to use same calculation that when 
calling
@@ -253,7 +256,7 @@ public class ReplicationSourceShipper extends Thread {
     }
   }
 
-  private boolean updateLogPosition(WALEntryBatch batch) {
+  private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult 
replicated) {
     boolean updated = false;
     // if end of file is true, then the logPositionAndCleanOldLogs method will 
remove the file
     // record on zk, so let's call it. The last wal position maybe zero if end 
of file is true and
@@ -263,7 +266,7 @@ public class ReplicationSourceShipper extends Thread {
       batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
         || batch.getLastWalPosition() != currentPosition
     ) {
-      source.logPositionAndCleanOldLogs(batch);
+      source.logPositionAndCleanOldLogs(batch, replicated);
       updated = true;
     }
     // if end of file is true, then we can just skip to the next file in queue.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
index b97a08c01c3..a32ce78b0c7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -63,7 +64,7 @@ public class VisibilityReplicationEndpoint implements 
ReplicationEndpoint {
   }
 
   @Override
-  public boolean replicate(ReplicateContext replicateContext) {
+  public ReplicationResult replicate(ReplicateContext replicateContext) {
     if (!delegator.canReplicateToSameCluster()) {
       // Only when the replication is inter cluster replication we need to
       // convert the visibility tags to
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
index e6a39e7fede..f0e627316cd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DummyReplicationEndpoint.java
@@ -42,8 +42,8 @@ public class DummyReplicationEndpoint extends 
BaseReplicationEndpoint {
   }
 
   @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    return true;
+  public ReplicationResult replicate(ReplicateContext replicateContext) {
+    return ReplicationResult.COMMITTED;
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index f54c3931699..a8c76033d02 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -81,7 +81,7 @@ public class SerialReplicationTestBase {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       synchronized (WRITER) {
         try {
           for (Entry entry : replicateContext.getEntries()) {
@@ -92,7 +92,7 @@ public class SerialReplicationTestBase {
           throw new UncheckedIOException(e);
         }
       }
-      return true;
+      return ReplicationResult.COMMITTED;
     }
 
     @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 058564dc0ec..d9a75b8ca8a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -59,8 +59,8 @@ public class TestHBaseReplicationEndpoint {
     when(replicationPeer.getPeerConfig()).thenReturn(peerConfig);
     
when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase");
     ReplicationEndpoint.Context context =
-      new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), 
UTIL.getConfiguration(), null,
-        null, null, replicationPeer, null, null, null);
+      new ReplicationEndpoint.Context(null, null, UTIL.getConfiguration(), 
UTIL.getConfiguration(),
+        null, null, null, replicationPeer, null, null, null);
     endpoint = new DummyHBaseReplicationEndpoint();
     endpoint.init(context);
   }
@@ -199,8 +199,8 @@ public class TestHBaseReplicationEndpoint {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
-      return false;
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
+      return ReplicationResult.FAILED;
     }
 
     @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java
index 70cae18b456..c98b46c8e4b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNonHBaseReplicationEndpoint.java
@@ -127,9 +127,9 @@ public class TestNonHBaseReplicationEndpoint {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       REPLICATED.set(true);
-      return true;
+      return ReplicationResult.COMMITTED;
     }
 
     @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 70a6d73c620..f53d9acc24f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -415,7 +415,7 @@ public class TestReplicationBase {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       replicateCount.incrementAndGet();
       replicatedEntries.addAll(replicateContext.getEntries());
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 057a9f3567f..77cd5da8de0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -463,10 +463,10 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       replicateCount.incrementAndGet();
       lastEntries = new ArrayList<>(replicateContext.entries);
-      return true;
+      return ReplicationResult.COMMITTED;
     }
 
     @Override
@@ -526,12 +526,12 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     }
 
     @Override
-    public boolean replicate(ReplicateContext context) {
+    public ReplicationResult replicate(ReplicateContext context) {
       try {
         Thread.sleep(duration);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        return false;
+        return ReplicationResult.FAILED;
       }
       return super.replicate(context);
     }
@@ -548,9 +548,9 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
-      boolean success = super.replicate(replicateContext);
-      if (success) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
+      ReplicationResult success = super.replicate(replicateContext);
+      if (success == ReplicationResult.COMMITTED) {
         replicateCount.addAndGet(replicateContext.entries.size());
       }
       return success;
@@ -577,7 +577,7 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     static AtomicBoolean replicated = new AtomicBoolean(false);
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       try {
         // check row
         doAssert(row);
@@ -589,7 +589,7 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
       LOG.info("Replicated " + Bytes.toString(row) + ", count=" + 
replicateCount.get());
 
       replicated.set(replicateCount.get() > COUNT); // first 10 times, we 
return false
-      return replicated.get();
+      return replicated.get() ? ReplicationResult.COMMITTED : 
ReplicationResult.FAILED;
     }
   }
 
@@ -598,14 +598,14 @@ public class TestReplicationEndpoint extends 
TestReplicationBase {
     static AtomicReference<Exception> ex = new AtomicReference<>(null);
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       try {
         super.replicate(replicateContext);
         doAssert(row);
       } catch (Exception e) {
         ex.set(e);
       }
-      return true;
+      return ReplicationResult.COMMITTED;
     }
 
     @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java
index b990916ae75..50b0911970a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyCellsReplicationEndpoint.java
@@ -71,7 +71,7 @@ public class TestVerifyCellsReplicationEndpoint {
   public static final class EndpointForTest extends 
VerifyWALEntriesReplicationEndpoint {
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       LOG.info(replicateContext.getEntries().toString());
       
replicateContext.entries.stream().map(WAL.Entry::getEdit).map(WALEdit::getCells)
         .forEachOrdered(CELLS::addAll);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java
index 7b108f5ca14..92e7c8290f0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestHBaseInterClusterReplicationEndpointFilterEdits.java
@@ -83,8 +83,8 @@ public class 
TestHBaseInterClusterReplicationEndpointFilterEdits {
     when(rpc.isSerial()).thenReturn(false);
     when(replicationPeer.getPeerConfig()).thenReturn(rpc);
     when(rpc.getClusterKey()).thenReturn("hbase+zk://localhost:2181");
-    Context context = new Context(null, UTIL.getConfiguration(), 
UTIL.getConfiguration(), null,
-      null, null, replicationPeer, null, null, null);
+    Context context = new Context(null, null, UTIL.getConfiguration(), 
UTIL.getConfiguration(),
+      null, null, null, replicationPeer, null, null, null);
     endpoint = new HBaseInterClusterReplicationEndpoint();
     endpoint.init(context);
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
index 66f04dca36d..d7b5bcdcccb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -94,7 +95,7 @@ public class TestRaceWhenCreatingReplicationSource {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       synchronized (WRITER) {
         try {
           for (Entry entry : replicateContext.getEntries()) {
@@ -105,7 +106,7 @@ public class TestRaceWhenCreatingReplicationSource {
           throw new UncheckedIOException(e);
         }
       }
-      return true;
+      return ReplicationResult.COMMITTED;
     }
 
     @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 663b444dc4e..c99f25380de 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
@@ -94,13 +95,13 @@ public class TestReplicationSourceManager {
     private String clusterKey;
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       // if you want to block the replication, for example, do not want the 
recovered source to be
       // removed
       if (clusterKey.endsWith("error")) {
         throw new RuntimeException("Inject error");
       }
-      return true;
+      return ReplicationResult.COMMITTED;
     }
 
     @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 979db712ef3..cdbd1c73a2a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -218,7 +219,7 @@ public class TestReplicator extends TestReplicationBase {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
       try {
         await();
       } catch (InterruptedException e) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index 7d5a5627d2c..ffbc0d2cee5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -65,6 +65,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
@@ -473,8 +474,8 @@ public class TestVisibilityLabelsReplication {
     }
 
     @Override
-    public boolean replicate(ReplicateContext replicateContext) {
-      boolean ret = super.replicate(replicateContext);
+    public ReplicationResult replicate(ReplicateContext replicateContext) {
+      ReplicationResult ret = super.replicate(replicateContext);
       lastEntries = replicateContext.getEntries();
       replicateCount.incrementAndGet();
       return ret;


Reply via email to