This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 66d9077bfd Phoenix-7632 Adding ReplicationLogProcessor Component (#2335)
66d9077bfd is described below

commit 66d9077bfda4fb6919e713debcd6daec5969f2bd
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Tue Dec 23 00:21:36 2025 +0530

    Phoenix-7632 Adding ReplicationLogProcessor Component (#2335)
---
 .../metrics/MetricsReplicationLogProcessor.java    |   88 +
 .../MetricsReplicationLogProcessorImpl.java        |  106 ++
 .../ReplicationLogProcessorMetricValues.java       |   58 +
 .../reader/ReplicationLogProcessor.java            |  540 ++++++
 .../phoenix/replication/reader/package-info.java   |   23 +
 .../reader/ReplicationLogProcessorTestIT.java      | 1885 ++++++++++++++++++++
 6 files changed, 2700 insertions(+)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogProcessor.java
new file mode 100644
index 0000000000..5ed7bd8e6f
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogProcessor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/** Interface for metrics related to ReplicationLogProcessor operations. */
+public interface MetricsReplicationLogProcessor extends BaseSource {
+
+  String METRICS_NAME = "ReplicationLogProcessor";
+  String METRICS_CONTEXT = "phoenix";
+  String METRICS_DESCRIPTION = "Metrics about Replication Log Processor for an HA Group";
+  String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  String FAILED_MUTATIONS_COUNT = "failedMutationsCount";
+  String FAILED_MUTATIONS_COUNT_DESC = "Number of failed mutations";
+  String FAILED_BATCH_COUNT = "failedBatchCount";
+  String FAILED_BATCH_COUNT_DESC = "Number of failed batches";
+  String LOG_FILE_REPLAY_FAILURE_COUNT = "logFileReplayFailureCount";
+  String LOG_FILE_REPLAY_FAILURE_COUNT_DESC = "Number of files failed to replay";
+  String LOG_FILE_REPLAY_SUCCESS_COUNT = "logFileReplaySuccessCount";
+  String LOG_FILE_REPLAY_SUCCESS_COUNT_DESC = "Number of files successfully replayed";
+  String BATCH_REPLAY_TIME = "batchReplayTimeMs";
+  String BATCH_REPLAY_TIME_DESC =
+    "Histogram of time taken for replaying a batch of mutations from a log file in milliseconds";
+  String LOG_FILE_REPLAY_TIME = "logFileReplayTimeMs";
+  String LOG_FILE_REPLAY_TIME_DESC =
+    "Histogram of time taken for replaying a log file in milliseconds";
+
+  /**
+   * Increments the counter for failed mutations. This counter tracks the number of mutations that
+   * failed during processing.
+   */
+  void incrementFailedMutationsCount(long delta);
+
+  /**
+   * Increments the counter for failed batches. This counter tracks the number of mutation
+   * batches that failed to apply.
+   */
+  void incrementFailedBatchCount();
+
+  /**
+   * Increments the counter for log file replay failures. This counter tracks the number of log
+   * files that failed to replay.
+   */
+  void incrementLogFileReplayFailureCount();
+
+  /**
+   * Increments the counter for log file replay successes. This counter tracks the number of log
+   * files that were successfully replayed.
+   */
+  void incrementLogFileReplaySuccessCount();
+
+  /**
+   * Update the time taken for replaying a batch of mutations in milliseconds.
+   * @param timeMs Time taken in milliseconds
+   */
+  void updateBatchReplayTime(long timeMs);
+
+  /**
+   * Update the time taken for replaying a log file in milliseconds.
+   * @param timeMs Time taken in milliseconds
+   */
+  void updateLogFileReplayTime(long timeMs);
+
+  /**
+   * Unregister this metrics source.
+   */
+  void close();
+
+  // Get current values for testing
+  ReplicationLogProcessorMetricValues getCurrentMetricValues();
+}
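
For reference, a minimal usage sketch (illustrative only, not part of the commit above) of how a
caller might record and read metrics through this interface, assuming the Impl class added below;
the HA group name is hypothetical:

    // Illustrative sketch only; "group1" is a hypothetical HA group name.
    void recordAndReadMetrics() {
      MetricsReplicationLogProcessor metrics = new MetricsReplicationLogProcessorImpl("group1");
      metrics.updateLogFileReplayTime(125L);        // record one file replay duration (ms)
      metrics.incrementLogFileReplaySuccessCount(); // count one successfully replayed file
      ReplicationLogProcessorMetricValues values = metrics.getCurrentMetricValues();
      assert values.getLogFileReplaySuccessCount() == 1;
      metrics.close();                              // unregister the metrics source
    }
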
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogProcessorImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogProcessorImpl.java
new file mode 100644
index 0000000000..be8f7715c9
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogProcessorImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+
+/** Implementation of metrics source for ReplicationLogProcessor operations. */
+public class MetricsReplicationLogProcessorImpl extends BaseSourceImpl
+  implements MetricsReplicationLogProcessor {
+
+  private String groupMetricsContext;
+  private final MutableFastCounter failedMutationsCount;
+  private final MutableFastCounter failedBatchCount;
+  private final MutableFastCounter logFileReplayFailureCount;
+  private final MutableFastCounter logFileReplaySuccessCount;
+  private final MutableHistogram batchReplayTime;
+  private final MutableHistogram logFileReplayTime;
+
+  public MetricsReplicationLogProcessorImpl(final String haGroupName) {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
+      METRICS_JMX_CONTEXT + ",haGroup=" + haGroupName);
+    groupMetricsContext = METRICS_JMX_CONTEXT + ",haGroup=" + haGroupName;
+  }
+
+  public MetricsReplicationLogProcessorImpl(String metricsName, String metricsDescription,
+    String metricsContext, String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+    failedMutationsCount =
+      getMetricsRegistry().newCounter(FAILED_MUTATIONS_COUNT, FAILED_MUTATIONS_COUNT_DESC, 0L);
+    failedBatchCount =
+      getMetricsRegistry().newCounter(FAILED_BATCH_COUNT, FAILED_BATCH_COUNT_DESC, 0L);
+    logFileReplayFailureCount = getMetricsRegistry().newCounter(LOG_FILE_REPLAY_FAILURE_COUNT,
+      LOG_FILE_REPLAY_FAILURE_COUNT_DESC, 0L);
+    logFileReplaySuccessCount = getMetricsRegistry().newCounter(LOG_FILE_REPLAY_SUCCESS_COUNT,
+      LOG_FILE_REPLAY_SUCCESS_COUNT_DESC, 0L);
+    batchReplayTime = getMetricsRegistry().newHistogram(BATCH_REPLAY_TIME, BATCH_REPLAY_TIME_DESC);
+    logFileReplayTime =
+      getMetricsRegistry().newHistogram(LOG_FILE_REPLAY_TIME, LOG_FILE_REPLAY_TIME_DESC);
+  }
+
+  @Override
+  public void incrementFailedMutationsCount(long delta) {
+    failedMutationsCount.incr(delta);
+  }
+
+  @Override
+  public void incrementFailedBatchCount() {
+    failedBatchCount.incr();
+  }
+
+  @Override
+  public void incrementLogFileReplayFailureCount() {
+    logFileReplayFailureCount.incr();
+  }
+
+  @Override
+  public void incrementLogFileReplaySuccessCount() {
+    logFileReplaySuccessCount.incr();
+  }
+
+  @Override
+  public void updateBatchReplayTime(long timeMs) {
+    batchReplayTime.add(timeMs);
+  }
+
+  @Override
+  public void updateLogFileReplayTime(long timeMs) {
+    logFileReplayTime.add(timeMs);
+  }
+
+  @Override
+  public void close() {
+    // Unregister this metrics source
+    DefaultMetricsSystem.instance().unregisterSource(groupMetricsContext);
+  }
+
+  @Override
+  public ReplicationLogProcessorMetricValues getCurrentMetricValues() {
+    return new ReplicationLogProcessorMetricValues(failedMutationsCount.value(),
+      logFileReplayFailureCount.value(), logFileReplaySuccessCount.value(),
+      logFileReplayTime.getMax(), batchReplayTime.getMax());
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return groupMetricsContext;
+  }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogProcessorMetricValues.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogProcessorMetricValues.java
new file mode 100644
index 0000000000..4911ac4bd7
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogProcessorMetricValues.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+/** Class to hold the values of all metrics tracked by the ReplicationLogProcessor metric source. */
+public class ReplicationLogProcessorMetricValues {
+
+  private final long failedMutationsCount;
+  private final long logFileReplayFailureCount;
+  private final long logFileReplaySuccessCount;
+  private final long logFileReplayTime;
+  private final long logFileBatchReplayTime;
+
+  public ReplicationLogProcessorMetricValues(long failedMutationsCount,
+    long logFileReplayFailureCount, long logFileReplaySuccessCount, long logFileReplayTime,
+    long logFileBatchReplayTime) {
+    this.failedMutationsCount = failedMutationsCount;
+    this.logFileReplayFailureCount = logFileReplayFailureCount;
+    this.logFileReplaySuccessCount = logFileReplaySuccessCount;
+    this.logFileReplayTime = logFileReplayTime;
+    this.logFileBatchReplayTime = logFileBatchReplayTime;
+  }
+
+  public long getFailedMutationsCount() {
+    return failedMutationsCount;
+  }
+
+  public long getLogFileReplayFailureCount() {
+    return logFileReplayFailureCount;
+  }
+
+  public long getLogFileReplaySuccessCount() {
+    return logFileReplaySuccessCount;
+  }
+
+  public long getLogFileReplayTime() {
+    return logFileReplayTime;
+  }
+
+  public long getLogFileBatchReplayTime() {
+    return logFileBatchReplayTime;
+  }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
new file mode 100644
index 0000000000..7215ebe20e
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogProcessor;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogProcessorImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class ReplicationLogProcessor implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessor.class);
+
+  /**
+   * The maximum count of mutations to process in single batch while reading replication log file
+   */
+  public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE =
+    "phoenix.replication.log.standby.replay.batch.size";
+
+  /**
+   * The default batch size for reading the replication log file, assuming each log record is
+   * about 10 KB (uncompressed) and allowing at most 64 MB of in-memory records to be processed
+   */
+  public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = 6400;
+
+  /**
+   * The maximum total size of mutations to process in single batch while reading replication log
+   * file
+   */
+  public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE_BYTES =
+    "phoenix.replication.log.standby.replay.batch.size.bytes";
+
+  /**
+   * The default batch size in bytes for reading the replication log file (64 MB)
+   */
+  public static final long DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE_BYTES =
+    64 * 1024 * 1024L;
+
+  /**
+   * The number of threads to apply mutations via async hbase client
+   */
+  public static final String REPLICATION_STANDBY_LOG_REPLAY_THREAD_POOL_SIZE =
+    "phoenix.replication.log.standby.replay.thread.pool.size";
+
+  /**
+   * The default number of threads for applying mutations via async hbase client
+   */
+  public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_THREAD_POOL_SIZE = 5;
+
+  /**
+   * The maximum number of retries for HBase client operations while applying the mutations
+   */
+  public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT =
+    "phoenix.replication.standby.hbase.client.retries.number";
+
+  /**
+   * The default number of retries for HBase client operations while applying the mutations.
+   */
+  public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4;
+
+  /**
+   * The timeout for HBase client operations while applying the mutations.
+   */
+  public static final String REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS =
+    "phoenix.replication.standby.hbase.client.operations.timeout";
+
+  /**
+   * The default timeout for HBase client operations while applying the mutations.
+   */
+  public static final long DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = 8000;
+
+  /**
+   * The maximum number of retry attempts for failed batch operations.
+   */
+  public static final String REPLICATION_STANDBY_BATCH_RETRY_COUNT =
+    "phoenix.replication.standby.batch.retry.count";
+
+  /**
+   * The default number of retry attempts for failed batch operations.
+   */
+  public static final int DEFAULT_REPLICATION_STANDBY_BATCH_RETRY_COUNT = 2;
+
+  /**
+   * The maximum delay for retry attempts in milliseconds.
+   */
+  public static final String REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS =
+    "phoenix.replication.standby.batch.retry.max.delay.ms";
+
+  /**
+   * The default maximum delay for retry attempts in milliseconds.
+   */
+  public static final long DEFAULT_REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS = 10000;
+
+  private final String haGroupName;
+
+  private final Configuration conf;
+
+  private final ExecutorService executorService;
+
+  /**
+   * This {@link AsyncConnection} is used for handling mutations
+   */
+  private volatile AsyncConnection asyncConnection;
+
+  private final int batchSize;
+
+  private final long batchSizeBytes;
+
+  private final int batchRetryCount;
+
+  private final long maxRetryDelayMs;
+
+  private final MetricsReplicationLogProcessor metrics;
+
+  /** Cache of ReplicationLogProcessor instances by HA Group Name */
+  private static final ConcurrentHashMap&lt;String, ReplicationLogProcessor&gt; INSTANCES =
+    new ConcurrentHashMap<>();
+
+  /**
+   * Get or create a ReplicationLogProcessor instance for the given HA Group.
+   * @param conf        Configuration object
+   * @param haGroupName The HA Group name
+   * @return ReplicationLogProcessor instance
+   */
+  public static ReplicationLogProcessor get(Configuration conf, String haGroupName) {
+    return INSTANCES.computeIfAbsent(haGroupName,
+      k -> new ReplicationLogProcessor(conf, haGroupName));
+  }
+
+  /**
+   * Creates a new ReplicationLogProcessor with the given configuration and HA group name.
+   * @param conf        The configuration to use
+   * @param haGroupName The HA group name
+   */
+  protected ReplicationLogProcessor(final Configuration conf, final String haGroupName) {
+    // Create a copy of configuration as some of the properties would be
+    // overridden
+    this.conf = HBaseConfiguration.create(conf);
+    this.haGroupName = haGroupName;
+    this.batchSize = this.conf.getInt(REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
+      DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE);
+    this.batchSizeBytes = this.conf.getLong(REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE_BYTES,
+      DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE_BYTES);
+    this.batchRetryCount = this.conf.getInt(REPLICATION_STANDBY_BATCH_RETRY_COUNT,
+      DEFAULT_REPLICATION_STANDBY_BATCH_RETRY_COUNT);
+    this.maxRetryDelayMs = this.conf.getLong(REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS,
+      DEFAULT_REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS);
+    final int threadPoolSize = this.conf.getInt(REPLICATION_STANDBY_LOG_REPLAY_THREAD_POOL_SIZE,
+      DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_THREAD_POOL_SIZE);
+    decorateConf();
+    this.metrics = createMetricsSource();
+    this.executorService = Executors.newFixedThreadPool(threadPoolSize, new ThreadFactoryBuilder()
+      .setNameFormat("Phoenix-Replication-Log-Processor-" + haGroupName + "-%d").build());
+  }
+
+  /**
+   * Decorate the Configuration object to make replication more receptive to delays by reducing the
+   * timeout and number of retries.
+   */
+  private void decorateConf() {
+    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT,
+        DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT));
+    this.conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+      this.conf.getLong(REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS,
+        DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS));
+  }
+
+  public void processLogFile(FileSystem fs, Path filePath) throws IOException {
+
+    // Map from Table Name to List of Mutations
+    Map<TableName, List<Mutation>> tableToMutationsMap = new HashMap<>();
+
+    // Track the total number of processed records from input log file
+    long totalProcessed = 0;
+
+    // Track the current batch size as records will be processed in batches of
+    // {@link REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE}
+    long currentBatchSize = 0;
+
+    // Track the current batch size in bytes
+    long currentBatchSizeBytes = 0;
+
+    LogFileReader logFileReader = null;
+
+    long startTime = System.currentTimeMillis();
+
+    try {
+      // Create the LogFileReader for given path
+      logFileReader = createLogFileReader(fs, filePath);
+
+      for (LogFile.Record record : logFileReader) {
+        final TableName tableName = TableName.valueOf(record.getHBaseTableName());
+        final Mutation mutation = record.getMutation();
+
+        tableToMutationsMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(mutation);
+
+        // Increment current batch size and current batch size bytes
+        currentBatchSize++;
+        currentBatchSizeBytes += mutation.heapSize();
+
+        // Process when we reach either the batch count or size limit
+        if (currentBatchSize >= getBatchSize() || currentBatchSizeBytes >= getBatchSizeBytes()) {
+          processReplicationLogBatch(tableToMutationsMap);
+          totalProcessed += currentBatchSize;
+          tableToMutationsMap.clear();
+          currentBatchSize = 0;
+          currentBatchSizeBytes = 0;
+        }
+      }
+
+      // Process any remaining mutations
+      if (currentBatchSize > 0) {
+        processReplicationLogBatch(tableToMutationsMap);
+        totalProcessed += currentBatchSize;
+      }
+
+      LOG.info("Completed processing log file {}. Total mutations processed: {}",
+        logFileReader.getContext().getFilePath(), totalProcessed);
+      getMetrics().incrementLogFileReplaySuccessCount();
+    } catch (Exception e) {
+      LOG.error("Error while processing replication log file", e);
+      getMetrics().incrementLogFileReplayFailureCount();
+      throw new IOException("Failed to process log file " + filePath, e);
+    } finally {
+      closeReader(logFileReader);
+      // Update log file replay time metric
+      long endTime = System.currentTimeMillis();
+      getMetrics().updateLogFileReplayTime(endTime - startTime);
+    }
+  }
+
+  /**
+   * Creates a LogFileReader for the specified file path. Validates that the file exists and
+   * initializes the reader with the given file system and path.
+   * @param fs       The file system to use for reading
+   * @param filePath The path to the log file
+   * @return A configured LogFileReader instance
+   * @throws IOException if the file doesn't exist or initialization fails
+   */
+  protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException {
+    // Ensure that the file exists. If we hit an exception while checking the path itself,
+    // the method throws the same exception back to the caller
+    if (!fs.exists(filePath)) {
+      throw new IOException("Log file does not exist: " + filePath);
+    }
+    LogFileReader logFileReader = new LogFileReader();
+    try {
+      LogFileReaderContext logFileReaderContext =
+        new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
+      logFileReader.init(logFileReaderContext);
+    } catch (IOException exception) {
+      LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
+      throw exception;
+    }
+    return logFileReader;
+  }
+
+  /**
+   * Closes the given reader, logging any errors that occur during close.
+   */
+  protected void closeReader(LogFileReader logFileReader) {
+    if (logFileReader == null) {
+      return;
+    }
+    try {
+      logFileReader.close();
+    } catch (IOException exception) {
+      LOG.error("Error while closing LogFileReader: {}", logFileReader, exception);
+    }
+  }
+
+  protected void processReplicationLogBatch(Map<TableName, List<Mutation>> tableMutationMap)
+    throws IOException {
+
+    if (tableMutationMap == null || tableMutationMap.isEmpty()) {
+      return;
+    }
+
+    // Track failed operations for retry
+    Map<TableName, List<Mutation>> currentOperations = tableMutationMap;
+    IOException lastError = null;
+
+    int attempt = 0;
+
+    long startTime = System.currentTimeMillis();
+
+    while (attempt <= batchRetryCount && !currentOperations.isEmpty()) {
+      if (attempt > 0) {
+        LOG.warn("Retrying failed batch operations, attempt {} of {}", attempt, batchRetryCount);
+      }
+
+      try {
+        // Apply mutations and get any failed operations
+        ApplyMutationBatchResult result = applyMutations(currentOperations);
+
+        // If no failures, we're done
+        if (!result.hasFailures()) {
+          return;
+        }
+
+        // Update current operations for next retry
+        currentOperations = result.getFailedMutations();
+        lastError = new IOException("Failed to apply the mutations", result.getException());
+      } catch (IOException e) {
+        lastError = e;
+      }
+      attempt++;
+      getMetrics().incrementFailedBatchCount();
+      // Add delay between retries (exponential backoff)
+      if (attempt <= batchRetryCount && !currentOperations.isEmpty()) {
+        try {
+          long delayMs = calculateRetryDelay(attempt);
+          Thread.sleep(delayMs);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted during retry delay", e);
+        }
+      }
+    }
+    // Update batch replay time metrics
+    long endTime = System.currentTimeMillis();
+    getMetrics().updateBatchReplayTime(endTime - startTime);
+
+    // If we still have failed operations after all retries, throw the last error
+    if (!currentOperations.isEmpty() && lastError != null) {
+      LOG.error("Failed to process batch operations after {} retries. Failed tables: {}",
+        batchRetryCount, currentOperations.keySet());
+      throw lastError;
+    }
+  }
+
+  /**
+   * Calculates the delay time for retry attempts using exponential backoff.
+   * @param attempt The current retry attempt number (0-based)
+   * @return The delay time in milliseconds
+   */
+  protected long calculateRetryDelay(int attempt) {
+    return Math.min(1000L * (1L << attempt), maxRetryDelayMs);
+  }
+
+  /**
+   * Applies mutations to HBase tables and returns any failed operations.
+   * @param tableMutationMap Map of table names to their mutations
+   * @return ApplyMutationBatchResult containing failed mutations and any exceptions
+   * @throws IOException if there's an error applying mutations
+   */
+  protected ApplyMutationBatchResult applyMutations(Map<TableName, List<Mutation>> tableMutationMap)
+    throws IOException {
+
+    if (tableMutationMap == null || tableMutationMap.isEmpty()) {
+      return new ApplyMutationBatchResult(Collections.emptyMap(), null);
+    }
+
+    Map<TableName, List<Mutation>> failedOperations = new HashMap<>();
+    Map<TableName, Future<?>> futures = new HashMap<>();
+    Exception lastException = null;
+
+    // Submit batch operations
+    for (Map.Entry<TableName, List<Mutation>> entry : tableMutationMap.entrySet()) {
+      TableName tableName = entry.getKey();
+      List<Mutation> mutations = entry.getValue();
+      AsyncTable<?> table = getAsyncConnection().getTable(tableName, executorService);
+      futures.put(tableName, table.batchAll(mutations));
+    }
+
+    // Check results and track failures
+    for (Map.Entry<TableName, Future<?>> entry : futures.entrySet()) {
+      TableName tableName = entry.getKey();
+      Future<?> future = entry.getValue();
+      try {
+        FutureUtils.get(future);
+      } catch (IOException e) {
+        // Add failed mutations to retry list
+        failedOperations.put(tableName, tableMutationMap.get(tableName));
+        getMetrics().incrementFailedMutationsCount(tableMutationMap.get(tableName).size());
+        LOG.debug("Failed to apply mutations for table {}: {}", tableName, e.getMessage());
+        lastException = e;
+      }
+    }
+
+    return new ApplyMutationBatchResult(failedOperations, lastException);
+  }
+
+  /**
+   * Return the {@link AsyncConnection} which is used for applying mutations. It creates a new
+   * connection ONLY when it was not previously initialized or was closed
+   */
+  private AsyncConnection getAsyncConnection() throws IOException {
+    AsyncConnection existingAsyncConnection = asyncConnection;
+    if (existingAsyncConnection == null || existingAsyncConnection.isClosed()) {
+      synchronized (this) {
+        existingAsyncConnection = asyncConnection;
+        if (existingAsyncConnection == null || existingAsyncConnection.isClosed()) {
+          /**
+           * Get the AsyncConnection immediately.
+           */
+          existingAsyncConnection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
+          asyncConnection = existingAsyncConnection;
+        }
+      }
+    }
+    return existingAsyncConnection;
+  }
+
+  /**
+   * Closes the {@link AsyncConnection} and releases all associated resources.
+   * @throws IOException if there's an error closing the AsyncConnection
+   */
+  @Override
+  public void close() throws IOException {
+    synchronized (this) {
+      // Close the async connection
+      if (asyncConnection != null && !asyncConnection.isClosed()) {
+        asyncConnection.close();
+      }
+      asyncConnection = null;
+      // Shutdown the executor service
+      if (executorService != null && !executorService.isShutdown()) {
+        executorService.shutdownNow();
+      }
+      // Remove the instance from cache
+      INSTANCES.remove(haGroupName);
+    }
+  }
+
+  /** Creates a new metrics source for monitoring operations. */
+  protected MetricsReplicationLogProcessor createMetricsSource() {
+    return new MetricsReplicationLogProcessorImpl(haGroupName);
+  }
+
+  /** Returns the metrics source for monitoring replication log operations. */
+  public MetricsReplicationLogProcessor getMetrics() {
+    return metrics;
+  }
+
+  public int getBatchSize() {
+    return this.batchSize;
+  }
+
+  public long getBatchSizeBytes() {
+    return this.batchSizeBytes;
+  }
+
+  public int getHBaseClientRetriesCount() {
+    return this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT);
+  }
+
+  public long getHBaseClientOperationTimeout() {
+    return this.conf.getLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+      DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS);
+  }
+
+  public int getBatchRetryCount() {
+    return this.batchRetryCount;
+  }
+
+  public long getMaxRetryDelayMs() {
+    return this.maxRetryDelayMs;
+  }
+
+  public String getHaGroupName() {
+    return this.haGroupName;
+  }
+
+  protected ExecutorService getExecutorService() {
+    return this.executorService;
+  }
+
+  /**
+   * Result class for batch mutation operations containing failed mutations and any exceptions.
+   */
+  protected static class ApplyMutationBatchResult {
+    private final Map<TableName, List<Mutation>> failedMutations;
+    private final Exception exception;
+
+    public ApplyMutationBatchResult(final Map<TableName, List<Mutation>> failedMutations,
+      final Exception exception) {
+      this.failedMutations = failedMutations != null
+        ? Collections.unmodifiableMap(failedMutations)
+        : Collections.emptyMap();
+      this.exception = exception;
+    }
+
+    public Map<TableName, List<Mutation>> getFailedMutations() {
+      return failedMutations;
+    }
+
+    public Exception getException() {
+      return exception;
+    }
+
+    public boolean hasFailures() {
+      return !failedMutations.isEmpty();
+    }
+  }
+}
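
For context, a hedged usage sketch (illustrative only, not part of the commit above) of the
processor added here; the HA group name and log file location are hypothetical:

    // Illustrative sketch only; "group1" and the path below are hypothetical.
    void replayOneLogFile() throws IOException {
      Configuration conf = HBaseConfiguration.create();
      FileSystem fs = FileSystem.get(conf);
      Path logFile = new Path("/replication/in/group1/00001.plog"); // hypothetical location
      // get() caches one processor per HA group; close() removes it from the cache
      try (ReplicationLogProcessor processor = ReplicationLogProcessor.get(conf, "group1")) {
        // Reads records in batches, applies them per table via the async client,
        // and retries failed batches with exponential backoff.
        processor.processLogFile(fs, logFile);
      }
    }
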
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/package-info.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/package-info.java
new file mode 100644
index 0000000000..abadacaf87
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes and utilities for reading replication log files
+ * and applying mutations on target cluster
+ */
+package org.apache.phoenix.replication.reader;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
new file mode 100644
index 0000000000..ca3c4cfe97
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -0,0 +1,1885 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.replication.metrics.ReplicationLogProcessorMetricValues;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
+
+  private static final String CREATE_TABLE_SQL_STATEMENT =
+    "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " + "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
+
+  private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)";
+
+  private static final String PRINCIPAL = "replicationLogProcessor";
+
+  private final String testHAGroupName = "testHAGroupName";
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  private static Configuration conf;
+  private static FileSystem localFs;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    conf = getUtility().getConfiguration();
+    localFs = FileSystem.getLocal(conf);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws IOException {
+    localFs.delete(new Path(testFolder.getRoot().toURI()), true);
+  }
+
+  /**
+   * Tests successful creation of LogFileReader with a properly formatted log file.
+   */
+  @Test
+  public void testCreateLogFileReaderWithValidLogFile() throws IOException {
+    // Test with valid log file
+    Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI());
+    String tableName = "T_" + generateUniqueName();
+
+    // Create a valid log file with proper structure and one record
+    LogFileWriter writer = initLogFileWriter(validFilePath);
+
+    // Add a mutation to make it a proper log file with data
+    Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+    writer.append(tableName, 1, put);
+    writer.sync();
+    writer.close();
+
+    // Verify file exists and has content
+    assertTrue("Valid log file should exist", localFs.exists(validFilePath));
+    assertTrue("Valid log file should have content",
+      localFs.getFileStatus(validFilePath).getLen() > 0);
+
+    // Test createLogFileReader with valid file - should succeed
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+
+    // Verify reader is created successfully
+    assertNotNull("Reader should not be null for valid file", reader);
+    assertNotNull("Reader context should not be null", reader.getContext());
+    assertEquals("File path should match", validFilePath, reader.getContext().getFilePath());
+    assertEquals("File system should match", localFs, reader.getContext().getFileSystem());
+
+    // Verify we can read from the reader
+    assertTrue("Reader should have records", reader.iterator().hasNext());
+
+    // Clean up
+    reader.close();
+    replicationLogProcessor.close();
+  }
+
+  /**
+   * Tests error handling when attempting to create LogFileReader with a non-existent file.
+   */
+  @Test
+  public void testCreateLogFileReaderWithNonExistentFile() throws IOException {
+    Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file");
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    try {
+      replicationLogProcessor.createLogFileReader(localFs, nonExistentPath);
+      fail("Should throw IOException for non-existent file");
+    } catch (IOException e) {
+      assertTrue("Error message should mention file does not exist and file path name",
+        e.getMessage().contains("Log file does not exist: " + nonExistentPath));
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
+   */
+  @Test
+  public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
+    Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
+    localFs.create(invalidFilePath).close(); // Create empty file
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    try {
+      replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
+      fail("Should throw IOException for invalid file");
+    } catch (IOException e) {
+      // Should throw some kind of IOException when trying to read header
+      assertTrue("Should throw IOException", true);
+    } finally {
+      // Delete the invalid file
+      localFs.delete(invalidFilePath);
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests the closeReader method with both null and valid LogFileReader instances.
+   */
+  @Test
+  public void testCloseReader() throws IOException {
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    replicationLogProcessor.closeReader(null);
+    Path filePath = new Path(testFolder.newFile("testCloseReader").toURI());
+    String tableName = "T_" + generateUniqueName();
+
+    // Create a valid log file with proper structure and one record
+    LogFileWriter writer = initLogFileWriter(filePath);
+
+    // Add a mutation to make it a proper log file with data
+    Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+    writer.append(tableName, 1, put);
+    writer.sync();
+    writer.close();
+
+    // Test with valid reader
+    LogFileReader reader = Mockito.spy(new LogFileReader());
+
+    reader.init(new LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(filePath));
+
+    replicationLogProcessor.closeReader(reader);
+    replicationLogProcessor.close();
+
+    // Ensure reader's close method is called only once
+    Mockito.verify(reader, Mockito.times(1)).close();
+  }
+
+  /**
+   * Tests the calculateRetryDelay method with different configurations.
+   */
+  @Test
+  public void testCalculateRetryDelay() throws IOException {
+    // Test with default configuration
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+
+    // Test exponential backoff pattern with default max delay (10 seconds)
+    assertEquals("First retry should have 1 second delay", 1000L,
+      replicationLogProcessor.calculateRetryDelay(0));
+    assertEquals("Second retry should have 2 second delay", 2000L,
+      replicationLogProcessor.calculateRetryDelay(1));
+    assertEquals("Third retry should have 4 second delay", 4000L,
+      replicationLogProcessor.calculateRetryDelay(2));
+    assertEquals("Fourth retry should have 8 second delay", 8000L,
+      replicationLogProcessor.calculateRetryDelay(3));
+    assertEquals("Fifth retry should be capped at 10 seconds", 10000L,
+      replicationLogProcessor.calculateRetryDelay(4));
+    assertEquals("Sixth retry should be capped at 10 seconds", 10000L,
+      replicationLogProcessor.calculateRetryDelay(5));
+
+    // Clean up
+    replicationLogProcessor.close();
+
+    // Test with custom max delay configuration
+    Configuration customConf = new Configuration(conf);
+    long customMaxDelay = 5000L; // 5 seconds
+    customConf.setLong(ReplicationLogProcessor.REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS,
+      customMaxDelay);
+
+    ReplicationLogProcessor customProcessor =
+      new ReplicationLogProcessor(customConf, testHAGroupName);
+
+    // Test exponential backoff pattern with custom max delay
+    assertEquals("First retry should have 1 second delay", 1000L,
+      customProcessor.calculateRetryDelay(0));
+    assertEquals("Second retry should have 2 second delay", 2000L,
+      customProcessor.calculateRetryDelay(1));
+    assertEquals("Third retry should have 4 second delay", 4000L,
+      customProcessor.calculateRetryDelay(2));
+    assertEquals("Fourth retry should be capped at 5 seconds", 5000L,
+      customProcessor.calculateRetryDelay(3));
+    assertEquals("Fifth retry should be capped at 5 seconds", 5000L,
+      customProcessor.calculateRetryDelay(4));
+
+    // Clean up
+    customProcessor.close();
+
+    // Test with very small max delay
+    Configuration smallDelayConf = new Configuration(conf);
+    long smallMaxDelay = 1500L; // 1.5 seconds
+    smallDelayConf.setLong(ReplicationLogProcessor.REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS,
+      smallMaxDelay);
+
+    ReplicationLogProcessor smallDelayProcessor =
+      new ReplicationLogProcessor(smallDelayConf, testHAGroupName);
+
+    assertEquals("First retry should have 1 second delay", 1000L,
+      smallDelayProcessor.calculateRetryDelay(0));
+    assertEquals("Second retry should be capped at 1.5 seconds", 1500L,
+      smallDelayProcessor.calculateRetryDelay(1));
+    assertEquals("Third retry should be capped at 1.5 seconds", 1500L,
+      smallDelayProcessor.calculateRetryDelay(2));
+
+    // Clean up
+    smallDelayProcessor.close();
+  }
+
+  /**
+   * Tests that configuration parameters are properly read and applied.
+   */
+  @Test
+  public void testReplicationLogProcessorConfiguration() throws IOException {
+    // Test that all default configurations are used when no custom configuration is provided
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+
+    // Validate default batch size
+    assertEquals("Default batch size should be used",
+      ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
+      replicationLogProcessor.getBatchSize());
+
+    // Validate default batch size bytes
+    assertEquals("Default batch size bytes should be used",
+      ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE_BYTES,
+      replicationLogProcessor.getBatchSizeBytes());
+
+    // Validate default HBase client retries count
+    assertEquals("Default HBase client retries count should be used",
+      ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT,
+      replicationLogProcessor.getHBaseClientRetriesCount());
+
+    // Validate default HBase client operation timeout
+    assertEquals("Default HBase client operation timeout should be used",
+      ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS,
+      replicationLogProcessor.getHBaseClientOperationTimeout());
+
+    // Validate default batch retry count
+    assertEquals("Default batch retry count should be used",
+      ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_BATCH_RETRY_COUNT,
+      replicationLogProcessor.getBatchRetryCount());
+
+    // Validate default max retry delay
+    assertEquals("Default max retry delay should be used",
+      ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS,
+      replicationLogProcessor.getMaxRetryDelayMs());
+
+    // Validate default thread pool size
+    ExecutorService executorService = replicationLogProcessor.getExecutorService();
+    assertNotNull("Executor service must not be null", executorService);
+    assertEquals("Default thread pool size should be used",
+      ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_THREAD_POOL_SIZE,
+      ((ThreadPoolExecutor) executorService).getCorePoolSize());
+
+    // Clean up
+    replicationLogProcessor.close();
+
+    // Test that all custom configurations are honored
+    Configuration customConf = new Configuration(conf);
+
+    // Set custom values for all configuration parameters
+    int customBatchSize = 1000;
+    long customBatchSizeBytes = 128 * 1024 * 1024L; // 128 MB
+    int customRetriesCount = 6;
+    long customOperationTimeout = 15000L;
+    int customBatchRetryCount = 5;
+    long customMaxRetryDelay = 20000L;
+    int customThreadPoolSize = 10;
+
+    customConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
+      customBatchSize);
+    customConf.setLong(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE_BYTES,
+      customBatchSizeBytes);
+    customConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT,
+      customRetriesCount);
+    customConf.setLong(
+      ReplicationLogProcessor.REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS,
+      customOperationTimeout);
+    customConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_BATCH_RETRY_COUNT,
+      customBatchRetryCount);
+    customConf.setLong(ReplicationLogProcessor.REPLICATION_STANDBY_BATCH_RETRY_MAX_DELAY_MS,
+      customMaxRetryDelay);
+    customConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_THREAD_POOL_SIZE,
+      customThreadPoolSize);
+
+    ReplicationLogProcessor customProcessor =
+      new ReplicationLogProcessor(customConf, testHAGroupName);
+
+    // Validate all custom configurations are honored
+    assertEquals("Custom batch size should be honored", customBatchSize,
+      customProcessor.getBatchSize());
+
+    assertEquals("Custom batch size bytes should be honored", customBatchSizeBytes,
+      customProcessor.getBatchSizeBytes());
+
+    assertEquals("Custom HBase client retries count should be honored", customRetriesCount,
+      customProcessor.getHBaseClientRetriesCount());
+
+    assertEquals("Custom HBase client operation timeout should be honored", customOperationTimeout,
+      customProcessor.getHBaseClientOperationTimeout());
+
+    assertEquals("Custom batch retry count should be honored", customBatchRetryCount,
+      customProcessor.getBatchRetryCount());
+
+    assertEquals("Custom max retry delay should be honored", customMaxRetryDelay,
+      customProcessor.getMaxRetryDelayMs());
+
+    // Validate custom thread pool size
+    ExecutorService customExecutorService = customProcessor.getExecutorService();
+    assertNotNull("Executor service must not be null", customExecutorService);
+    assertEquals("Custom thread pool size should be used", customThreadPoolSize,
+      ((ThreadPoolExecutor) customExecutorService).getCorePoolSize());
+
+    // Clean up
+    customProcessor.close();
+  }
+
+  /**
+   * Tests end-to-end processing of a valid log file with mutations for multiple tables.
+   */
+  @Test
+  public void testProcessLogFileForValidLogFile() throws Exception {
+    final String table1Name = "T_" + generateUniqueName();
+    final String table2Name = "T_" + generateUniqueName();
+    final Path filePath = new Path(testFolder.newFile("testProcessLogFileEnd2End").toURI());
+    LogFileWriter writer = initLogFileWriter(filePath);
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table1Name));
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table2Name));
+      PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+
+      List<Mutation> table1Mutations =
+        generateHBaseMutations(phoenixConnection, 2, table1Name, 100L, "a");
+      List<Mutation> table2Mutations =
+        generateHBaseMutations(phoenixConnection, 5, table2Name, 101L, "b");
+      table1Mutations.forEach(mutation -> {
+        try {
+          writer.append(table1Name, mutation.hashCode(), mutation);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+      table2Mutations.forEach(mutation -> {
+        try {
+          writer.append(table2Name, mutation.hashCode(), mutation);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+      writer.sync();
+      writer.close();
+
+      replicationLogProcessor.processLogFile(localFs, filePath);
+
+      validate(table1Name, table1Mutations);
+      validate(table2Name, table2Mutations);
+
+      // Ensure metrics are correctly populated
+      ReplicationLogProcessorMetricValues metricValues =
+        replicationLogProcessor.getMetrics().getCurrentMetricValues();
+      assertEquals("Invalid log file success count", 1,
+        metricValues.getLogFileReplaySuccessCount());
+      assertEquals("There must not be any failed mutations", 0,
+        metricValues.getFailedMutationsCount());
+      assertEquals("There must not be any failed files", 0,
+        metricValues.getLogFileReplayFailureCount());
+
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests error handling when attempting to process a non-existent log file.
+   */
+  @Test
+  public void testProcessLogFileWithNonExistentFile() throws Exception {
+    // Create a path to a file that doesn't exist
+    Path nonExistentFilePath =
+      new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log");
+    // Verify the file doesn't exist
+    assertFalse("Non-existent file should not exist", localFs.exists(nonExistentFilePath));
+
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    // Attempt to process non-existent file - should throw IOException
+    try {
+      replicationLogProcessor.processLogFile(localFs, nonExistentFilePath);
+      fail("Should throw IOException for non-existent file");
+    } catch (IOException e) {
+      // Expected behavior - non-existent file should cause IOException
+      assertTrue("Should throw IOException for non-existent file", true);
+      // Ensure metrics are correctly populated
+      ReplicationLogProcessorMetricValues metricValues =
+        replicationLogProcessor.getMetrics().getCurrentMetricValues();
+      assertEquals("Invalid log file success count", 0,
+        metricValues.getLogFileReplaySuccessCount());
+      assertEquals("There must 1 failed replication log file", 1,
+        metricValues.getLogFileReplayFailureCount());
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests processing of empty log files (files with header/trailer but no 
mutation records).
+   */
+  @Test
+  public void testProcessLogFileWithEmptyFile() throws Exception {
+    final Path emptyFilePath = new 
Path(testFolder.newFile("testProcessLogFileEmpty").toURI());
+    LogFileWriter writer = initLogFileWriter(emptyFilePath);
+
+    // Close the writer without adding any records - this creates a valid 
empty log file
+    writer.close();
+
+    // Verify file exists and has some content (header + trailer)
+    assertTrue("Empty log file should exist", localFs.exists(emptyFilePath));
+    assertTrue("Empty log file should have header/trailer content",
+      localFs.getFileStatus(emptyFilePath).getLen() > 0);
+
+    // Process the empty log file - should not throw any exceptions
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    try {
+      replicationLogProcessor.processLogFile(localFs, emptyFilePath);
+      // If we reach here, the empty file was processed successfully
+      assertTrue("Processing empty log file should complete without errors", 
true);
+      // Ensure metrics are correctly populated
+      ReplicationLogProcessorMetricValues metricValues =
+        replicationLogProcessor.getMetrics().getCurrentMetricValues();
+      assertEquals("Invalid log file success count", 1,
+        metricValues.getLogFileReplaySuccessCount());
+      assertEquals("There must not be any failed mutations", 0,
+        metricValues.getFailedMutationsCount());
+      assertEquals("There must not be any failed files", 0,
+        metricValues.getLogFileReplayFailureCount());
+    } catch (Exception e) {
+      fail("Processing empty log file should not throw exception: " + 
e.getMessage());
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests processing of log files that were not closed, ensuring processing succeeds.
+   */
+  @Test
+  public void testProcessLogFileForUnClosedFile() throws Exception {
+    final String tableNameString = "T1_" + generateUniqueName();
+    final Path emptyFilePath =
+      new 
Path(testFolder.newFile("testProcessLogFileForUnClosedFile").toURI());
+    LogFileWriter writer = initLogFileWriter(emptyFilePath);
+
+    // Add one mutation
+    Mutation put = LogFileTestUtil.newPut("row1", 3L, 4);
+    writer.append(tableNameString, 1, put);
+    writer.sync();
+
+    ReplicationLogProcessor spyProcessor =
+      Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
+
+    // Create argument captor to capture the actual parameters passed to 
processReplicationLogBatch
+    ArgumentCaptor<Map<TableName, List<Mutation>>> mapCaptor = 
ArgumentCaptor.forClass(Map.class);
+
+    Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(mapCaptor.capture());
+
+    // Process the file without closing - should not throw any exceptions
+    spyProcessor.processLogFile(localFs, emptyFilePath);
+
+    // Verify processReplicationLogBatch was called the expected number of 
times
+    Mockito.verify(spyProcessor, Mockito.times(1))
+      .processReplicationLogBatch(Mockito.any(Map.class));
+
+    // Validate the captured parameters
+    Map<TableName, List<Mutation>> capturedMap = mapCaptor.getValue();
+    assertNotNull("Captured map should not be null", capturedMap);
+    assertEquals("Should have exactly one table", 1, capturedMap.size());
+
+    // Verify the table name
+    TableName expectedTableName = TableName.valueOf(tableNameString);
+    assertTrue("Map should contain the expected table", 
capturedMap.containsKey(expectedTableName));
+
+    // Verify the mutations list
+    List<Mutation> mutations = capturedMap.get(expectedTableName);
+    assertNotNull("Mutations list should not be null", mutations);
+    assertEquals("Should have exactly one mutation", 1, mutations.size());
+
+    // Verify the mutation details
+    Mutation capturedMutation = mutations.get(0);
+    assertTrue("Mutation should be a Put", capturedMutation instanceof Put);
+    LogFileTestUtil.assertMutationEquals("Invalid put", put, capturedMutation);
+
+    // Clean up
+    spyProcessor.close();
+  }
+
+  /**
+   * Tests batching logic with various record counts and batch sizes.
+   */
+  @Test
+  public void testProcessLogFileBatchingLogic() throws Exception {
+    // Test multiple batching scenarios to ensure the logic works correctly
+
+    // Test case 1: General case where total records don't align perfectly 
with batch size
+    testProcessLogFileBatching(10, 3);
+
+    // Test case 2: Edge case where total records exactly matches batch size
+    testProcessLogFileBatching(5, 5);
+
+    // Test case 3: Single record with large batch size
+    testProcessLogFileBatching(1, 10);
+
+    // Test case 4: Multiple full batches
+    testProcessLogFileBatching(12, 4);
+  }
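+
+  // Worked arithmetic for the four cases above, using the same ceiling division as the
+  // expectedBatchCalls computation in the testProcessLogFileBatching helper:
+  // ceil(10/3) = 4, ceil(5/5) = 1, ceil(1/10) = 1 and ceil(12/4) = 3 batch calls respectively.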
+
+  /**
+   * Tests batch size logic when both count and size limits are configured. Verifies that a
+   * batch is flushed as soon as either the count limit or the size limit is reached, whichever
+   * comes first.
+   */
+  @Test
+  public void testProcessLogFileBatchSizeLogic() throws Exception {
+    final Path batchSizeFilePath =
+      new Path(testFolder.newFile("testProcessLogFileBatchSizeLogic").toURI());
+    final String tableName = "T_" + generateUniqueName();
+    final int batchSize = 5;
+    final long batchSizeBytes = 1800;
+
+    // Create log file with mutations of varying sizes
+    LogFileWriter writer = initLogFileWriter(batchSizeFilePath);
+
+    // Create mutations with fixed sizes to test both count and size limits
+    List<Mutation> mutations = new ArrayList<>();
+
+    // Add the first 3 small mutations (each ~344 bytes)
+    for (int i = 0; i < 3; i++) {
+      Put put = new Put(Bytes.toBytes("row" + i));
+      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"), 
Bytes.toBytes("abcd"));
+      mutations.add(put);
+      writer.append(tableName, i, put);
+    }
+
+    // Add 1 big mutation that will cross the byte size threshold before count 
threshold
+    Put bigPut = new Put(Bytes.toBytes("bigRow"));
+    bigPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"),
+      Bytes.toBytes("This is a very large mutation that will exceed the size 
limit. "
+        + "It needs to be large enough to trigger the size-based batching 
logic. "
+        + "The mutation should be significantly larger than the small 
mutations to ensure "
+        + "it crosses the byte size threshold and forces a batch to be 
processed. "
+        + "This is a very large mutation that will exceed the size limit. "
+        + "It needs to be large enough to trigger the size-based batching 
logic. "
+        + "The mutation should be significantly larger than the small 
mutations to ensure "
+        + "it crosses the byte size threshold and forces a batch to be 
processed."));
+
+    mutations.add(bigPut);
+    writer.append(tableName, 100, bigPut);
+
+    // Add more small mutations that will be batched due to count limit
+    for (int i = 3; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("row" + i));
+      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"), 
Bytes.toBytes("abcd"));
+      mutations.add(put);
+      writer.append(tableName, i, put);
+    }
+
+    writer.close();
+
+    // Create processor with custom batch size and size limits
+    Configuration testConf = new Configuration(conf);
+    testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize);
+    testConf.setLong(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE_BYTES,
+      batchSizeBytes);
+
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(testConf, testHAGroupName);
+
+    // Validate that the batch sizes are correctly set
+    assertEquals("Batch size should be set correctly", batchSize,
+      replicationLogProcessor.getBatchSize());
+    assertEquals("Batch size bytes should be set correctly", batchSizeBytes,
+      replicationLogProcessor.getBatchSizeBytes());
+
+    ReplicationLogProcessor spyProcessor = 
Mockito.spy(replicationLogProcessor);
+
+    // Store captured arguments
+    List<Map<TableName, List<Mutation>>> capturedArguments = new ArrayList<>();
+
+    // Mock processReplicationLogBatch to capture deep copies
+    Mockito.doAnswer(invocation -> {
+      Map<TableName, List<Mutation>> originalMap = invocation.getArgument(0);
+      Map<TableName, List<Mutation>> deepCopy = new HashMap<>(originalMap);
+      capturedArguments.add(deepCopy);
+      return null;
+    }).when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+
+    // Process the log file
+    spyProcessor.processLogFile(localFs, batchSizeFilePath);
+
+    // Verify processReplicationLogBatch was called multiple times
+    Mockito.verify(spyProcessor, Mockito.atLeast(3))
+      .processReplicationLogBatch(Mockito.any(Map.class));
+
+    // Validate that we have captured batch calls
+    assertEquals("Should have 3 captured batch calls", 3, 
capturedArguments.size());
+
+    // Validate first batch (should be triggered by size limit)
+    Map<TableName, List<Mutation>> firstBatch = capturedArguments.get(0);
+    assertNotNull("First batch should not be null", firstBatch);
+
+    TableName expectedTableName = TableName.valueOf(tableName);
+    assertTrue("First batch should contain the table", 
firstBatch.containsKey(expectedTableName));
+
+    List<Mutation> firstBatchMutations = firstBatch.get(expectedTableName);
+    assertNotNull("First batch mutations should not be null", 
firstBatchMutations);
+
+    // First batch should be triggered by size limit (3 small + 1 big mutation)
+    assertEquals("First batch should have 4 mutations (3 small + 1 big)", 4,
+      firstBatchMutations.size());
+
+    // Calculate expected size of first batch
+    long firstBatchSize = 0;
+    for (Mutation mutation : firstBatchMutations) {
+      firstBatchSize += mutation.heapSize();
+    }
+
+    // First batch should exceed the size limit due to the large mutation
+    assertTrue("First batch should exceed size limit", firstBatchSize >= 
batchSizeBytes);
+
+    // Validate that captured mutations match input mutations
+    // First batch should contain mutations 0-3 (3 small + 1 big)
+    for (int i = 0; i < 3; i++) {
+      LogFileTestUtil.assertMutationEquals("First batch small mutation " + i + 
" mismatch",
+        mutations.get(i), firstBatchMutations.get(i));
+    }
+    LogFileTestUtil.assertMutationEquals("First batch big mutation mismatch", 
mutations.get(3),
+      firstBatchMutations.get(3));
+
+    // Validate second batch (should contain remaining small mutations)
+    Map<TableName, List<Mutation>> secondBatch = capturedArguments.get(1);
+    assertNotNull("Second batch should not be null", secondBatch);
+    assertTrue("Second batch should contain the table", 
secondBatch.containsKey(expectedTableName));
+
+    List<Mutation> secondBatchMutations = secondBatch.get(expectedTableName);
+    assertNotNull("Second batch mutations should not be null", 
secondBatchMutations);
+
+    // Second batch should be triggered by count limit (5 small mutations)
+    assertEquals("Second batch should have 5 (equal to batch size) mutations 
(count limit)", 5,
+      secondBatchMutations.size());
+
+    // Calculate expected size of second batch
+    long secondBatchSize = 0;
+    for (Mutation mutation : secondBatchMutations) {
+      secondBatchSize += mutation.heapSize();
+    }
+
+    // Second batch should be smaller than the size limit
+    assertTrue("Second batch should be within size limit", secondBatchSize < 
batchSizeBytes);
+
+    // Second batch should contain mutations 4-8 (5 small mutations)
+    for (int i = 0; i < 5; i++) {
+      LogFileTestUtil.assertMutationEquals("Second batch mutation " + i + " 
mismatch",
+        mutations.get(i + 4), secondBatchMutations.get(i));
+    }
+
+    // Validate third batch (should contain the remaining 2 mutations)
+    Map<TableName, List<Mutation>> thirdBatch = capturedArguments.get(2);
+    assertNotNull("Third batch should not be null", thirdBatch);
+    assertTrue("Third batch should contain the table", 
thirdBatch.containsKey(expectedTableName));
+
+    List<Mutation> thirdBatchMutations = thirdBatch.get(expectedTableName);
+    assertNotNull("Third batch mutations should not be null", 
thirdBatchMutations);
+
+    // Third batch should contain remaining 2 small mutations
+    assertEquals("Third batch should have 2 mutations (remaining small 
mutations)", 2,
+      thirdBatchMutations.size());
+
+    // Calculate expected size of third batch
+    long thirdBatchSize = 0;
+    for (Mutation mutation : thirdBatchMutations) {
+      thirdBatchSize += mutation.heapSize();
+    }
+
+    // Third batch should be smaller than the size limit
+    assertTrue("Third batch should be within size limit", thirdBatchSize < 
batchSizeBytes);
+
+    // Third batch should contain mutations 9-10 (2 remaining small mutations)
+    for (int i = 0; i < 2; i++) {
+      LogFileTestUtil.assertMutationEquals("Third batch mutation " + i + " 
mismatch",
+        mutations.get(i + 9), thirdBatchMutations.get(i));
+    }
+
+    // Ensure metrics are correctly populated
+    ReplicationLogProcessorMetricValues metricValues =
+      spyProcessor.getMetrics().getCurrentMetricValues();
+    assertEquals("Invalid log file success count", 1, 
metricValues.getLogFileReplaySuccessCount());
+    assertEquals("There must not be any failed mutations", 0,
+      metricValues.getFailedMutationsCount());
+    assertEquals("There must not be any failed files", 0,
+      metricValues.getLogFileReplayFailureCount());
+
+    // Clean up
+    spyProcessor.close();
+  }
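+
+  /**
+   * Sketch only (not called by the tests above): the per-batch size accounting repeated in
+   * testProcessLogFileBatchSizeLogic can be expressed as a small helper. It illustrates how a
+   * batch's byte size is derived by summing Mutation.heapSize() over its mutations.
+   */
+  private static long totalHeapSize(List<Mutation> batch) {
+    long size = 0;
+    for (Mutation mutation : batch) {
+      size += mutation.heapSize();
+    }
+    return size;
+  }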
+
+  /**
+   * Tests batching logic when processing log files with mutations for 
multiple tables.
+   */
+  @Test
+  public void testProcessLogFileWithMultipleTables() throws Exception {
+    final Path multiTableBatchFilePath =
+      new 
Path(testFolder.newFile("testProcessLogFileWithMultipleTables").toURI());
+    final String table1Name = "T1_" + generateUniqueName();
+    final String table2Name = "T2_" + generateUniqueName();
+    final int batchSize = 4;
+    final int recordsPerTable = 3;
+    final int totalRecords = recordsPerTable * 2; // 6 total records
+    final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; 
// 2 calls
+
+    // Store mutations for validation
+    List<Mutation> table1Mutations = new ArrayList<>();
+    List<Mutation> table2Mutations = new ArrayList<>();
+
+    // Create log file with mutations for multiple tables
+    LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath);
+
+    // Add mutations alternating between tables using LogFileTestUtil
+    for (int i = 0; i < recordsPerTable; i++) {
+      // Add mutation for table1
+      Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) 
+ 1);
+      table1Mutations.add(put1);
+      writer.append(table1Name, (i * 2) + 1, put1);
+      writer.sync();
+
+      // Add mutation for table2
+      Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) 
+ 2);
+      table2Mutations.add(put2);
+      writer.append(table2Name, (i * 2) + 2, put2);
+      writer.sync();
+    }
+    writer.close();
+
+    // Create processor with custom batch size and spy on it
+    Configuration testConf = new Configuration(conf);
+    testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize);
+
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(testConf, testHAGroupName);
+
+    // Validate that the batch size is correctly set
+    assertEquals("Batch size should be set correctly", batchSize,
+      replicationLogProcessor.getBatchSize());
+
+    ReplicationLogProcessor spyProcessor = 
Mockito.spy(replicationLogProcessor);
+
+    // Store captured arguments manually to avoid reference issues
+    List<Map<TableName, List<Mutation>>> capturedArguments = new ArrayList<>();
+
+    // Mock processReplicationLogBatch to capture deep copies
+    Mockito.doAnswer(invocation -> {
+      // Capture deep copy of mutations
+      Map<TableName, List<Mutation>> originalMap = invocation.getArgument(0);
+      Map<TableName, List<Mutation>> deepCopy = new HashMap<>(originalMap);
+      capturedArguments.add(deepCopy);
+      return null;
+    }).when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+
+    // Process the log file
+    spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
+
+    // Verify processReplicationLogBatch was called the expected number of 
times
+    Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+      .processReplicationLogBatch(Mockito.any(Map.class));
+
+    // Validate the captured parameters using our manually captured arguments
+    assertEquals("Should have captured " + expectedBatchCalls + " batch 
calls", expectedBatchCalls,
+      capturedArguments.size());
+
+    // Validate each batch call individually
+    TableName expectedTable1Name = TableName.valueOf(table1Name);
+    TableName expectedTable2Name = TableName.valueOf(table2Name);
+
+    // First batch should contain 4 mutations (batch size = 4)
+    // Based on alternating pattern: table1[0], table2[0], table1[1], table2[1]
+    Map<TableName, List<Mutation>> firstBatch = capturedArguments.get(0);
+    assertNotNull("First batch should not be null", firstBatch);
+
+    // Validate first batch contains both tables
+    assertTrue("First batch should contain table1", 
firstBatch.containsKey(expectedTable1Name));
+    assertTrue("First batch should contain table2", 
firstBatch.containsKey(expectedTable2Name));
+
+    List<Mutation> firstBatchTable1 = firstBatch.get(expectedTable1Name);
+    List<Mutation> firstBatchTable2 = firstBatch.get(expectedTable2Name);
+
+    assertNotNull("First batch table1 mutations should not be null", 
firstBatchTable1);
+    assertNotNull("First batch table2 mutations should not be null", 
firstBatchTable2);
+
+    // Validate first batch mutation counts
+    assertEquals("First batch should have 2 mutations for table1", 2, 
firstBatchTable1.size());
+    assertEquals("First batch should have 2 mutations for table2", 2, 
firstBatchTable2.size());
+
+    // Validate first batch mutation content
+    LogFileTestUtil.assertMutationEquals("First batch table1 mutation 0 
mismatch",
+      table1Mutations.get(0), firstBatchTable1.get(0));
+    LogFileTestUtil.assertMutationEquals("First batch table1 mutation 1 
mismatch",
+      table1Mutations.get(1), firstBatchTable1.get(1));
+    LogFileTestUtil.assertMutationEquals("First batch table2 mutation 0 
mismatch",
+      table2Mutations.get(0), firstBatchTable2.get(0));
+    LogFileTestUtil.assertMutationEquals("First batch table2 mutation 1 
mismatch",
+      table2Mutations.get(1), firstBatchTable2.get(1));
+
+    // Second batch should contain 2 mutations (remaining records)
+    // Based on alternating pattern: table1[2], table2[2]
+    Map<TableName, List<Mutation>> secondBatch = capturedArguments.get(1);
+    assertNotNull("Second batch should not be null", secondBatch);
+
+    // Validate second batch contains both tables
+    assertTrue("Second batch should contain table1", 
secondBatch.containsKey(expectedTable1Name));
+    assertTrue("Second batch should contain table2", 
secondBatch.containsKey(expectedTable2Name));
+
+    List<Mutation> secondBatchTable1 = secondBatch.get(expectedTable1Name);
+    List<Mutation> secondBatchTable2 = secondBatch.get(expectedTable2Name);
+
+    assertNotNull("Second batch table1 mutations should not be null", 
secondBatchTable1);
+    assertNotNull("Second batch table2 mutations should not be null", 
secondBatchTable2);
+
+    // Validate second batch mutation counts
+    assertEquals("Second batch should have 1 mutation for table1", 1, 
secondBatchTable1.size());
+    assertEquals("Second batch should have 1 mutation for table2", 1, 
secondBatchTable2.size());
+
+    // Validate second batch mutation content
+    LogFileTestUtil.assertMutationEquals("Second batch table1 mutation 0 
mismatch",
+      table1Mutations.get(2), secondBatchTable1.get(0));
+    LogFileTestUtil.assertMutationEquals("Second batch table2 mutation 0 
mismatch",
+      table2Mutations.get(2), secondBatchTable2.get(0));
+
+    // Ensure metrics are correctly populated
+    ReplicationLogProcessorMetricValues metricValues =
+      spyProcessor.getMetrics().getCurrentMetricValues();
+    assertEquals("Invalid log file success count", 1, 
metricValues.getLogFileReplaySuccessCount());
+    assertEquals("There must not be any failed mutations", 0,
+      metricValues.getFailedMutationsCount());
+    assertEquals("There must not be any failed files", 0,
+      metricValues.getLogFileReplayFailureCount());
+
+    // Clean up
+    spyProcessor.close();
+  }
+
+  /**
+   * Tests processing an empty mutation map - should complete without errors.
+   */
+  @Test
+  public void testApplyMutationsWithEmptyMap() throws IOException {
+    // Test with empty map - should not throw any exception
+    Map<TableName, List<Mutation>> emptyMap = new HashMap<>();
+
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    try {
+      ReplicationLogProcessor.ApplyMutationBatchResult 
applyMutationBatchResult =
+        replicationLogProcessor.applyMutations(emptyMap);
+      assertNotNull("Apply mutations result must not be null", 
applyMutationBatchResult);
+      assertNull("Apply mutations result exception should be null",
+        applyMutationBatchResult.getException());
+      Map<TableName, List<Mutation>> failedMutations =
+        applyMutationBatchResult.getFailedMutations();
+      assertNotNull("Failed mutations must not be null", failedMutations);
+      assertTrue("Failed mutations must be empty", failedMutations.isEmpty());
+      // Should not throw any exception
+    } catch (Exception e) {
+      fail("Should not throw exception for empty map: " + e.getMessage());
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests applyMutations method with three tables where two succeed and one 
fails. This test
+   * verifies that the method properly handles partial failure scenarios and 
returns only the failed
+   * mutations in the result map.
+   */
+  @Test
+  public void testApplyMutationsWithPartialFailures() throws Exception {
+    final String table1 = "T_" + generateUniqueName();
+    final String table2 = "T_" + generateUniqueName();
+    final String table3 = "T_" + generateUniqueName();
+    final TableName tableName1 = TableName.valueOf(table1);
+    final TableName tableName2 = TableName.valueOf(table2);
+    final TableName tableName3 = TableName.valueOf(table3);
+    Map<TableName, List<Mutation>> tableMutationsMap = new HashMap<>();
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      // Create first table (will succeed)
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table1));
+      // Create second table (will succeed)
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table2));
+      // Create third table (will be disabled to simulate failure)
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table3));
+
+      PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+
+      // Generate mutations for the first table (will succeed)
+      List<Mutation> mutations1 = generateHBaseMutations(phoenixConnection, 2, 
table1, 10L, "a");
+      tableMutationsMap.put(tableName1, mutations1);
+
+      // Generate mutations for the second table (will succeed)
+      List<Mutation> mutations2 = generateHBaseMutations(phoenixConnection, 2, 
table2, 20L, "b");
+      tableMutationsMap.put(tableName2, mutations2);
+
+      // Generate mutations for the third table (will fail)
+      List<Mutation> mutations3 = generateHBaseMutations(phoenixConnection, 2, 
table3, 30L, "c");
+      tableMutationsMap.put(tableName3, mutations3);
+
+      // Disable the third table to simulate failure
+      Admin admin = phoenixConnection.getQueryServices().getAdmin();
+      admin.disableTable(tableName3);
+
+      // Apply mutations - should have partial failures
+      ReplicationLogProcessor.ApplyMutationBatchResult 
applyMutationBatchResult =
+        replicationLogProcessor.applyMutations(tableMutationsMap);
+      assertNotNull("Apply mutations result must not be null", 
applyMutationBatchResult);
+      assertNotNull("Apply mutations result exception must not be null",
+        applyMutationBatchResult.getException());
+      assertTrue("Invalid exception message", 
applyMutationBatchResult.getException().getMessage()
+        .contains(NotServingRegionException.class.getSimpleName()));
+      Map<TableName, List<Mutation>> failedMutations =
+        applyMutationBatchResult.getFailedMutations();
+
+      // Verify failed mutations map is not null and contains the failed table
+      assertNotNull("Failed mutations map should not be null", 
failedMutations);
+      assertFalse("Some mutations should have failed", 
failedMutations.isEmpty());
+
+      // Verify that table3 mutations failed
+      assertTrue("Table3 mutations should be in failed mutations",
+        failedMutations.containsKey(tableName3));
+      assertEquals("Table3 should have all its mutations failed", 
mutations3.size(),
+        failedMutations.get(tableName3).size());
+      for (int i = 0; i < mutations3.size(); i++) {
+        LogFileTestUtil.assertMutationEquals("Mutations modified by 
applyMutations method",
+          mutations3.get(i), 
failedMutations.get(TableName.valueOf(table3)).get(i));
+      }
+
+      // Verify that table1 and table2 mutations succeeded (not in failed 
mutations)
+      assertFalse("Table1 mutations should not be in failed mutations",
+        failedMutations.containsKey(tableName1));
+      assertFalse("Table2 mutations should not be in failed mutations",
+        failedMutations.containsKey(tableName2));
+
+      // Verify mutations were actually applied to the successful tables
+      validate(table1, mutations1);
+      validate(table2, mutations2);
+
+      // Ensure metrics are correctly populated
+      ReplicationLogProcessorMetricValues metricValues =
+        replicationLogProcessor.getMetrics().getCurrentMetricValues();
+      assertEquals("Failed mutations count should match table3 mutations", 
mutations3.size(),
+        metricValues.getFailedMutationsCount());
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests applyMutations method with two tables that both succeed in one go. 
This test verifies
+   * that the method can handle multiple tables successfully and returns an 
empty failed mutations
+   * map.
+   */
+  @Test
+  public void testApplyMutationsSuccess() throws Exception {
+    final String table1 = "T_" + generateUniqueName();
+    final String table2 = "T_" + generateUniqueName();
+    Map<TableName, List<Mutation>> tableMutationsMap = new HashMap<>();
+    // Create processor and apply mutations
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      // Create first table
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table1));
+      // Create second table
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table2));
+
+      PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+
+      // Generate mutations for the first table
+      List<Mutation> mutations1 = generateHBaseMutations(phoenixConnection, 3, 
table1, 10L, "a");
+      tableMutationsMap.put(TableName.valueOf(table1), mutations1);
+
+      // Generate mutations for the second table
+      List<Mutation> mutations2 = generateHBaseMutations(phoenixConnection, 2, 
table2, 20L, "b");
+      tableMutationsMap.put(TableName.valueOf(table2), mutations2);
+
+      // Apply mutations - should succeed for both tables
+      ReplicationLogProcessor.ApplyMutationBatchResult 
applyMutationBatchResult =
+        replicationLogProcessor.applyMutations(tableMutationsMap);
+      assertNotNull("Apply mutations result must not be null", 
applyMutationBatchResult);
+      assertNull("Apply mutations result exception should be null",
+        applyMutationBatchResult.getException());
+      Map<TableName, List<Mutation>> failedMutations =
+        applyMutationBatchResult.getFailedMutations();
+
+      // Verify no mutations failed
+      assertNotNull("Failed mutations map should not be null", 
failedMutations);
+      assertTrue("No mutations should have failed", failedMutations.isEmpty());
+
+      // Verify that table1 mutations were applied successfully
+      validate(table1, mutations1);
+
+      // Verify that table2 mutations were applied successfully
+      validate(table2, mutations2);
+
+      // Ensure metrics are correctly populated
+      ReplicationLogProcessorMetricValues metricValues =
+        replicationLogProcessor.getMetrics().getCurrentMetricValues();
+      assertEquals("Failed mutations count must be 0", 0, 
metricValues.getFailedMutationsCount());
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests applyMutations method with two tables that both fail. This test 
verifies that the method
+   * properly handles complete failure scenarios and returns all mutations in 
the failed mutations
+   * map.
+   */
+  @Test
+  public void testApplyMutationsFailure() throws Exception {
+    final String table1 = "T_" + generateUniqueName();
+    final String table2 = "T_" + generateUniqueName();
+    Map<TableName, List<Mutation>> tableMutationsMap = new HashMap<>();
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      // Create first table
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table1));
+      // Create second table
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table2));
+
+      PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+
+      // Generate mutations for the first table
+      List<Mutation> table1Mutations =
+        generateHBaseMutations(phoenixConnection, 2, table1, 10L, "a");
+      tableMutationsMap.put(TableName.valueOf(table1), table1Mutations);
+
+      // Generate mutations for the second table
+      List<Mutation> table2Mutations =
+        generateHBaseMutations(phoenixConnection, 3, table2, 20L, "b");
+      tableMutationsMap.put(TableName.valueOf(table2), table2Mutations);
+
+      // Disable regions for both tables to simulate complete failure
+      Admin admin = phoenixConnection.getQueryServices().getAdmin();
+
+      // Disable region for table1
+      List<HRegionInfo> regions1 = 
admin.getTableRegions(TableName.valueOf(table1));
+      if (!regions1.isEmpty()) {
+        admin.unassign(regions1.get(0).getRegionName(), true);
+      }
+
+      // Disable region for table2
+      List<HRegionInfo> regions2 = 
admin.getTableRegions(TableName.valueOf(table2));
+      if (!regions2.isEmpty()) {
+        admin.unassign(regions2.get(0).getRegionName(), true);
+      }
+
+      // Apply mutations - should fail for both tables
+      ReplicationLogProcessor.ApplyMutationBatchResult 
applyMutationBatchResult =
+        replicationLogProcessor.applyMutations(tableMutationsMap);
+      assertNotNull("Apply mutations result must not be null", 
applyMutationBatchResult);
+      assertNotNull("Apply mutations result exception must not be null",
+        applyMutationBatchResult.getException());
+      assertTrue("Invalid exception message", 
applyMutationBatchResult.getException().getMessage()
+        .contains(NotServingRegionException.class.getSimpleName()));
+      Map<TableName, List<Mutation>> failedMutations =
+        applyMutationBatchResult.getFailedMutations();
+
+      // Verify failed mutations map contains both tables
+      assertNotNull("Failed mutations map should not be null", 
failedMutations);
+      assertFalse("Some mutations should have failed", 
failedMutations.isEmpty());
+      assertEquals("Should have 2 failed tables", 2, failedMutations.size());
+
+      // Verify that table1 mutations failed
+      assertTrue("Table1 mutations should be in failed mutations",
+        failedMutations.containsKey(TableName.valueOf(table1)));
+      assertEquals("Table1 should have all its mutations failed", 
table1Mutations.size(),
+        failedMutations.get(TableName.valueOf(table1)).size());
+      for (int i = 0; i < table1Mutations.size(); i++) {
+        LogFileTestUtil.assertMutationEquals("Table1 mutation mismatch", 
table1Mutations.get(i),
+          failedMutations.get(TableName.valueOf(table1)).get(i));
+      }
+
+      // Verify that table2 mutations failed
+      assertTrue("Table2 mutations should be in failed mutations",
+        failedMutations.containsKey(TableName.valueOf(table2)));
+      assertEquals("Table2 should have all its mutations failed", 
table2Mutations.size(),
+        failedMutations.get(TableName.valueOf(table2)).size());
+      for (int i = 0; i < table2Mutations.size(); i++) {
+        LogFileTestUtil.assertMutationEquals("Table2 mutation mismatch", 
table2Mutations.get(i),
+          failedMutations.get(TableName.valueOf(table2)).get(i));
+      }
+
+      // Ensure metrics are correctly populated
+      ReplicationLogProcessorMetricValues metricValues =
+        replicationLogProcessor.getMetrics().getCurrentMetricValues();
+      assertEquals("Failed mutations count mismatch",
+        table1Mutations.size() + table2Mutations.size(), 
metricValues.getFailedMutationsCount());
+    } finally {
+      replicationLogProcessor.close();
+    }
+  }
+
+  /**
+   * Tests processReplicationLogBatch method with multiple tables where all 
mutations succeed in one
+   * go.
+   */
+  @Test
+  public void testProcessReplicationLogBatchWithMultipleTablesSuccess() throws 
Exception {
+    final String table1 = "T_" + generateUniqueName();
+    final String table2 = "T_" + generateUniqueName();
+    Map<TableName, List<Mutation>> tableMutationsMap = new HashMap<>();
+
+    // Create test mutations for multiple tables
+    List<Mutation> mutations1 = new ArrayList<>();
+    mutations1.add(LogFileTestUtil.newPut("row1_table1", 1L, 1));
+    mutations1.add(LogFileTestUtil.newPut("row2_table1", 2L, 2));
+    tableMutationsMap.put(TableName.valueOf(table1), mutations1);
+
+    List<Mutation> mutations2 = new ArrayList<>();
+    mutations2.add(LogFileTestUtil.newPut("row1_table2", 3L, 3));
+    mutations2.add(LogFileTestUtil.newDelete("row2_table2", 4L, 4));
+    tableMutationsMap.put(TableName.valueOf(table2), mutations2);
+
+    // Create processor and spy on it
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    ReplicationLogProcessor spyProcessor = 
Mockito.spy(replicationLogProcessor);
+
+    // Mock applyMutations to return empty failed mutations map (all succeed)
+    ReplicationLogProcessor.ApplyMutationBatchResult 
successApplyMutationsResult =
+      new 
ReplicationLogProcessor.ApplyMutationBatchResult(Collections.emptyMap(), null);
+    Mockito.doReturn(successApplyMutationsResult).when(spyProcessor)
+      .applyMutations(Mockito.anyMap());
+
+    // Call processReplicationLogBatch - should succeed without retries
+    spyProcessor.processReplicationLogBatch(tableMutationsMap);
+
+    // Verify applyMutations was called exactly once with the correct 
parameters
+    Mockito.verify(spyProcessor, 
Mockito.times(1)).applyMutations(Mockito.anyMap());
+
+    // Capture the argument passed to applyMutations
+    ArgumentCaptor<Map<TableName, List<Mutation>>> argumentCaptor =
+      ArgumentCaptor.forClass(Map.class);
+    Mockito.verify(spyProcessor).applyMutations(argumentCaptor.capture());
+
+    // Verify the captured argument contains both tables with correct mutations
+    Map<TableName, List<Mutation>> capturedMap = argumentCaptor.getValue();
+    assertNotNull("Captured map should not be null", capturedMap);
+    assertEquals("Should have 2 tables", 2, capturedMap.size());
+
+    // Verify table1 mutations
+    assertTrue("Should contain table1", 
capturedMap.containsKey(TableName.valueOf(table1)));
+    List<Mutation> capturedMutations1 = 
capturedMap.get(TableName.valueOf(table1));
+    assertEquals("Table1 should have 2 mutations", 2, 
capturedMutations1.size());
+    LogFileTestUtil.assertMutationEquals("Table1 mutation 1 mismatch", 
mutations1.get(0),
+      capturedMutations1.get(0));
+    LogFileTestUtil.assertMutationEquals("Table1 mutation 2 mismatch", 
mutations1.get(1),
+      capturedMutations1.get(1));
+
+    // Verify table2 mutations
+    assertTrue("Should contain table2", 
capturedMap.containsKey(TableName.valueOf(table2)));
+    List<Mutation> capturedMutations2 = 
capturedMap.get(TableName.valueOf(table2));
+    assertEquals("Table2 should have 2 mutations", 2, 
capturedMutations2.size());
+    LogFileTestUtil.assertMutationEquals("Table2 mutation 1 mismatch", 
mutations2.get(0),
+      capturedMutations2.get(0));
+    LogFileTestUtil.assertMutationEquals("Table2 mutation 2 mismatch", 
mutations2.get(1),
+      capturedMutations2.get(1));
+
+    // Clean up
+    spyProcessor.close();
+  }
+
+  /**
+   * Tests processReplicationLogBatch method with persistent failure for one 
table. One table's
+   * region is brought down to simulate persistent failure while another table 
succeeds. This tests
+   * the retry logic and failure handling behavior.
+   */
+  @Test
+  public void testProcessReplicationLogBatchWithPersistentFailure() throws 
Exception {
+    final String table1 = "T_" + generateUniqueName();
+    final String table2 = "T_" + generateUniqueName();
+    Map<TableName, List<Mutation>> tableMutationsMap = new HashMap<>();
+
+    // Create processor and spy on it
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    ReplicationLogProcessor spyProcessor = 
Mockito.spy(replicationLogProcessor);
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      // Create first table (will succeed)
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table1));
+      // Create second table (will have region down to simulate failure)
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table2));
+
+      PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+
+      // Generate mutations for the first table (will succeed)
+      List<Mutation> table1Mutations =
+        generateHBaseMutations(phoenixConnection, 2, table1, 10L, "a");
+      tableMutationsMap.put(TableName.valueOf(table1), table1Mutations);
+
+      // Generate mutations for the second table (will fail due to region down)
+      List<Mutation> table2Mutations =
+        generateHBaseMutations(phoenixConnection, 2, table2, 20L, "b");
+      tableMutationsMap.put(TableName.valueOf(table2), table2Mutations);
+
+      // Bring down a region for the second table to simulate persistent 
failure
+      Admin admin = phoenixConnection.getQueryServices().getAdmin();
+      TableName table2TableName = TableName.valueOf(table2);
+
+      // Get regions for the table and disable one of them
+      List<HRegionInfo> regions = admin.getTableRegions(table2TableName);
+      if (!regions.isEmpty()) {
+        // Disable the first region to simulate failure
+        admin.unassign(regions.get(0).getRegionName(), true);
+      }
+
+      // Capture all calls to applyMutations
+      List<Map<TableName, List<Mutation>>> capturedCalls = new ArrayList<>();
+      Mockito.doAnswer(invocation -> {
+        Map<TableName, List<Mutation>> originalMap = invocation.getArgument(0);
+        Map<TableName, List<Mutation>> deepCopy = new HashMap<>(originalMap);
+        capturedCalls.add(deepCopy);
+        return invocation.callRealMethod();
+      }).when(spyProcessor).applyMutations(Mockito.anyMap());
+
+      try {
+        spyProcessor.processReplicationLogBatch(tableMutationsMap);
+        fail("Should throw IOException for non-existent file");
+      } catch (IOException e) {
+        assertTrue("IOException must be thrown due to persistent failures", 
true);
+      }
+
+      // Get the expected number of retries from configuration
+      int maxRetries = 
conf.getInt(ReplicationLogProcessor.REPLICATION_STANDBY_BATCH_RETRY_COUNT,
+        ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_BATCH_RETRY_COUNT);
+      int expectedCalls = maxRetries + 1; // Initial call + retries
+
+      // Verify the exact number of retries
+      Mockito.verify(spyProcessor, 
Mockito.times(expectedCalls)).applyMutations(Mockito.anyMap());
+      assertEquals("Should have made " + expectedCalls + " calls to 
applyMutations", expectedCalls,
+        capturedCalls.size());
+
+      // First call should contain both tables
+      Map<TableName, List<Mutation>> call = capturedCalls.get(0);
+      assertEquals("First call should have 2 tables", 2, call.size());
+
+      // Verify table1 mutations are present and correct
+      assertTrue("First call should contain table1", 
call.containsKey(TableName.valueOf(table1)));
+      List<Mutation> callMutations1 = call.get(TableName.valueOf(table1));
+      assertEquals("Mismatch in number of mutations for table1", 
table1Mutations.size(),
+        callMutations1.size());
+      for (int i = 0; i < table1Mutations.size(); i++) {
+        LogFileTestUtil.assertMutationEquals("Mutation mismatch for table1", 
table1Mutations.get(i),
+          callMutations1.get(i));
+      }
+
+      // Verify table2 mutations are present and correct
+      assertTrue("First call should contain table2", 
call.containsKey(TableName.valueOf(table2)));
+      List<Mutation> callMutations2 = call.get(TableName.valueOf(table2));
+      assertEquals("Mismatch in number of mutations for table2", 
table2Mutations.size(),
+        callMutations2.size());
+      for (int mutationIndex = 0; mutationIndex < table2Mutations.size(); mutationIndex++) {
+        LogFileTestUtil.assertMutationEquals("Mutation mismatch for table2",
+          table2Mutations.get(mutationIndex), 
callMutations2.get(mutationIndex));
+      }
+
+      // Verify that retry calls should contain only table2 (the failed table)
+      for (int callIndex = 1; callIndex < capturedCalls.size(); callIndex++) {
+        call = capturedCalls.get(callIndex);
+        assertNotNull("Call " + callIndex + " should not be null", call);
+        assertEquals("Retry call " + callIndex + " should have 1 table", 1, 
call.size());
+
+        // Verify table1 is NOT present in retry calls
+        assertFalse("Retry call " + callIndex + " should not contain table1",
+          call.containsKey(TableName.valueOf(table1)));
+
+        // Verify table2 mutations are present and correct
+        assertTrue("First call should contain table2", 
call.containsKey(TableName.valueOf(table2)));
+        callMutations2 = call.get(TableName.valueOf(table2));
+        assertEquals("Mismatch in number of mutations for table2", 
table2Mutations.size(),
+          callMutations2.size());
+        for (int mutationIndex = 0; mutationIndex < table2Mutations.size(); mutationIndex++) {
+          LogFileTestUtil.assertMutationEquals("Mutation mismatch for table2",
+            table2Mutations.get(mutationIndex), 
callMutations2.get(mutationIndex));
+        }
+
+        // Ensure metrics are correctly populated
+        ReplicationLogProcessorMetricValues metricValues =
+          spyProcessor.getMetrics().getCurrentMetricValues();
+        assertEquals("Invalid failed mutations count", table2Mutations.size() 
* 3,
+          metricValues.getFailedMutationsCount());
+      }
+    } finally {
+      spyProcessor.close();
+    }
+  }
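+
+  // Worked expectation for the persistent-failure case above: with maxRetries configured
+  // retries, the batch is attempted maxRetries + 1 times and every attempt fails all of
+  // table2's mutations, so the failed-mutation metric ends at
+  // (maxRetries + 1) * table2Mutations.size(). The "* 3" assertion above assumes the default
+  // retry count resolves to 2, i.e. three attempts in total.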
+
+  /**
+   * Tests processReplicationLogBatch method with intermittent failure where a 
region comes back up
+   * during retries. This test verifies that the retry logic works correctly 
when failures are
+   * temporary and eventually resolve.
+   */
+  @Test
+  public void testProcessReplicationLogBatchWithIntermittentFailure() throws 
Exception {
+    final String table1 = "T_" + generateUniqueName();
+    final String table2 = "T_" + generateUniqueName();
+    Map<TableName, List<Mutation>> tableMutationsMap = new HashMap<>();
+
+    // Create processor and spy on it
+    ReplicationLogProcessor replicationLogProcessor =
+      new ReplicationLogProcessor(conf, testHAGroupName);
+    ReplicationLogProcessor spyProcessor = 
Mockito.spy(replicationLogProcessor);
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      // Create first table (will succeed)
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table1));
+      // Create second table (will have intermittent failure)
+      conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table2));
+
+      PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+
+      // Generate mutations for the first table (will succeed)
+      List<Mutation> mutations1 = generateHBaseMutations(phoenixConnection, 2, 
table1, 10L, "a");
+      tableMutationsMap.put(TableName.valueOf(table1), mutations1);
+
+      // Generate mutations for the second table (will have intermittent 
failure)
+      List<Mutation> mutations2 = generateHBaseMutations(phoenixConnection, 2, 
table2, 20L, "b");
+      tableMutationsMap.put(TableName.valueOf(table2), mutations2);
+
+      // Verify table2 has exactly 1 region
+      Admin admin = phoenixConnection.getQueryServices().getAdmin();
+      TableName tableName2TableName = TableName.valueOf(table2);
+      List<HRegionInfo> regions = admin.getTableRegions(tableName2TableName);
+      assertEquals("Table2 should have exactly 1 region", 1, regions.size());
+
+      // Bring down the region to simulate intermittent failure
+      HRegionInfo regionToDisable = regions.get(0);
+      admin.unassign(regionToDisable.getRegionName(), true);
+
+      // Capture all calls to applyMutations
+      List<Map<TableName, List<Mutation>>> capturedCalls = new ArrayList<>();
+      AtomicInteger callCount = new AtomicInteger(0);
+
+      Mockito.doAnswer(invocation -> {
+        Map<TableName, List<Mutation>> originalMap = invocation.getArgument(0);
+        Map<TableName, List<Mutation>> deepCopy = new HashMap<>(originalMap);
+        capturedCalls.add(deepCopy);
+
+        int currentCall = callCount.incrementAndGet();
+
+        // After 2 retries, bring the region back up
+        if (currentCall == 3 && regionToDisable != null) {
+          try {
+            admin.assign(regionToDisable.getRegionName());
+          } catch (Exception e) {
+            // Ignore if region is already assigned
+          }
+        }
+
+        // Call the real applyMutations method
+        return invocation.callRealMethod();
+      }).when(spyProcessor).applyMutations(Mockito.anyMap());
+
+      try {
+        spyProcessor.processReplicationLogBatch(tableMutationsMap);
+        // Should succeed after retries
+      } catch (IOException e) {
+        fail("Should not throw IOException as mutations should eventually 
succeed");
+      }
+
+      // Expected calls: 1 initial + 2 retries (since table2 succeeds on 3rd 
attempt)
+      int expectedCalls = 3;
+
+      // Verify the exact number of calls
+      Mockito.verify(spyProcessor, 
Mockito.times(expectedCalls)).applyMutations(Mockito.anyMap());
+      assertEquals("Should have made " + expectedCalls + " calls to 
applyMutations", expectedCalls,
+        capturedCalls.size());
+
+      // First call should contain both tables
+      Map<TableName, List<Mutation>> firstCall = capturedCalls.get(0);
+      assertEquals("First call should have 2 tables", 2, firstCall.size());
+
+      // Verify table1 mutations are present and correct in first call
+      assertTrue("First call should contain table1",
+        firstCall.containsKey(TableName.valueOf(table1)));
+      List<Mutation> firstCallMutations1 = 
firstCall.get(TableName.valueOf(table1));
+      assertEquals("Mismatch in number of mutations for table1", 
mutations1.size(),
+        firstCallMutations1.size());
+      for (int i = 0; i < mutations1.size(); i++) {
+        LogFileTestUtil.assertMutationEquals("Mutation mismatch for table1", 
mutations1.get(i),
+          firstCallMutations1.get(i));
+      }
+
+      // Verify table2 mutations are present and correct in first call
+      assertTrue("First call should contain table2",
+        firstCall.containsKey(TableName.valueOf(table2)));
+      List<Mutation> firstCallMutations2 = 
firstCall.get(TableName.valueOf(table2));
+      assertEquals("Mismatch in number of mutations for table2", 
mutations2.size(),
+        firstCallMutations2.size());
+      for (int i = 0; i < mutations2.size(); i++) {
+        LogFileTestUtil.assertMutationEquals("Mutation mismatch for table2", 
mutations2.get(i),
+          firstCallMutations2.get(i));
+      }
+
+      // Verify that retry calls (1 and 2) should contain only table2 (the 
failed table)
+      for (int callIndex = 1; callIndex < 3; callIndex++) {
+        Map<TableName, List<Mutation>> retryCall = 
capturedCalls.get(callIndex);
+        assertNotNull("Call " + callIndex + " should not be null", retryCall);
+        assertEquals("Retry call " + callIndex + " should have 1 table", 1, 
retryCall.size());
+
+        // Verify table1 is NOT present in retry calls
+        assertFalse("Retry call " + callIndex + " should not contain table1",
+          retryCall.containsKey(TableName.valueOf(table1)));
+
+        // Verify table2 mutations are present and correct
+        assertTrue("Retry call " + callIndex + " should contain table2",
+          retryCall.containsKey(TableName.valueOf(table2)));
+        List<Mutation> retryCallMutations2 = 
retryCall.get(TableName.valueOf(table2));
+        assertEquals("Mismatch in number of mutations for table2", 
mutations2.size(),
+          retryCallMutations2.size());
+        for (int i = 0; i < mutations2.size(); i++) {
+          LogFileTestUtil.assertMutationEquals("Mutation mismatch for table2", 
mutations2.get(i),
+            retryCallMutations2.get(i));
+        }
+      }
+
+      // Verify that table1 mutations were applied successfully
+      validate(table1, mutations1);
+
+      // Verify that table2 mutations were eventually applied successfully
+      validate(table2, mutations2);
+
+      // Ensure metrics are correctly populated
+      ReplicationLogProcessorMetricValues metricValues =
+        spyProcessor.getMetrics().getCurrentMetricValues();
+      assertEquals("Invalid failed mutations count", mutations2.size() * 2,
+        metricValues.getFailedMutationsCount());
+    } finally {
+      spyProcessor.close();
+    }
+  }
+
+  /**
+   * Helper method to test batching scenarios with different record counts and 
batch sizes
+   */
+  private void testProcessLogFileBatching(int totalRecords, int batchSize) 
throws Exception {
+    final Path batchTestFilePath = new Path(testFolder.newFile("test_" + new 
Random(1000)).toURI());
+    final String tableName = "T_" + generateUniqueName();
+    final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; 
// Ceiling division
+
+    // Create log file with specific number of records
+    LogFileWriter writer = initLogFileWriter(batchTestFilePath);
+    List<Mutation> originalMutations = new ArrayList<>();
+
+    // Add exactly totalRecords mutations to the log file using LogFileTestUtil
+    for (int i = 0; i < totalRecords; i++) {
+      Mutation put = LogFileTestUtil.newPut("row" + i, i + 1, i + 1);
+      originalMutations.add(put);
+      writer.append(tableName, i + 1, put);
+      writer.sync();
+    }
+    writer.close();
+
+    // Create a configuration with custom batch size
+    Configuration testConf = new Configuration(conf);
+    testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize);
+
+    // Create processor with custom batch size and spy on it
+    ReplicationLogProcessor spyProcessor =
+      Mockito.spy(new ReplicationLogProcessor(testConf, testHAGroupName));
+
+    // Validate that the batch size is correctly set
+    assertEquals("Batch size incorrectly set", batchSize, 
spyProcessor.getBatchSize());
+
+    // Store captured arguments manually to avoid reference issues
+    List<Map<TableName, List<Mutation>>> capturedArguments = new ArrayList<>();
+    AtomicInteger processReplicationLogBatchCount = new AtomicInteger(0);
+
+    // Mock processReplicationLogBatch to capture arguments
+    Mockito.doAnswer(invocation -> {
+      // Capture deep copy of arguments
+      Map<TableName, List<Mutation>> originalMap = invocation.getArgument(0);
+      Map<TableName, List<Mutation>> deepCopy = new HashMap<>(originalMap);
+      capturedArguments.add(deepCopy);
+      return null;
+    }).when(spyProcessor).processReplicationLogBatch(Mockito.anyMap());
+
+    // Process the log file
+    spyProcessor.processLogFile(localFs, batchTestFilePath);
+
+    // Verify processReplicationLogBatch was called the expected number of 
times
+    Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+      .processReplicationLogBatch(Mockito.any(Map.class));
+
+    // Validate the captured parameters using our manually captured arguments
+    assertEquals("Should have captured " + expectedBatchCalls + " batch 
calls", expectedBatchCalls,
+      capturedArguments.size());
+
+    // Validate each batch call individually
+    TableName expectedTableName = TableName.valueOf(tableName);
+    int mutationIndex = 0;
+
+    for (int batchIndex = 0; batchIndex < expectedBatchCalls; batchIndex++) {
+      Map<TableName, List<Mutation>> batch = capturedArguments.get(batchIndex);
+      assertNotNull("Batch " + batchIndex + " should not be null", batch);
+
+      // Validate batch contains the expected table
+      assertTrue("Batch " + batchIndex + " should contain table " + tableName,
+        batch.containsKey(expectedTableName));
+
+      List<Mutation> batchMutations = batch.get(expectedTableName);
+      assertNotNull("Batch " + batchIndex + " mutations should not be null", 
batchMutations);
+
+      // Calculate expected mutations in this batch
+      int expectedMutationsInBatch = Math.min(batchSize, totalRecords - 
mutationIndex);
+      assertEquals(
+        "Batch " + batchIndex + " should have " + expectedMutationsInBatch + " 
mutations",
+        expectedMutationsInBatch, batchMutations.size());
+
+      // Validate each mutation in the batch
+      for (int i = 0; i < expectedMutationsInBatch; i++) {
+        LogFileTestUtil.assertMutationEquals("Batch " + batchIndex + " 
mutation " + i + " mismatch",
+          originalMutations.get(mutationIndex), batchMutations.get(i));
+        mutationIndex++;
+      }
+    }
+
+    // Ensure all mutations were processed
+    assertEquals("All mutations should have been processed", totalRecords, 
mutationIndex);
+
+    // Clean up
+    spyProcessor.close();
+  }
+
+  /**
+   * Tests that ReplicationLogProcessor.get() returns the same instance for 
the same haGroup.
+   * Verifies that multiple calls with the same parameters return the cached 
instance.
+   */
+  @Test
+  public void testReplicationLogProcessorInstanceCaching() throws Exception {
+    final String haGroupName1 = "testHAGroup_1";
+    final String haGroupName2 = "testHAGroup_2";
+
+    // Get instances for the first HA group
+    ReplicationLogProcessor group1Instance1 = ReplicationLogProcessor.get(conf, haGroupName1);
+    ReplicationLogProcessor group1Instance2 = ReplicationLogProcessor.get(conf, haGroupName1);
+
+    // Verify same instance is returned for same haGroupName
+    assertNotNull("ReplicationLogProcessor should not be null", group1Instance1);
+    assertNotNull("ReplicationLogProcessor should not be null", group1Instance2);
+    assertSame("Same instance should be returned for same haGroup", group1Instance1,
+      group1Instance2);
+    assertEquals("HA Group ID should match", haGroupName1, group1Instance1.getHaGroupName());
+
+    // Get instance for a different HA group
+    ReplicationLogProcessor group2Instance1 = ReplicationLogProcessor.get(conf, haGroupName2);
+    assertNotNull("ReplicationLogProcessor should not be null", group2Instance1);
+    assertNotSame("Different instance should be returned for different haGroup", group2Instance1,
+      group1Instance1);
+    assertEquals("HA Group ID should match", haGroupName2, group2Instance1.getHaGroupName());
+
+    // Verify multiple calls still return cached instances
+    ReplicationLogProcessor group1Instance3 = ReplicationLogProcessor.get(conf, haGroupName1);
+    ReplicationLogProcessor group2Instance2 = ReplicationLogProcessor.get(conf, haGroupName2);
+    assertSame("Cached instance should be returned", group1Instance3, group1Instance1);
+    assertSame("Cached instance should be returned", group2Instance2, group2Instance1);
+
+    // Clean up
+    group1Instance1.close();
+    group2Instance1.close();
+  }
+
+  /**
+   * Tests that close() removes the instance from the cache. Verifies that after closing, a new call
+   * to get() creates a new instance.
+   */
+  @Test
+  public void testReplicationLogProcessorCacheRemovalOnClose() throws Exception {
+    final String haGroupName = "testHAGroup";
+
+    // Get initial instance
+    ReplicationLogProcessor group1Instance1 = ReplicationLogProcessor.get(conf, haGroupName);
+    assertNotNull("ReplicationLogProcessor should not be null", group1Instance1);
+    assertFalse("Executor service must not be shut down",
+      group1Instance1.getExecutorService().isShutdown());
+
+    // Verify cached instance is returned
+    ReplicationLogProcessor group1Instance2 = ReplicationLogProcessor.get(conf, haGroupName);
+    assertSame("Same instance should be returned before close", group1Instance2, group1Instance1);
+
+    // Close the group
+    group1Instance1.close();
+    assertTrue("Executor service must be shut down",
+      group1Instance1.getExecutorService().isShutdown());
+
+    // Get instance after close - should be a new instance
+    ReplicationLogProcessor group1Instance3 = ReplicationLogProcessor.get(conf, haGroupName);
+    assertNotNull("ReplicationLogProcessor should not be null after close", group1Instance3);
+    assertFalse("Executor service must not be shut down",
+      group1Instance3.getExecutorService().isShutdown());
+    assertNotSame("New instance should be created after close", group1Instance1, group1Instance3);
+    assertEquals("HA Group ID should match", haGroupName, group1Instance3.getHaGroupName());
+
+    // Clean up
+    group1Instance3.close();
+  }
+
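+  /** Creates a LogFileWriter for the given path, backed by the local test filesystem. */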
+  private LogFileWriter initLogFileWriter(Path filePath) throws IOException {
+    LogFileWriter writer = new LogFileWriter();
+    LogFileWriterContext writerContext =
+      new LogFileWriterContext(conf).setFileSystem(localFs).setFilePath(filePath);
+    writer.init(writerContext);
+    return writer;
+  }
+
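+  /** Returns a Phoenix connection created with a fresh copy of the default test properties. */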
+  private Connection getConnection() throws Exception {
+    return getConnection(PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+  }
+
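+  /**
+   * Runs UPSERTs through Phoenix without committing and captures the resulting HBase mutations,
+   * with the captured cell timestamps rewritten to the supplied value.
+   */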
+  private List<Mutation> generateHBaseMutations(final PhoenixConnection phoenixConnection,
+    final int rows, final String tableName, final long timestamp, final String rowKeyPrefix)
+    throws Exception {
+    List<Mutation> mutations = new ArrayList<>();
+    int randomNumber = new Random().nextInt(1000000);
+    for (int i = 0; i < rows; i++) {
+      final String dml = String.format(UPSERT_SQL_STATEMENT, tableName,
+        rowKeyPrefix + randomNumber + i, "b" + randomNumber + i, "c" + randomNumber + i, i + 1);
+      phoenixConnection.createStatement().execute(dml);
+      Iterator<Pair<byte[], List<Mutation>>> iterator =
+        phoenixConnection.getMutationState().toMutations();
+      while (iterator.hasNext()) {
+        Pair<byte[], List<Mutation>> mutationPair = iterator.next();
+        for (Mutation mutation : mutationPair.getSecond()) {
+          if (mutation instanceof Put) {
+            Put put = (Put) mutation;
+            Put newPut = new Put(put.getRow());
+            newPut.setTimestamp(timestamp);
+            // Copy cells with mutation timestamp
+            for (Cell cell : put.getFamilyCellMap().values().stream().flatMap(List::stream)
+              .collect(Collectors.toList())) {
+              newPut.add(cloneCellWithCustomTimestamp(cell, timestamp));
+            }
+            mutations.add(newPut);
+          } else if (mutation instanceof Delete) {
+            Delete delete = (Delete) mutation;
+            Delete newDelete = new Delete(delete.getRow());
+            newDelete.setTimestamp(delete.getTimestamp());
+            // Copy cells with mutation timestamp
+            for (Cell cell : delete.getFamilyCellMap().values().stream().flatMap(List::stream)
+              .collect(Collectors.toList())) {
+              newDelete.add(cloneCellWithCustomTimestamp(cell, timestamp));
+            }
+            mutations.add(newDelete);
+          }
+        }
+      }
+    }
+    return mutations;
+  }
+
+  private Connection getConnection(Properties props) throws Exception {
+    props.setProperty(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+    // Force real driver to be used as the test one doesn't handle creating
+    // more than one ConnectionQueryService
+    props.setProperty(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, StringUtil.EMPTY_STRING);
+    // Create new ConnectionQueryServices so that we can set DROP_METADATA_ATTRIB
+    String url = QueryUtil.getConnectionUrl(props, config, PRINCIPAL);
+    return DriverManager.getConnection(url, props);
+  }
+
+  /**
+   * Validates that the given mutations have been correctly applied to the specified table by
+   * creating a temporary table, applying the mutations, and comparing the results.
+   */
+  private void validate(String tableName, List<Mutation> mutations) throws IOException {
+    // Create a temporary table with the same schema
+    String tempTableName = tableName + "_temp";
+    try (Connection conn = getConnection()) {
+      // Get the table descriptor of the original table
+      Table originalTable =
+        conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName));
+      TableDescriptor originalDesc = originalTable.getDescriptor();
+
+      // Create temporary table with same schema
+      Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+      TableDescriptorBuilder builder =
+        TableDescriptorBuilder.newBuilder(TableName.valueOf(tempTableName));
+      for (ColumnFamilyDescriptor cf : originalDesc.getColumnFamilies()) {
+        builder.setColumnFamily(cf);
+      }
+      admin.createTable(builder.build());
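+      // Note: only the column families of the original table are copied to the temp table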
+
+      // Apply mutations to temporary table
+      Table tempTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+        .getTable(Bytes.toBytes(tempTableName));
+
+      // Apply all mutations in a batch
+      tempTable.batch(mutations, null);
+
+      // Compare data between original and temporary tables
+      Scan scan = new Scan();
+      scan.setRaw(true); // Enable raw scan to see delete markers
+
+      ResultScanner originalScanner = originalTable.getScanner(scan);
+      ResultScanner tempScanner = tempTable.getScanner(scan);
+
+      Map<String, Result> originalResults = new HashMap<>();
+      Map<String, Result> tempResults = new HashMap<>();
+
+      // Collect results from original table
+      for (Result result : originalScanner) {
+        String rowKey = Bytes.toString(result.getRow());
+        originalResults.put(rowKey, result);
+      }
+
+      // Collect results from temporary table
+      for (Result result : tempScanner) {
+        String rowKey = Bytes.toString(result.getRow());
+        tempResults.put(rowKey, result);
+      }
+
+      // Compare results
+      assertEquals("Number of rows should match", originalResults.size(), 
tempResults.size());
+
+      for (Map.Entry<String, Result> entry : originalResults.entrySet()) {
+        String rowKey = entry.getKey();
+        Result originalResult = entry.getValue();
+        Result tempResult = tempResults.get(rowKey);
+
+        assertNotNull("Row " + rowKey + " should exist in temporary table", 
tempResult);
+
+        // Compare cells as sets so ordering differences do not cause false mismatches
+        Cell[] originalCells = originalResult.rawCells();
+        Cell[] tempCells = tempResult.rawCells();
+
+        assertEquals("Number of cells should match for row " + rowKey, 
originalCells.length,
+          tempCells.length);
+
+        // Convert cells to a string form that can be compared as sets
+        java.util.Set<String> originalCellSet = new java.util.HashSet<>();
+        java.util.Set<String> tempCellSet = new java.util.HashSet<>();
+
+        // Create string representations of cells for set comparison
+        for (Cell cell : originalCells) {
+          originalCellSet.add(CellUtil.toString(cell, false));
+        }
+
+        for (Cell cell : tempCells) {
+          tempCellSet.add(CellUtil.toString(cell, false));
+        }
+
+        // Compare sets directly
+        assertEquals("Cell sets do not match for row " + rowKey, 
originalCellSet, tempCellSet);
+      }
+
+      // Cleanup
+      originalScanner.close();
+      tempScanner.close();
+      originalTable.close();
+      tempTable.close();
+      admin.disableTable(TableName.valueOf(tempTableName));
+      admin.deleteTable(TableName.valueOf(tempTableName));
+    } catch (Exception e) {
+      throw new IOException("Failed to validate mutations: " + e.getMessage(), 
e);
+    }
+  }
+
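+  /** Deep-copies the given cell, overriding only its timestamp with the supplied value. */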
+  private Cell cloneCellWithCustomTimestamp(Cell cell, long timestamp) {
+    return CellBuilderFactory.create(CellBuilderType.DEEP_COPY)
+      .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+      .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
+      .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())
+      .setTimestamp(timestamp) // Use mutation timestamp for all cells
+      .setType(cell.getType())
+      .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()).build();
+  }
+}
