apurtell commented on code in PR #2188:
URL: https://github.com/apache/phoenix/pull/2188#discussion_r2146270282


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessor.class);
+
+    /**
+     * The maximum count of mutations to process in single batch while reading replication log file
+     */
+    public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE =
+            "phoenix.replication.log.standby.replay.batch.size";
+
+    /**
+     * The default batch size for reading the replication log file.
+     * Assuming each log record to be 10 KB (un-compressed) and allow at max 64 MB of
+     * in-memory records to be processed
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = 6400;
+
+    /**
+     * The maximum number of retries for HBase client operations while applying the mutations
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT =
+            "phoenix.replication.standby.hbase.client.retries.number";
+
+    /**
+     * The default number of retries for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4;

Review Comment:
   Why 4? Why not 3? or 5? Does this align with other retry defaults, like in the code we added on the writer side? (It does not, fwiw.)
   These are just starting points for users, sure, but they should work well without being changed. The default number of retries and the default timeouts work together to establish a time budget for transient issues. After this budget is exhausted the issues are raised by exception to higher layers, which may in some cases abort the server.
   Let's think them through.
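   For illustration only, a minimal sketch of how an operator would override the two knobs together once we settle on values, using the property keys defined in this class; the numbers here are placeholders, not recommendations:

   ```java
   // Hypothetical tuning example; values are illustrative only.
   Configuration conf = HBaseConfiguration.create();
   // Retries for applying mutations on the standby (patch default: 4).
   conf.setInt("phoenix.replication.standby.hbase.client.retries.number", 3);
   // Per-operation timeout in milliseconds (patch default: 10000).
   conf.setInt("phoenix.replication.standby.hbase.client.operations.timeout", 30000);
   ```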



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessor.class);
+
+    /**
+     * The maximum count of mutations to process in single batch while reading replication log file
+     */
+    public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE =
+            "phoenix.replication.log.standby.replay.batch.size";
+
+    /**
+     * The default batch size for reading the replication log file.
+     * Assuming each log record to be 10 KB (un-compressed) and allow at max 64 MB of
+     * in-memory records to be processed
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = 6400;
+
+    /**
+     * The maximum number of retries for HBase client operations while applying the mutations
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT =
+            "phoenix.replication.standby.hbase.client.retries.number";
+
+    /**
+     * The default number of retries for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4;
+
+    /**
+     * The timeout for HBase client operations while applying the mutations.
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS =
+            "phoenix.replication.standby.hbase.client.operations.timeout";
+
+    /**
+     * The default timeout for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = 10000;

Review Comment:
   So the total time we'd wait before throwing an exception up to the higher layers if consistently timing out is 50 seconds (10s + (4 retries * 10s))? Why 50? Should it be 60? or 30? or 120? or 300? We are applying edits on the standby, so we can certainly be more tolerant than on the write side, where timeouts/retries need to be short to fail fast. We don't need to fail fast in the reader. On the other hand we don't want to wait so long as to complicate failover or miss a failover SLA.
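   To make the budget explicit, a minimal sketch of the worst case implied by these defaults, assuming one initial attempt plus N retries, each bounded by the operation timeout, and ignoring any backoff between attempts:

   ```java
   // Worst-case wait before a RetriesExhaustedException escapes to higher layers:
   // (1 initial attempt + retries) * per-operation timeout.
   int retries = 4;        // DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT
   int timeoutMs = 10000;  // DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS
   long worstCaseMs = (long) (1 + retries) * timeoutMs; // 50000 ms = 50 s
   ```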



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessor.class);
+
+    /**
+     * The maximum count of mutations to process in single batch while reading replication log file
+     */
+    public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE =
+            "phoenix.replication.log.standby.replay.batch.size";
+
+    /**
+     * The default batch size for reading the replication log file.
+     * Assuming each log record to be 10 KB (un-compressed) and allow at max 64 MB of
+     * in-memory records to be processed
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = 6400;
+
+    /**
+     * The maximum number of retries for HBase client operations while applying the mutations
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT =
+            "phoenix.replication.standby.hbase.client.retries.number";
+
+    /**
+     * The default number of retries for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4;
+
+    /**
+     * The timeout for HBase client operations while applying the mutations.
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS =
+            "phoenix.replication.standby.hbase.client.operations.timeout";
+
+    /**
+     * The default timeout for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = 10000;
+
+    private final Configuration conf;
+
+    private final ExecutorService executorService;
+
+    /**
+     * This {@link AsyncConnection} is used for handling mutations
+     */
+    private volatile AsyncConnection asyncConnection;
+
+    private final Object asyncConnectionLock = new Object();

Review Comment:
   Do you need this? Taking a lock on this to lazily initialize the `asyncConnection` is equivalent to `synchronized (this)`, so just use that?
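   A minimal sketch of the suggestion, assuming the connection is created lazily from the processor's `conf` (the accessor name here is hypothetical):

   ```java
   // Double-checked lazy init on "this" instead of a dedicated lock object.
   // Relies on asyncConnection staying volatile, as in the patch.
   private AsyncConnection getAsyncConnection() throws IOException {
       AsyncConnection connection = asyncConnection;
       if (connection == null) {
           synchronized (this) {
               connection = asyncConnection;
               if (connection == null) {
                   // FutureUtils.get rethrows async failures as IOException
                   connection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
                   asyncConnection = connection;
               }
           }
       }
       return connection;
   }
   ```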



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class);
+
+    private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " +
+            "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
+
+    private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)";
+
+    private static final String PRINCIPAL = "replicationLogProcessor";
+
+    @ClassRule
+    public static TemporaryFolder testFolder = new TemporaryFolder();
+
+    private static Configuration conf;
+    private static FileSystem localFs;
+    private static ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupBeforeClass() throws Exception {
+        conf = getUtility().getConfiguration();
+        localFs = FileSystem.getLocal(conf);
+        executorService = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        if(executorService != null) {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Tests successful creation of LogFileReader with a properly formatted log file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithValidLogFile() throws IOException {
+        // Test with valid log file
+        Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(validFilePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Verify file exists and has content
+        assertTrue("Valid log file should exist", localFs.exists(validFilePath));
+        assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0);
+
+        // Test createLogFileReader with valid file - should succeed
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+
+        // Verify reader is created successfully
+        assertNotNull("Reader should not be null for valid file", reader);
+        assertNotNull("Reader context should not be null", reader.getContext());
+        assertEquals("File path should match", validFilePath, reader.getContext().getFilePath());
+        assertEquals("File system should match", localFs, reader.getContext().getFileSystem());
+
+        // Verify we can read from the reader
+        assertTrue("Reader should have records", reader.iterator().hasNext());
+
+        // Clean up
+        reader.close();
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with a non-existent file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithNonExistentFile() {
+        Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file");
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, nonExistentPath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            assertTrue("Error message should mention file does not exist and file path name",
+                    e.getMessage().contains("Log file does not exist: " + nonExistentPath));
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
+        Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
+        localFs.create(invalidFilePath).close(); // Create empty file
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
+            fail("Should throw IOException for invalid file");
+        } catch (IOException e) {
+            // Should throw some kind of IOException when trying to read header
+            assertTrue("Should throw IOException", true);
+        } finally {
+            // Delete the invalid file
+            localFs.delete(invalidFilePath);
+        }
+    }
+
+    /**
+     * Tests the closeReader method with both null and valid LogFileReader instances.
+     */
+    @Test
+    public void testCloseReader() throws IOException {
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        replicationLogProcessor.closeReader(null);
+        Path filePath = new Path(testFolder.newFile("testCloseReader").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(filePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Test with valid reader
+        LogFileReader reader = Mockito.spy(new LogFileReader());
+
+        reader.init(new LogFileReaderContext(conf)
+                .setFileSystem(localFs)
+                .setFilePath(filePath));
+
+        replicationLogProcessor.closeReader(reader);
+
+        // Ensure reader's close method is called only once
+        Mockito.verify(reader, Mockito.times(1)).close();
+    }
+
+    /**
+     * Tests processing an empty mutation map - should complete without errors.
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithEmptyMap() {
+        Map<String, List<Mutation>> emptyMap = new HashMap<>();
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        // Process empty batch - should not throw any exceptions and should return immediately
+        try {
+            replicationLogProcessor.processReplicationLogBatch(emptyMap);
+            // If we reach here, the empty map was processed successfully
+            assertTrue("Processing empty map should complete without errors", true);
+        } catch (Exception e) {
+            fail("Processing empty map should not throw exception: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Tests exception handling when attempting to process mutations for non-existent tables.
+     */
+    @Test
+    public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() {
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5);
+        tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation));
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        try {
+            replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+            fail("Should throw TableNotFoundException for non-existent table");
+        } catch (IOException exception) {
+            assertTrue("Error message should mention TableNotFoundException",
+                    exception.getMessage().contains("TableNotFoundException"));
+        }
+    }
+
+    /**
+     * Tests behavior when HBase operations fail (simulated by disabling table).
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception {
+        final String tableName = "T_" + generateUniqueName();
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create table first
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, tableName));
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+            // Generate some mutations for the table
+            List<Mutation> mutations = generateHBaseMutations(phoenixConnection, 2, tableName, 10L);
+            tableMutationsMap.put(tableName, mutations);
+            TableName hbaseTableName = TableName.valueOf(tableName);
+            try (Admin admin = phoenixConnection.getQueryServices().getAdmin()) {
+                // Disable the table to simulate HBase failure
+                admin.disableTable(hbaseTableName);
+                LOG.info("Disabled table {} to simulate HBase failure", tableName);
+
+                ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+                // Attempt to process mutations on disabled table - should fail
+                try {
+                    replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+                    fail("Should throw IOException when trying to apply mutations to disabled table");
+                } catch (IOException e) {
+                    // Expected behavior - disabled table should cause IOException
+                    assertTrue("Should throw IOException for disabled table", true);
+                    LOG.info("Expected IOException caught when processing mutations on disabled table: " + e.getMessage());
+                }
+                }
+            }
+        }
+    }
+
+    /**
+     * Tests end-to-end processing of a valid log file with mutations for multiple tables.
+     */
+    @Test
+    public void testProcessLogFileForValidLogFile() throws Exception {
+        final String table1Name = "T_" + generateUniqueName();
+        final String table2Name = "T_" + generateUniqueName();
+        final Path filePath = new Path(testFolder.newFile("testProcessLogFileEnd2End").toURI());
+        LogFileWriter writer = initLogFileWriter(filePath);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table1Name));
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table2Name));
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+
+            List<Mutation> table1Mutations = generateHBaseMutations(phoenixConnection, 2, table1Name, 100L);
+            List<Mutation> table2Mutations = generateHBaseMutations(phoenixConnection, 5, table2Name, 101L);
+            table1Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table1Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            table2Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table2Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            writer.sync();
+            writer.close();
+
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.processLogFile(localFs, filePath);
+
+            validate(table1Name, table1Mutations);
+            validate(table2Name, table2Mutations);
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to process a non-existent log file.
+     */
+    @Test
+    public void testProcessLogFileWithNonExistentFile() throws Exception {
+        // Create a path to a file that doesn't exist
+        Path nonExistentFilePath = new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log");
+        // Verify the file doesn't exist
+        assertFalse("Non-existent file should not exist", localFs.exists(nonExistentFilePath));
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        // Attempt to process non-existent file - should throw IOException
+        try {
+            replicationLogProcessor.processLogFile(localFs, nonExistentFilePath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            // Expected behavior - non-existent file should cause IOException
+            assertTrue("Should throw IOException for non-existent file", true);
+        }
+    }
+
+    /**
+     * Tests batching logic when processing log files with mutations for multiple tables.
+     */
+    @Test
+    public void testProcessLogFileBatchingWithMultipleTables() throws Exception {
+        final Path multiTableBatchFilePath = new Path(testFolder.newFile("testMultiTableBatch").toURI());
+        final String table1Name = "T1_" + generateUniqueName();
+        final String table2Name = "T2_" + generateUniqueName();
+        final int batchSize = 4;
+        final int recordsPerTable = 3;
+        final int totalRecords = recordsPerTable * 2; // 6 total records
+        final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; // 2 calls
+        // Create log file with mutations for multiple tables
+        LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath);
+        // Add mutations alternating between tables using LogFileTestUtil
+        for (int i = 0; i < recordsPerTable; i++) {
+            // Add mutation for table1
+            Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) + 1);
+            writer.append(table1Name, (i * 2) + 1, put1);
+            writer.sync();
+            // Add mutation for table2
+            Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) + 2);
+            writer.append(table2Name, (i * 2) + 2, put2);
+            writer.sync();
+        }
+        writer.close();
+        // Create processor with custom batch size and spy on it
+        Configuration testConf = new Configuration(conf);
+        testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize);
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(testConf, executorService);
+        // Validate that the batch size is correctly set
+        assertEquals("Batch size should be set correctly", batchSize, replicationLogProcessor.getBatchSize());
+        ReplicationLogProcessor spyProcessor = Mockito.spy(replicationLogProcessor);
+        // Mock the processReplicationLogBatch method
+        Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+        // Process the log file
+        spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
+        // Verify processReplicationLogBatch was called the expected number of times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+            .processReplicationLogBatch(Mockito.any(Map.class));

Review Comment:
   You can also use your spy to check if expected mutations were found in each invocation of processReplicationLogBatch, put1 in the first, put2 in the second, as a sanity check.
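   For example, a minimal sketch of that check with an ArgumentCaptor (requires `import org.mockito.ArgumentCaptor;`), verifying batch contents rather than just the call count:

   ```java
   // Capture every batch handed to processReplicationLogBatch for inspection.
   @SuppressWarnings("unchecked")
   ArgumentCaptor<Map<String, List<Mutation>>> batchCaptor = ArgumentCaptor.forClass(Map.class);
   Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
           .processReplicationLogBatch(batchCaptor.capture());
   List<Map<String, List<Mutation>>> batches = batchCaptor.getAllValues();
   // Sanity check, e.g. the first batch should carry mutations for table1
   assertTrue(batches.get(0).containsKey(table1Name));
   ```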



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class);
+
+    private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " +
+            "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
+
+    private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)";
+
+    private static final String PRINCIPAL = "replicationLogProcessor";
+
+    @ClassRule
+    public static TemporaryFolder testFolder = new TemporaryFolder();
+
+    private static Configuration conf;
+    private static FileSystem localFs;
+    private static ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupBeforeClass() throws Exception {
+        conf = getUtility().getConfiguration();
+        localFs = FileSystem.getLocal(conf);
+        executorService = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        if(executorService != null) {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Tests successful creation of LogFileReader with a properly formatted log file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithValidLogFile() throws IOException {
+        // Test with valid log file
+        Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(validFilePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Verify file exists and has content
+        assertTrue("Valid log file should exist", localFs.exists(validFilePath));
+        assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0);
+
+        // Test createLogFileReader with valid file - should succeed
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+
+        // Verify reader is created successfully
+        assertNotNull("Reader should not be null for valid file", reader);
+        assertNotNull("Reader context should not be null", reader.getContext());
+        assertEquals("File path should match", validFilePath, reader.getContext().getFilePath());
+        assertEquals("File system should match", localFs, reader.getContext().getFileSystem());
+
+        // Verify we can read from the reader
+        assertTrue("Reader should have records", reader.iterator().hasNext());
+
+        // Clean up
+        reader.close();
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with a non-existent file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithNonExistentFile() {
+        Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file");
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, nonExistentPath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            assertTrue("Error message should mention file does not exist and file path name",
+                    e.getMessage().contains("Log file does not exist: " + nonExistentPath));
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
+        Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
+        localFs.create(invalidFilePath).close(); // Create empty file
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
+            fail("Should throw IOException for invalid file");
+        } catch (IOException e) {
+            // Should throw some kind of IOException when trying to read header
+            assertTrue("Should throw IOException", true);
+        } finally {
+            // Delete the invalid file
+            localFs.delete(invalidFilePath);
+        }
+    }
+
+    /**
+     * Tests the closeReader method with both null and valid LogFileReader instances.
+     */
+    @Test
+    public void testCloseReader() throws IOException {
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        replicationLogProcessor.closeReader(null);
+        Path filePath = new Path(testFolder.newFile("testCloseReader").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(filePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Test with valid reader
+        LogFileReader reader = Mockito.spy(new LogFileReader());
+
+        reader.init(new LogFileReaderContext(conf)
+                .setFileSystem(localFs)
+                .setFilePath(filePath));
+
+        replicationLogProcessor.closeReader(reader);
+
+        // Ensure reader's close method is called only once
+        Mockito.verify(reader, Mockito.times(1)).close();
+    }
+
+    /**
+     * Tests processing an empty mutation map - should complete without errors.
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithEmptyMap() {
+        Map<String, List<Mutation>> emptyMap = new HashMap<>();
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        // Process empty batch - should not throw any exceptions and should return immediately
+        try {
+            replicationLogProcessor.processReplicationLogBatch(emptyMap);
+            // If we reach here, the empty map was processed successfully
+            assertTrue("Processing empty map should complete without errors", true);
+        } catch (Exception e) {
+            fail("Processing empty map should not throw exception: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Tests exception handling when attempting to process mutations for non-existent tables.
+     */
+    @Test
+    public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() {
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5);
+        tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation));
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        try {
+            replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+            fail("Should throw TableNotFoundException for non-existent table");
+        } catch (IOException exception) {
+            assertTrue("Error message should mention TableNotFoundException",
+                    exception.getMessage().contains("TableNotFoundException"));
+        }
+    }
+
+    /**
+     * Tests behavior when HBase operations fail (simulated by disabling table).
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception {
+        final String tableName = "T_" + generateUniqueName();
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create table first
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, tableName));
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+            // Generate some mutations for the table
+            List<Mutation> mutations = generateHBaseMutations(phoenixConnection, 2, tableName, 10L);
+            tableMutationsMap.put(tableName, mutations);
+            TableName hbaseTableName = TableName.valueOf(tableName);
+            try (Admin admin = phoenixConnection.getQueryServices().getAdmin()) {
+                // Disable the table to simulate HBase failure
+                admin.disableTable(hbaseTableName);
+                LOG.info("Disabled table {} to simulate HBase failure", tableName);
+
+                ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+                // Attempt to process mutations on disabled table - should fail
+                try {
+                    replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+                    fail("Should throw IOException when trying to apply mutations to disabled table");
+                } catch (IOException e) {
+                    // Expected behavior - disabled table should cause IOException
+                    assertTrue("Should throw IOException for disabled table", true);
+                    LOG.info("Expected IOException caught when processing mutations on disabled table: " + e.getMessage());
+                }
+                }
+            }
+        }
+    }
+
+    /**
+     * Tests end-to-end processing of a valid log file with mutations for multiple tables.
+     */
+    @Test
+    public void testProcessLogFileForValidLogFile() throws Exception {
+        final String table1Name = "T_" + generateUniqueName();
+        final String table2Name = "T_" + generateUniqueName();
+        final Path filePath = new Path(testFolder.newFile("testProcessLogFileEnd2End").toURI());
+        LogFileWriter writer = initLogFileWriter(filePath);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table1Name));
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table2Name));
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+
+            List<Mutation> table1Mutations = generateHBaseMutations(phoenixConnection, 2, table1Name, 100L);
+            List<Mutation> table2Mutations = generateHBaseMutations(phoenixConnection, 5, table2Name, 101L);
+            table1Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table1Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            table2Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table2Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            writer.sync();
+            writer.close();
+
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.processLogFile(localFs, filePath);
+
+            validate(table1Name, table1Mutations);
+            validate(table2Name, table2Mutations);
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to process a non-existent log file.
+     */
+    @Test
+    public void testProcessLogFileWithNonExistentFile() throws Exception {
+        // Create a path to a file that doesn't exist
+        Path nonExistentFilePath = new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log");
+        // Verify the file doesn't exist
+        assertFalse("Non-existent file should not exist", localFs.exists(nonExistentFilePath));
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        // Attempt to process non-existent file - should throw IOException
+        try {
+            replicationLogProcessor.processLogFile(localFs, nonExistentFilePath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            // Expected behavior - non-existent file should cause IOException
+            assertTrue("Should throw IOException for non-existent file", true);
+        }
+    }
+
+    /**
+     * Tests batching logic when processing log files with mutations for multiple tables.
+     */
+    @Test
+    public void testProcessLogFileBatchingWithMultipleTables() throws Exception {
+        final Path multiTableBatchFilePath = new Path(testFolder.newFile("testMultiTableBatch").toURI());
+        final String table1Name = "T1_" + generateUniqueName();
+        final String table2Name = "T2_" + generateUniqueName();
+        final int batchSize = 4;
+        final int recordsPerTable = 3;
+        final int totalRecords = recordsPerTable * 2; // 6 total records
+        final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; // 2 calls
+        // Create log file with mutations for multiple tables
+        LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath);
+        // Add mutations alternating between tables using LogFileTestUtil
+        for (int i = 0; i < recordsPerTable; i++) {
+            // Add mutation for table1
+            Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) + 1);
+            writer.append(table1Name, (i * 2) + 1, put1);
+            writer.sync();
+            // Add mutation for table2
+            Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) + 2);
+            writer.append(table2Name, (i * 2) + 2, put2);
+            writer.sync();
+        }
+        writer.close();
+        // Create processor with custom batch size and spy on it
+        Configuration testConf = new Configuration(conf);
+        testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize);
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(testConf, executorService);
+        // Validate that the batch size is correctly set
+        assertEquals("Batch size should be set correctly", batchSize, replicationLogProcessor.getBatchSize());
+        ReplicationLogProcessor spyProcessor = Mockito.spy(replicationLogProcessor);
+        // Mock the processReplicationLogBatch method
+        Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+        // Process the log file
+        spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
+        // Verify processReplicationLogBatch was called the expected number of times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+            .processReplicationLogBatch(Mockito.any(Map.class));
+    }
+
+    /**
+     * Tests processing of empty log files (files with header/trailer but no mutation records).
+     */
+    @Test
+    public void testProcessLogFileWithEmptyFile() throws Exception {
+        final Path emptyFilePath = new Path(testFolder.newFile("testProcessLogFileEmpty").toURI());
+        LogFileWriter writer = initLogFileWriter(emptyFilePath);
+
+        // Close the writer without adding any records - this creates a valid empty log file
+        writer.close();
+
+        // Verify file exists and has some content (header + trailer)
+        assertTrue("Empty log file should exist", localFs.exists(emptyFilePath));
+        assertTrue("Empty log file should have header/trailer content", localFs.getFileStatus(emptyFilePath).getLen() > 0);
+
+        // Process the empty log file - should not throw any exceptions
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        try {
+            replicationLogProcessor.processLogFile(localFs, emptyFilePath);
+            // If we reach here, the empty file was processed successfully
+            assertTrue("Processing empty log file should complete without errors", true);
+        } catch (Exception e) {
+            fail("Processing empty log file should not throw exception: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Tests processing of log files that were not closed, ensuring it's successful.
+     */
+    @Test
+    public void testProcessLogFileForUnClosedFile() throws Exception {
+        final Path emptyFilePath = new Path(testFolder.newFile("testProcessLogFileForUnClosedFile").toURI());
+        LogFileWriter writer = initLogFileWriter(emptyFilePath);
+
+        // Add one mutation
+        Mutation put = LogFileTestUtil.newPut("row1", 3L, 4);
+        writer.append("table", 1, put);
+        writer.sync();
+
+        // Process the file without closing - should not throw any exceptions
+        ReplicationLogProcessor spyProcessor = Mockito.spy(new ReplicationLogProcessor(conf, executorService));
+        Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+
+        spyProcessor.processLogFile(localFs, emptyFilePath);
+
+        // Verify processReplicationLogBatch was called the expected number of times
+        Mockito.verify(spyProcessor, Mockito.times(1))
+                .processReplicationLogBatch(Mockito.any(Map.class));
+    }
+
+    /**
+     * Tests that configuration parameters are properly read and applied.
+     */
+    @Test
+    public void testReplicationLogProcessorConfiguration() {
+        // Test that all default configurations are used when no custom configuration is provided
+        ReplicationLogProcessor defaultProcessor = new ReplicationLogProcessor(conf, executorService);
+
+        // Validate default batch size
+        assertEquals("Default batch size should be used",
+                ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
+                defaultProcessor.getBatchSize());
+
+        // Validate default HBase client retries count
+        assertEquals("Default HBase client retries count should be used",
+                ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT,
+                defaultProcessor.getHBaseClientRetriesCount());
+
+        // Validate default HBase client operation timeout
+        assertEquals("Default HBase client operation timeout should be used",
+                ReplicationLogProcessor.DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS,
+                defaultProcessor.getHBaseClientOperationTimeout());
+
+        // Test that all custom configurations are honored
+        Configuration customConf = new Configuration(conf);
+
+        // Set custom values for all configuration parameters
+        int customBatchSize = 1000;
+        int customRetriesCount = 6;
+        int customOperationTimeout = 15000;
+
+        customConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, customBatchSize);
+        customConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT, customRetriesCount);
+        customConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS, customOperationTimeout);
+
+        ReplicationLogProcessor customProcessor = new ReplicationLogProcessor(customConf, executorService);
+
+        // Validate all custom configurations are honored
+        assertEquals("Custom batch size should be honored",
+                customBatchSize, customProcessor.getBatchSize());
+
+        assertEquals("Custom HBase client retries count should be honored",
+                customRetriesCount, customProcessor.getHBaseClientRetriesCount());
+
+        assertEquals("Custom HBase client operation timeout should be honored",
+                customOperationTimeout, customProcessor.getHBaseClientOperationTimeout());
+    }
+
+    /**
+     * Tests batching logic with various record counts and batch sizes.
+     */
+    @Test
+    public void testProcessLogFileBatchingLogic() throws Exception {
+        // Test multiple batching scenarios to ensure the logic works correctly
+
+        // Test case 1: General case where total records don't align perfectly with batch size
+        testProcessLogFileBatching(10, 3);
+
+        // Test case 2: Edge case where total records exactly matches batch size
+        testProcessLogFileBatching(5, 5);
+
+        // Test case 3: Single record with large batch size
+        testProcessLogFileBatching(1, 10);
+
+        // Test case 4: Multiple full batches
+        testProcessLogFileBatching(12, 4);
+    }
+
+    /**
+     * Helper method to test batching scenarios with different record counts and batch sizes
+     */
+    private void testProcessLogFileBatching(int totalRecords, int batchSize) throws Exception {
+        final Path batchTestFilePath = new Path(testFolder.newFile("test_" + new Random(1000)).toURI());
+        final String tableName = "T_" + generateUniqueName();
+        final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; // Ceiling division
+
+        // Create log file with specific number of records
+        LogFileWriter writer = initLogFileWriter(batchTestFilePath);
+
+        // Add exactly totalRecords mutations to the log file using LogFileTestUtil
+        for (int i = 0; i < totalRecords; i++) {
+            Mutation put = LogFileTestUtil.newPut("row" + i, i + 1, i + 1);
+            writer.append(tableName, i + 1, put);
+            writer.sync();
+        }
+        writer.close();
+
+        // Create a configuration with custom batch size
+        Configuration testConf = new Configuration(conf);
+        
testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
 batchSize);
+
+        // Create processor with custom batch size and spy on it
+        ReplicationLogProcessor testProcessor = new 
ReplicationLogProcessor(testConf, executorService);
+
+        // Validate that the batch size is correctly set
+        assertEquals("Batch size incorrectly set", batchSize, 
testProcessor.getBatchSize());
+
+        ReplicationLogProcessor spyProcessor = Mockito.spy(testProcessor);
+
+        // Mock the processReplicationLogBatch method to do nothing but track 
calls
+        
Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+
+        // Process the log file
+        spyProcessor.processLogFile(localFs, batchTestFilePath);
+
+        // Verify processReplicationLogBatch was called the expected number of 
times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+                .processReplicationLogBatch(Mockito.any(Map.class));
+    }

Review Comment:
   You can also verify that the expected mutations were actually present in each batch, not just that the expected number of batches was processed.
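   
   A minimal sketch of that check, building on the spy already set up in this test. Note that the processor reuses and clears the same map between batches, so each batch has to be copied when it is captured; holding only the reference would leave every captured value pointing at the same, later-cleared map:
   
   ```java
   // Capture a copy of each batch as it is handed to processReplicationLogBatch.
   List<Map<String, List<Mutation>>> batches = new ArrayList<>();
   Mockito.doAnswer(invocation -> {
       Map<String, List<Mutation>> batch = invocation.getArgument(0);
       batches.add(new HashMap<>(batch)); // copy: the processor clears this map
       return null;
   }).when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
   
   spyProcessor.processLogFile(localFs, batchTestFilePath);
   
   // No batch should exceed batchSize, and together the batches should account
   // for every mutation written to the log file.
   int total = 0;
   for (Map<String, List<Mutation>> batch : batches) {
       int batchCount = batch.values().stream().mapToInt(List::size).sum();
       assertTrue("Batch should not exceed configured size", batchCount <= batchSize);
       total += batchCount;
   }
   assertEquals("All written mutations should be batched", totalRecords, total);
   ```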



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessor.class);
+
+    /**
+     * The maximum number of mutations to process in a single batch while reading a replication log file
+     */
+     */
+    public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE =
+            "phoenix.replication.log.standby.replay.batch.size";
+
+    /**
+     * The default batch size for reading the replication log file.
+     * Assumes each log record is about 10 KB (uncompressed) and allows at most 64 MB of
+     * records to be held in memory while they are processed
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = 6400;
+
+    /**
+     * The maximum number of retries for HBase client operations while applying the mutations
+     */
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT =
+            "phoenix.replication.standby.hbase.client.retries.number";
+
+    /**
+     * The default number of retries for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4;
+
+    /**
+     * The timeout for HBase client operations while applying the mutations.
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS =
+            "phoenix.replication.standby.hbase.client.operations.timeout";
+
+    /**
+     * The default timeout for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = 10000;
+
+    private final Configuration conf;
+
+    private final ExecutorService executorService;
+
+    /**
+     * This {@link AsyncConnection} is used for handling mutations
+     */
+    private volatile AsyncConnection asyncConnection;
+
+    private final Object asyncConnectionLock = new Object();
+
+    private final int batchSize;
+
+    /**
+     * Creates a new ReplicationLogProcessor with the given configuration and executor service.
+     * @param conf The configuration to use
+     * @param executorService The executor service for processing mutations
+     */
+    public ReplicationLogProcessor(final Configuration conf,
+            final ExecutorService executorService) {
+        // Create a copy of the configuration as some of the properties will be
+        // overridden
+        this.conf = HBaseConfiguration.create(conf);
+        this.executorService = executorService;
+        this.batchSize = this.conf.getInt(REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
+                DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE);
+        decorateConf();
+    }
+
+    /**
+     * Decorate the Configuration object to make replication more responsive to delays by
+     * reducing the timeout and number of retries.
+     */
+    private void decorateConf() {
+        this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+                this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT,
+                        DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT));
+        this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+                this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS,
+                        DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS));
+    }
+
+    public void processLogFile(FileSystem fs, Path filePath) throws IOException {
+
+        // Map from Table Name to List of Mutations
+        Map<String, List<Mutation>> tableToMutationsMap = new HashMap<>();
+
+        // Track the total number of processed records from input log file
+        long totalProcessed = 0;
+
+        // Track the current batch size, as records are processed in batches of
+        // {@link REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE}
+        long currentBatchSize = 0;
+
+        LogFileReader logFileReader = null;
+
+        try {
+            // Create the LogFileReader for given path
+            logFileReader = createLogFileReader(fs, filePath);
+
+            for (LogFile.Record record : logFileReader) {
+
+                final String tableName = record.getHBaseTableName();
+                final Mutation mutation = record.getMutation();
+
+                tableToMutationsMap.computeIfAbsent(tableName, k -> new ArrayList<>())
+                        .add(mutation);
+                currentBatchSize++;
+
+                // Process when we reach the batch size, then reset the counter
+                // and the table-to-mutations map
+                if (currentBatchSize >= getBatchSize()) {
+                    processReplicationLogBatch(tableToMutationsMap);
+                    totalProcessed += currentBatchSize;
+                    tableToMutationsMap.clear();
+                    currentBatchSize = 0;
+                }
+            }
+
+            // Process any remaining mutations
+            if (currentBatchSize > 0) {
+                processReplicationLogBatch(tableToMutationsMap);
+                totalProcessed += currentBatchSize;
+            }
+
+            LOG.info("Completed processing log file {}. Total mutations 
processed: {}",
+                    logFileReader.getContext().getFilePath(), totalProcessed);
+
+        } catch (Exception e) {
+            LOG.error("Error while processing replication log file", e);
+            throw new IOException("Failed to process log file " + filePath, e);
+        } finally {
+            closeReader(logFileReader);
+        }
+    }
+
+    protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException {
+        // Ensure that the file exists. If checking the path itself throws an
+        // exception, it is propagated back to the caller as-is
+        if (!fs.exists(filePath)) {
+            throw new IOException("Log file does not exist: " + filePath);
+        }
+        LogFileReader logFileReader = new LogFileReader();
+        try {
+            LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf)
+                    .setFileSystem(fs).setFilePath(filePath);
+            logFileReader.init(logFileReaderContext);
+        } catch (IOException exception) {
+            LOG.error("Failed to initialize new LogFileReader for path {}",
+                    filePath, exception);
+            throw exception;
+        }
+        return logFileReader;
+    }
+
+    /**
+     * Closes the given reader, logging any errors that occur during close.
+     */
+    protected void closeReader(LogFileReader logFileReader) {
+        if (logFileReader == null) {
+            return;
+        }
+        try {
+            logFileReader.close();
+        } catch (IOException exception) {
+            LOG.error("Error while closing LogFileReader: {}",
+                    logFileReader, exception);
+        }
+    }
+
+    protected void processReplicationLogBatch(
+            Map<String, List<Mutation>> tableMutationMap) throws IOException {
+
+        if (tableMutationMap == null || tableMutationMap.isEmpty()) {
+            return;
+        }
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (Map.Entry<String, List<Mutation>> entry : tableMutationMap.entrySet()) {
+            String tableName = entry.getKey();
+            List<Mutation> mutations = entry.getValue();
+            AsyncTable<?> table = getAsyncConnection()
+                    .getTable(TableName.valueOf(tableName), executorService);

Review Comment:
   How about `Map<TableName, List<Mutation>>` instead?
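   
   Roughly something like this (a sketch only, keeping the rest of the flow as-is):
   
   ```java
   // Parse the table name once, at read time; TableName.valueOf() caches
   // instances, so repeated records for the same table stay cheap.
   Map<TableName, List<Mutation>> tableToMutationsMap = new HashMap<>();
   for (LogFile.Record record : logFileReader) {
       tableToMutationsMap
               .computeIfAbsent(TableName.valueOf(record.getHBaseTableName()), k -> new ArrayList<>())
               .add(record.getMutation());
   }
   // processReplicationLogBatch() can then pass entry.getKey() straight to
   // getAsyncConnection().getTable(entry.getKey(), executorService);
   ```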



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class);
+
+    private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " +
+            "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
+
+    private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)";
+
+    private static final String PRINCIPAL = "replicationLogProcessor";
+
+    @ClassRule
+    public static TemporaryFolder testFolder = new TemporaryFolder();
+
+    private static Configuration conf;
+    private static FileSystem localFs;
+    private static ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupBeforeClass() throws Exception {
+        conf = getUtility().getConfiguration();
+        localFs = FileSystem.getLocal(conf);
+        executorService = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Tests successful creation of LogFileReader with a properly formatted log file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithValidLogFile() throws IOException {
+        // Test with valid log file
+        Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(validFilePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Verify file exists and has content
+        assertTrue("Valid log file should exist", localFs.exists(validFilePath));
+        assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0);
+
+        // Test createLogFileReader with valid file - should succeed
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+
+        // Verify reader is created successfully
+        assertNotNull("Reader should not be null for valid file", reader);
+        assertNotNull("Reader context should not be null", reader.getContext());
+        assertEquals("File path should match", validFilePath, reader.getContext().getFilePath());
+        assertEquals("File system should match", localFs, reader.getContext().getFileSystem());
+
+        // Verify we can read from the reader
+        assertTrue("Reader should have records", reader.iterator().hasNext());
+
+        // Clean up
+        reader.close();
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with a non-existent file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithNonExistentFile() {
+        Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file");
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, nonExistentPath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            assertTrue("Error message should mention file does not exist and file path name",
+                    e.getMessage().contains("Log file does not exist: " + nonExistentPath));
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
+        Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
+        localFs.create(invalidFilePath).close(); // Create empty file
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
+            fail("Should throw IOException for invalid file");
+        } catch (IOException e) {
+            // Should throw some kind of IOException when trying to read header
+            assertTrue("Should throw IOException", true);
+        } finally {
+            // Delete the invalid file
+            localFs.delete(invalidFilePath);
+        }
+    }
+
+    /**
+     * Tests the closeReader method with both null and valid LogFileReader instances.
+     */
+    @Test
+    public void testCloseReader() throws IOException {
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        replicationLogProcessor.closeReader(null);
+        Path filePath = new Path(testFolder.newFile("testCloseReader").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(filePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Test with valid reader
+        LogFileReader reader = Mockito.spy(new LogFileReader());
+
+        reader.init(new LogFileReaderContext(conf)
+                .setFileSystem(localFs)
+                .setFilePath(filePath));
+
+        replicationLogProcessor.closeReader(reader);
+
+        // Ensure reader's close method is called only once
+        Mockito.verify(reader, Mockito.times(1)).close();
+    }
+
+    /**
+     * Tests processing an empty mutation map - should complete without errors.
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithEmptyMap() {
+        Map<String, List<Mutation>> emptyMap = new HashMap<>();
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        // Process empty batch - should not throw any exceptions and should return immediately
+        try {
+            replicationLogProcessor.processReplicationLogBatch(emptyMap);
+            // If we reach here, the empty map was processed successfully
+            assertTrue("Processing empty map should complete without errors", true);
+        } catch (Exception e) {
+            fail("Processing empty map should not throw exception: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Tests exception handling when attempting to process mutations for non-existent tables.
+     */
+    @Test
+    public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() {
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5);
+        tableMutationsMap.put("NON_EXISTENT_TABLE", 
Collections.singletonList(mutation));
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        try {
+            
replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+            fail("Should throw TableNotFoundException for non-existent table");
+        } catch (IOException exception) {
+            assertTrue("Error message should mention file does not exist and 
file path name",
+                    exception.getMessage().contains("TableNotFoundException"));
+        }
+    }
+
+    /**
+     * Tests behavior when HBase operations fail (simulated by disabling the table).
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception {

Review Comment:
   It would also be useful to have a test case where only part of a batch 
fails. This means making a table with multiple regions, building a set of 
mutations that would be applied to all regions, and taking one of those regions 
offline.
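   
   A rough sketch of that setup (names and split points are illustrative; `unassign` is a bit racy since the master may reassign the region, so a more deterministic variant could move the region to a server that is then stopped):
   
   ```java
   TableName tn = TableName.valueOf("T_" + generateUniqueName());
   byte[] family = Bytes.toBytes("0");
   TableDescriptor td = TableDescriptorBuilder.newBuilder(tn)
           .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family))
           .build();
   try (Admin admin = getUtility().getAdmin()) {
       // Split at "m": region 1 serves [-inf, "m"), region 2 serves ["m", +inf)
       admin.createTable(td, new byte[][] { Bytes.toBytes("m") });
   
       // Mutations that land in both regions
       List<Mutation> mutations = new ArrayList<>();
       mutations.add(new Put(Bytes.toBytes("a")).addColumn(family, Bytes.toBytes("q"), Bytes.toBytes("v")));
       mutations.add(new Put(Bytes.toBytes("z")).addColumn(family, Bytes.toBytes("q"), Bytes.toBytes("v")));
   
       // Take one region offline so only part of the batch can succeed
       RegionInfo region = admin.getRegions(tn).get(0);
       admin.unassign(region.getRegionName(), true);
   
       // processReplicationLogBatch(...) should now fail for the offline
       // region's mutations while the other region's mutations go through
   }
   ```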



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+    /**
+     * Tests behavior when HBase operations fail (simulated by disabling the table).
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception {

Review Comment:
   Right now we'd have an equivalent result... a thrown IOException. However, as the implementation evolves we might want a more granular response: resubmission of only the mutations involved with failed futures, etc. Do you want to try to tackle this now, even? It's much more likely that a regionserver is down than that a table is disabled, so batches are likely to only partially fail.
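   
   For reference, a sketch of what that more granular response could look like (illustrative only; `batch()` instead of `batchAll()` yields one future per mutation):
   
   ```java
   // Collect only the mutations whose futures failed, so they can be retried
   // or persisted for a later replay pass instead of failing the whole file.
   List<Mutation> mutations = entry.getValue();
   List<CompletableFuture<Object>> perMutation = table.batch(mutations);
   List<Mutation> failed = new ArrayList<>();
   for (int i = 0; i < perMutation.size(); i++) {
       try {
           perMutation.get(i).join();
       } catch (CompletionException e) {
           failed.add(mutations.get(i));
       }
   }
   if (!failed.isEmpty()) {
       // resubmit only `failed`, with backoff, rather than the whole batch
   }
   ```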



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+    protected void processReplicationLogBatch(
+            Map<String, List<Mutation>> tableMutationMap) throws IOException {
+
+        if (tableMutationMap == null || tableMutationMap.isEmpty()) {
+            return;
+        }
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (Map.Entry<String, List<Mutation>> entry : tableMutationMap.entrySet()) {
+            String tableName = entry.getKey();
+            List<Mutation> mutations = entry.getValue();
+            AsyncTable<?> table = getAsyncConnection()
+                    .getTable(TableName.valueOf(tableName), executorService);
+            futures.add(table.batchAll(mutations));
+        }
+
+        IOException error = null;
+
+        for (Future<?> future : futures) {
+            try {
+                FutureUtils.get(future);
+            } catch (RetriesExhaustedException e) {

Review Comment:
   Is this the only exception type `get()` might throw here? Should we be catching `IOException` instead?
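   
   For what it's worth, `FutureUtils.get()` declares `throws IOException` and rethrows the underlying cause (wrapping non-IO causes), so catching `IOException` covers everything. A sketch, if the terminal-failure case should still be special-cased:
   
   ```java
   try {
       FutureUtils.get(future);
   } catch (RetriesExhaustedException e) {
       // extends IOException, so it must be caught first if handled separately
       error = e;
   } catch (IOException e) {
       // any other failure surfaced by the async client
       error = e;
   }
   ```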



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+    /**
+     * Tests behavior when HBase operations fail (simulated by disabling the table).
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception {
+        final String tableName = "T_" + generateUniqueName();
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create table first
+            
conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
tableName));
+            PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+            // Generate some mutations for the table
+            List<Mutation> mutations = 
generateHBaseMutations(phoenixConnection, 2, tableName, 10L);
+            tableMutationsMap.put(tableName, mutations);
+            TableName hbaseTableName = TableName.valueOf(tableName);
+            try (Admin admin = 
phoenixConnection.getQueryServices().getAdmin()) {
+                // Disable the table to simulate HBase failure
+                admin.disableTable(hbaseTableName);
+                LOG.info("Disabled table {} to simulate HBase failure", 
tableName);
+
+                ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+                // Attempt to process mutations on disabled table - should fail
+                try {
+                    
replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+                    fail("Should throw IOException when trying to apply 
mutations to disabled table");
+                } catch (IOException e) {
+                    // Expected behavior - disabled table should cause 
IOException
+                    assertTrue("Should throw IOException for disabled table", 
true);
+                    LOG.info("Expected IOException caught when processing 
mutations on disabled table: " + e.getMessage());
+                }
+            }
+        }
+    }
+
+    /**
+     * Tests end-to-end processing of a valid log file with mutations for 
multiple tables.
+     */
+    @Test
+    public void testProcessLogFileForValidLogFile() throws Exception {
+        final String table1Name = "T_" + generateUniqueName();
+        final String table2Name = "T_" + generateUniqueName();
+        final Path filePath = new 
Path(testFolder.newFile("testProcessLogFileEnd2End").toURI());
+        LogFileWriter writer = initLogFileWriter(filePath);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            
conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table1Name));
+            
conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table2Name));
+            PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+
+            List<Mutation> table1Mutations = 
generateHBaseMutations(phoenixConnection, 2, table1Name, 100L);
+            List<Mutation> table2Mutations = 
generateHBaseMutations(phoenixConnection, 5, table2Name, 101L);
+            table1Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table1Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            table2Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table2Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            writer.sync();
+            writer.close();
+
+            ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.processLogFile(localFs, filePath);
+
+            validate(table1Name, table1Mutations);
+            validate(table2Name, table2Mutations);
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to process a non-existent log file.
+     */
+    @Test
+    public void testProcessLogFileWithNonExistentFile() throws Exception {
+        // Create a path to a file that doesn't exist
+        Path nonExistentFilePath = new 
Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log");
+        // Verify the file doesn't exist
+        assertFalse("Non-existent file should not exist", 
localFs.exists(nonExistentFilePath));
+
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        // Attempt to process non-existent file - should throw IOException
+        try {
+            replicationLogProcessor.processLogFile(localFs, 
nonExistentFilePath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            // Expected behavior - non-existent file should cause IOException
+            assertTrue("Should throw IOException for non-existent file", true);
+        }
+    }
+
+    /**
+     * Tests batching logic when processing log files with mutations for 
multiple tables.
+     */
+    @Test
+    public void testProcessLogFileBatchingWithMultipleTables() throws 
Exception {
+        final Path multiTableBatchFilePath = new 
Path(testFolder.newFile("testMultiTableBatch").toURI());
+        final String table1Name = "T1_" + generateUniqueName();
+        final String table2Name = "T2_" + generateUniqueName();
+        final int batchSize = 4;
+        final int recordsPerTable = 3;
+        final int totalRecords = recordsPerTable * 2; // 6 total records
+        final int expectedBatchCalls = (totalRecords + batchSize - 1) / 
batchSize; // 2 calls
+        // Create log file with mutations for multiple tables
+        LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath);
+        // Add mutations alternating between tables using LogFileTestUtil
+        for (int i = 0; i < recordsPerTable; i++) {
+            // Add mutation for table1
+            Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, 
(i * 2) + 1);
+            writer.append(table1Name, (i * 2) + 1, put1);
+            writer.sync();
+            // Add mutation for table2
+            Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, 
(i * 2) + 2);
+            writer.append(table2Name, (i * 2) + 2, put2);
+            writer.sync();
+        }
+        writer.close();
+        // Create processor with custom batch size and spy on it
+        Configuration testConf = new Configuration(conf);
+        
testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
 batchSize);
+
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(testConf, executorService);
+        // Validate that the batch size is correctly set
+        assertEquals("Batch size should be set correctly", batchSize, 
replicationLogProcessor.getBatchSize());
+        ReplicationLogProcessor spyProcessor = 
Mockito.spy(replicationLogProcessor);
+        // Mock the processReplicationLogBatch method
+        
Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+        // Process the log file
+        spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
+        // Verify processReplicationLogBatch was called the expected number of 
times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+            .processReplicationLogBatch(Mockito.any(Map.class));
+    }
+
+    /**
+     * Tests processing of empty log files (files with header/trailer but no 
mutation records).
+     */
+    @Test
+    public void testProcessLogFileWithEmptyFile() throws Exception {
+        final Path emptyFilePath = new 
Path(testFolder.newFile("testProcessLogFileEmpty").toURI());
+        LogFileWriter writer = initLogFileWriter(emptyFilePath);
+
+        // Close the writer without adding any records - this creates a valid 
empty log file
+        writer.close();
+
+        // Verify file exists and has some content (header + trailer)
+        assertTrue("Empty log file should exist", 
localFs.exists(emptyFilePath));
+        assertTrue("Empty log file should have header/trailer content", 
localFs.getFileStatus(emptyFilePath).getLen() > 0);
+
+        // Process the empty log file - should not throw any exceptions
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        try {
+            replicationLogProcessor.processLogFile(localFs, emptyFilePath);
+            // If we reach here, the empty file was processed successfully
+            assertTrue("Processing empty log file should complete without 
errors", true);
+        } catch (Exception e) {
+            fail("Processing empty log file should not throw exception: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Tests processing of log files that were not closed, ensuring it's 
successf.

Review Comment:
   Spelling
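   (i.e., "successf." should read "successful.")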



##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogProcessorTest.class);
+
+    private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s 
(ID VARCHAR PRIMARY KEY, " +
+            "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
+
+    private static final String UPSERT_SQL_STATEMENT = "upsert into %s values 
('%s', '%s', '%s', %s)";
+
+    private static final String PRINCIPAL = "replicationLogProcessor";
+
+    @ClassRule
+    public static TemporaryFolder testFolder = new TemporaryFolder();
+
+    private static Configuration conf;
+    private static FileSystem localFs;
+    private static ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupBeforeClass() throws Exception {
+        conf = getUtility().getConfiguration();
+        localFs = FileSystem.getLocal(conf);
+        executorService = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Tests successful creation of LogFileReader with a properly formatted 
log file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithValidLogFile() throws IOException {
+        // Test with valid log file
+        Path validFilePath = new 
Path(testFolder.newFile("valid_log_file").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(validFilePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Verify file exists and has content
+        assertTrue("Valid log file should exist", 
localFs.exists(validFilePath));
+        assertTrue("Valid log file should have content", 
localFs.getFileStatus(validFilePath).getLen() > 0);
+
+        // Test createLogFileReader with valid file - should succeed
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        LogFileReader reader = 
replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+
+        // Verify reader is created successfully
+        assertNotNull("Reader should not be null for valid file", reader);
+        assertNotNull("Reader context should not be null", 
reader.getContext());
+        assertEquals("File path should match", validFilePath, 
reader.getContext().getFilePath());
+        assertEquals("File system should match", localFs, 
reader.getContext().getFileSystem());
+
+        // Verify we can read from the reader
+        assertTrue("Reader should have records", reader.iterator().hasNext());
+
+        // Clean up
+        reader.close();
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with a 
non-existent file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithNonExistentFile() {
+        Path nonExistentPath = new 
Path(testFolder.getRoot().getAbsolutePath(), "non_existent_file");
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, 
nonExistentPath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            assertTrue("Error message should mention file does not exist and 
file path name",
+                    e.getMessage().contains("Log file does not exist: " + 
nonExistentPath));
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with an 
invalid/corrupted file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithInvalidLogFile() throws IOException 
{
+        Path invalidFilePath = new 
Path(testFolder.newFile("invalid_file").toURI());
+        localFs.create(invalidFilePath).close(); // Create empty file
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, 
invalidFilePath);
+            fail("Should throw IOException for invalid file");
+        } catch (IOException e) {
+            // Should throw some kind of IOException when trying to read header
+            assertTrue("Should throw IOException", true);
+        } finally {
+            // Delete the invalid file
+            localFs.delete(invalidFilePath, false);
+        }
+    }
+
+    /**
+     * Tests the closeReader method with both null and valid LogFileReader 
instances.
+     */
+    @Test
+    public void testCloseReader() throws IOException {
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        replicationLogProcessor.closeReader(null);
+        Path filePath = new 
Path(testFolder.newFile("testCloseReader").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(filePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Test with valid reader
+        LogFileReader reader = Mockito.spy(new LogFileReader());
+
+        reader.init(new LogFileReaderContext(conf)
+                .setFileSystem(localFs)
+                .setFilePath(filePath));
+
+        replicationLogProcessor.closeReader(reader);
+
+        // Ensure reader's close method is called only once
+        Mockito.verify(reader, Mockito.times(1)).close();
+    }
+
+    /**
+     * Tests processing an empty mutation map - should complete without errors.
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithEmptyMap() {
+        Map<String, List<Mutation>> emptyMap = new HashMap<>();
+
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        // Process empty batch - should not throw any exceptions and should 
return immediately
+        try {
+            replicationLogProcessor.processReplicationLogBatch(emptyMap);
+            // If we reach here, the empty map was processed successfully
+            assertTrue("Processing empty map should complete without errors", 
true);
+        } catch (Exception e) {
+            fail("Processing empty map should not throw exception: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Tests exception handling when attempting to process mutations for 
non-existent tables.
+     */
+    @Test
+    public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() {
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5);
+        tableMutationsMap.put("NON_EXISTENT_TABLE", 
Collections.singletonList(mutation));
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        try {
+            
replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+            fail("Should throw TableNotFoundException for non-existent table");
+        } catch (IOException exception) {
+            assertTrue("Error message should mention file does not exist and 
file path name",
+                    exception.getMessage().contains("TableNotFoundException"));
+        }
+    }
+
+    /**
+     * Tests behavior when HBase operations fail (simulated by disabling 
table).
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithHBaseFailure() throws 
Exception {
+        final String tableName = "T_" + generateUniqueName();
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create table first
+            
conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
tableName));
+            PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+            // Generate some mutations for the table
+            List<Mutation> mutations = 
generateHBaseMutations(phoenixConnection, 2, tableName, 10L);
+            tableMutationsMap.put(tableName, mutations);
+            TableName hbaseTableName = TableName.valueOf(tableName);
+            try (Admin admin = 
phoenixConnection.getQueryServices().getAdmin()) {
+                // Disable the table to simulate HBase failure
+                admin.disableTable(hbaseTableName);
+                LOG.info("Disabled table {} to simulate HBase failure", 
tableName);
+
+                ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+                // Attempt to process mutations on disabled table - should fail
+                try {
+                    
replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+                    fail("Should throw IOException when trying to apply 
mutations to disabled table");
+                } catch (IOException e) {
+                    // Expected behavior - disabled table should cause 
IOException
+                    assertTrue("Should throw IOException for disabled table", 
true);
+                    LOG.info("Expected IOException caught when processing 
mutations on disabled table: " + e.getMessage());
+                }
+            }
+        }
+    }
+
+    /**
+     * Tests end-to-end processing of a valid log file with mutations for 
multiple tables.
+     */
+    @Test
+    public void testProcessLogFileForValidLogFile() throws Exception {
+        final String table1Name = "T_" + generateUniqueName();
+        final String table2Name = "T_" + generateUniqueName();
+        final Path filePath = new 
Path(testFolder.newFile("testProcessLogFileEnd2End").toURI());
+        LogFileWriter writer = initLogFileWriter(filePath);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            
conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table1Name));
+            
conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, 
table2Name));
+            PhoenixConnection phoenixConnection = 
conn.unwrap(PhoenixConnection.class);
+
+            List<Mutation> table1Mutations = 
generateHBaseMutations(phoenixConnection, 2, table1Name, 100L);
+            List<Mutation> table2Mutations = 
generateHBaseMutations(phoenixConnection, 5, table2Name, 101L);
+            table1Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table1Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            table2Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table2Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            writer.sync();
+            writer.close();
+
+            ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.processLogFile(localFs, filePath);
+
+            validate(table1Name, table1Mutations);
+            validate(table2Name, table2Mutations);
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to process a non-existent log file.
+     */
+    @Test
+    public void testProcessLogFileWithNonExistentFile() throws Exception {
+        // Create a path to a file that doesn't exist
+        Path nonExistentFilePath = new 
Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log");
+        // Verify the file doesn't exist
+        assertFalse("Non-existent file should not exist", 
localFs.exists(nonExistentFilePath));
+
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        // Attempt to process non-existent file - should throw IOException
+        try {
+            replicationLogProcessor.processLogFile(localFs, 
nonExistentFilePath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            // Expected behavior - non-existent file should cause IOException
+            assertTrue("Should throw IOException for non-existent file", true);
+        }
+    }
+
+    /**
+     * Tests batching logic when processing log files with mutations for 
multiple tables.
+     */
+    @Test
+    public void testProcessLogFileBatchingWithMultipleTables() throws 
Exception {
+        final Path multiTableBatchFilePath = new 
Path(testFolder.newFile("testMultiTableBatch").toURI());
+        final String table1Name = "T1_" + generateUniqueName();
+        final String table2Name = "T2_" + generateUniqueName();
+        final int batchSize = 4;
+        final int recordsPerTable = 3;
+        final int totalRecords = recordsPerTable * 2; // 6 total records
+        final int expectedBatchCalls = (totalRecords + batchSize - 1) / 
batchSize; // 2 calls
+        // Create log file with mutations for multiple tables
+        LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath);
+        // Add mutations alternating between tables using LogFileTestUtil
+        for (int i = 0; i < recordsPerTable; i++) {
+            // Add mutation for table1
+            Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, 
(i * 2) + 1);
+            writer.append(table1Name, (i * 2) + 1, put1);
+            writer.sync();
+            // Add mutation for table2
+            Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, 
(i * 2) + 2);
+            writer.append(table2Name, (i * 2) + 2, put2);
+            writer.sync();
+        }
+        writer.close();
+        // Create processor with custom batch size and spy on it
+        Configuration testConf = new Configuration(conf);
+        
testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE,
 batchSize);
+
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(testConf, executorService);
+        // Validate that the batch size is correctly set
+        assertEquals("Batch size should be set correctly", batchSize, 
replicationLogProcessor.getBatchSize());
+        ReplicationLogProcessor spyProcessor = 
Mockito.spy(replicationLogProcessor);
+        // Mock the processReplicationLogBatch method
+        
Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+        // Process the log file
+        spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
+        // Verify processReplicationLogBatch was called the expected number of 
times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+            .processReplicationLogBatch(Mockito.any(Map.class));
+    }
+
+    /**
+     * Tests processing of empty log files (files with header/trailer but no 
mutation records).
+     */
+    @Test
+    public void testProcessLogFileWithEmptyFile() throws Exception {
+        final Path emptyFilePath = new 
Path(testFolder.newFile("testProcessLogFileEmpty").toURI());
+        LogFileWriter writer = initLogFileWriter(emptyFilePath);
+
+        // Close the writer without adding any records - this creates a valid 
empty log file
+        writer.close();
+
+        // Verify file exists and has some content (header + trailer)
+        assertTrue("Empty log file should exist", 
localFs.exists(emptyFilePath));
+        assertTrue("Empty log file should have header/trailer content", 
localFs.getFileStatus(emptyFilePath).getLen() > 0);
+
+        // Process the empty log file - should not throw any exceptions
+        ReplicationLogProcessor replicationLogProcessor = new 
ReplicationLogProcessor(conf, executorService);
+        try {
+            replicationLogProcessor.processLogFile(localFs, emptyFilePath);
+            // If we reach here, the empty file was processed successfully
+            assertTrue("Processing empty log file should complete without 
errors", true);
+        } catch (Exception e) {
+            fail("Processing empty log file should not throw exception: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Tests processing of log files that were not closed, ensuring it's 
successf.
+     */
+    @Test
+    public void testProcessLogFileForUnClosedFile() throws Exception {
+        final Path emptyFilePath = new 
Path(testFolder.newFile("testProcessLogFileForUnClosedFile").toURI());
+        LogFileWriter writer = initLogFileWriter(emptyFilePath);
+
+        // Add one mutation
+        Mutation put = LogFileTestUtil.newPut("row1", 3L, 4);
+        writer.append("table", 1, put);
+        writer.sync();
+
+        // Process the file without closing - should not throw any exceptions
+        ReplicationLogProcessor spyProcessor = Mockito.spy(new 
ReplicationLogProcessor(conf, executorService));
+        
Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+
+        spyProcessor.processLogFile(localFs, emptyFilePath);
+
+        // Verify processReplicationLogBatch was called the expected number of 
times
+        Mockito.verify(spyProcessor, Mockito.times(1))
+                .processReplicationLogBatch(Mockito.any(Map.class));
+    }

Review Comment:
   Can also verify here that the expected mutation was seen, `put` with 
sequence `3L`.
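   
   Something along these lines would do it (an untested sketch; it needs an 
   `org.mockito.ArgumentCaptor` import, and it assumes `LogFileTestUtil.newPut` 
   stores the sequence as the cell timestamp - adjust the accessor if the util 
   encodes it differently):
   
   ```java
   // Capture the batch map handed to the stubbed processReplicationLogBatch
   ArgumentCaptor<Map> captor = ArgumentCaptor.forClass(Map.class);
   Mockito.verify(spyProcessor).processReplicationLogBatch(captor.capture());
   @SuppressWarnings("unchecked")
   Map<String, List<Mutation>> batch = captor.getValue();
   List<Mutation> replayed = batch.get("table");
   assertEquals("Exactly one mutation should be replayed", 1, replayed.size());
   Mutation seen = replayed.get(0);
   // Row key should match the original put...
   assertEquals(0, Bytes.compareTo(put.getRow(), seen.getRow()));
   // ...and the cell should carry the expected sequence (3L) as its timestamp
   Cell cell = seen.getFamilyCellMap().firstEntry().getValue().get(0);
   assertEquals(3L, cell.getTimestamp());
   ```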



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

