apurtell commented on code in PR #2188: URL: https://github.com/apache/phoenix/pull/2188#discussion_r2146270282
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+    /**
+     * The maximum number of retries for HBase client operations while applying the mutations
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT =
+        "phoenix.replication.standby.hbase.client.retries.number";
+
+    /**
+     * The default number of retries for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4;

Review Comment:
Why 4? Why not 3, or 5? Does this align with other retry defaults, like in the code we added on the writer side? (It does not, fwiw.) These are just starting points for users, sure, but they should work well without being changed. The default number of retries and the default timeouts work together to establish a time budget for transient issues. After this budget is exhausted, the issues are raised by exception to higher layers, which may in some cases abort the server. Let's think them through.
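One option, sketched against this PR's `decorateConf()` (whether these are the intended semantics is for the author to decide): instead of hardcoding 4, fall back to the retry default the cluster already uses for the HBase client, so reader and writer sides stay aligned unless the standby-specific property is set explicitly. `HConstants` is already imported in this class.

```java
// Sketch only: derive the fallback from the stock HBase client retry default
// (HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) rather than a magic 4.
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
    this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT,
        this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)));
```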
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+    /**
+     * The timeout for HBase client operations while applying the mutations.
+     */
+    public static final String REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS =
+        "phoenix.replication.standby.hbase.client.operations.timeout";
+
+    /**
+     * The default timeout for HBase client operations while applying the mutations.
+     */
+    public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = 10000;

Review Comment:
So the total time we'd wait before throwing an exception up to the higher layers, if consistently timing out, is 50 seconds (10s + (4 retries * 10s))? Why 50? Should it be 60? Or 30? Or 120? Or 300? We are applying edits on the standby, so we can certainly be more tolerant than on the write side, where timeouts/retries need to be short to fail fast. We don't need to fail fast in the reader. On the other hand, we don't want to wait so long as to complicate failover or miss a failover SLA.
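Spelling out the arithmetic this assumes (a per-attempt timeout, with no backoff between attempts):

```java
// Rough worst-case budget with the proposed defaults (sketch, not PR code):
int timeoutMs = 10000; // DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS
int retries = 4;       // DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT
int worstCaseMs = timeoutMs * (1 + retries); // initial attempt + 4 retries = 50000 ms
```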
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java:
##########
@@ -0,0 +1,288 @@
+    /**
+     * This {@link AsyncConnection} is used for handling mutations
+     */
+    private volatile AsyncConnection asyncConnection;
+
+    private final Object asyncConnectionLock = new Object();

Review Comment:
Do you need this? Taking this lock to lazily initialize the `asyncConnection` is equivalent to `synchronized (this)`, so just use that?
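i.e. something like this (a sketch of the same lazy init without the extra lock object; the actual initialization code in the PR may differ):

```java
private volatile AsyncConnection asyncConnection;

// Sketch: double-checked locking on `this` replaces the dedicated lock object.
private AsyncConnection getAsyncConnection() throws IOException {
    AsyncConnection conn = asyncConnection;
    if (conn == null) {
        synchronized (this) {
            conn = asyncConnection;
            if (conn == null) {
                // createAsyncConnection returns a CompletableFuture; FutureUtils.get
                // unwraps it, rethrowing failures as IOException (both are already
                // imported in this class)
                conn = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
                asyncConnection = conn;
            }
        }
    }
    return conn;
}
```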
########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(testConf, executorService);
+        // Validate that the batch size is correctly set
+        assertEquals("Batch size should be set correctly", batchSize, replicationLogProcessor.getBatchSize());
+        ReplicationLogProcessor spyProcessor = Mockito.spy(replicationLogProcessor);
+        // Mock the processReplicationLogBatch method
+        Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+        // Process the log file
+        spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
+        // Verify processReplicationLogBatch was called the expected number of times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+            .processReplicationLogBatch(Mockito.any(Map.class));

Review Comment:
You can also use your spy to check that the expected mutations were found in each invocation of `processReplicationLogBatch` (put1 in the first, put2 in the second) as a sanity check.
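For example (a sketch reusing the names in this test, replacing the `doNothing` stub; assumes a Mockito 2+ `doAnswer`/`getArgument` API). Note that `processLogFile` reuses and clears the same map between batches, so each batch needs to be snapshotted at invocation time rather than captured by reference:

```java
List<Map<String, List<Mutation>>> batches = new ArrayList<>();
Mockito.doAnswer(invocation -> {
    // Snapshot the batch; the processor clears the map after each call
    batches.add(new HashMap<>(invocation.getArgument(0)));
    return null;
}).when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
assertEquals(expectedBatchCalls, batches.size());
// With batchSize = 4 and alternating appends, the first batch holds four
// mutations spread across both tables and the second holds the remaining two
assertEquals(batchSize, batches.get(0).values().stream().mapToInt(List::size).sum());
assertTrue(batches.get(0).containsKey(table1Name));
assertTrue(batches.get(1).containsKey(table2Name));
```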
########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+        ReplicationLogProcessor spyProcessor = Mockito.spy(testProcessor);
+
+        // Mock the processReplicationLogBatch method to do nothing but track calls
+        Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+
+        // Process the log file
+        spyProcessor.processLogFile(localFs, batchTestFilePath);
+
+        // Verify processReplicationLogBatch was called the expected number of times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+            .processReplicationLogBatch(Mockito.any(Map.class));
+    }

Review Comment:
You can also verify that the expected mutations for each batch were found in that batch.
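Same approach as above: snapshot each invocation and assert the batching invariants, e.g. every batch except possibly the last is exactly `batchSize` and the sizes sum to `totalRecords` (sketch, same Mockito 2+ assumption):

```java
List<Integer> batchSizes = new ArrayList<>();
Mockito.doAnswer(invocation -> {
    // Compute the size at invocation time, before the processor clears the map
    Map<String, List<Mutation>> batch = invocation.getArgument(0);
    batchSizes.add(batch.values().stream().mapToInt(List::size).sum());
    return null;
}).when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
spyProcessor.processLogFile(localFs, batchTestFilePath);
assertEquals(totalRecords, batchSizes.stream().mapToInt(Integer::intValue).sum());
for (int i = 0; i < batchSizes.size() - 1; i++) {
    assertEquals(batchSize, (int) batchSizes.get(i));
}
```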
+ */ +package org.apache.phoenix.replication.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.phoenix.replication.log.LogFile; +import org.apache.phoenix.replication.log.LogFileReader; +import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationLogProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessor.class); + + /** + * The maximum count of mutations to process in single batch while reading replication log file + */ + public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = + "phoenix.replication.log.standby.replay.batch.size"; + + /** + * The default batch size for reading the replication log file. + * Assuming each log record to be 10 KB (un-compressed) and allow at max 64 MB of + * in-memory records to be processed + */ + public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = 6400; + + /** + * The maximum number of retries for HBase client operations while applying the mutations + */ + public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = + "phoenix.replication.standby.hbase.client.retries.number"; + + /** + * The default number of retries for HBase client operations while applying the mutations. + */ + public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4; + + /** + * The timeout for HBase client operations while applying the mutations. + */ + public static final String REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = + "phoenix.replication.standby.hbase.client.operations.timeout"; + + /** + * The default timeout for HBase client operations while applying the mutations. + */ + public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = 10000; + + private final Configuration conf; + + private final ExecutorService executorService; + + /** + * This {@link AsyncConnection} is used for handling mutations + */ + private volatile AsyncConnection asyncConnection; + + private final Object asyncConnectionLock = new Object(); + + private final int batchSize; + + /** + * Creates a new ReplicationLogProcessor with the given configuration and executor service. 
+ * @param conf The configuration to use + * @param executorService The executor service for processing mutations + * @throws IOException if initialization fails + */ + public ReplicationLogProcessor(final Configuration conf, + final ExecutorService executorService) { + // Create a copy of configuration as some of the properties would be + // overridden + this.conf = HBaseConfiguration.create(conf); + this.executorService = executorService; + this.batchSize = this.conf.getInt(REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, + DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE); + decorateConf(); + } + + /** + * Decorate the Configuration object to make replication more receptive to delays by + * reducing the timeout and number of retries. + */ + private void decorateConf() { + this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT, + DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT)); + this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS, + DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS)); + } + + public void processLogFile(FileSystem fs, Path filePath) throws IOException { + + // Map from Table Name to List of Mutations + Map<String, List<Mutation>> tableToMutationsMap = new HashMap<>(); + + // Track the total number of processed records from input log file + long totalProcessed = 0; + + // Track the current batch size as records will be processed in batch size of + // {@link REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE} + long currentBatchSize = 0; + + LogFileReader logFileReader = null; + + try { + // Create the LogFileReader for given path + logFileReader = createLogFileReader(fs, filePath); + + for (LogFile.Record record : logFileReader) { + + final String tableName = record.getHBaseTableName(); + final Mutation mutation = record.getMutation(); + + tableToMutationsMap.computeIfAbsent(tableName, k -> new ArrayList<>()) + .add(mutation); + currentBatchSize++; + + // Process when we reach the batch size and reset the batch size and + // table to mutations map + if (currentBatchSize >= getBatchSize()) { + processReplicationLogBatch(tableToMutationsMap); + totalProcessed += currentBatchSize; + tableToMutationsMap.clear(); + currentBatchSize = 0; + } + } + + // Process any remaining mutations + if (currentBatchSize > 0) { + processReplicationLogBatch(tableToMutationsMap); + totalProcessed += currentBatchSize; + } + + LOG.info("Completed processing log file {}. Total mutations processed: {}", + logFileReader.getContext().getFilePath(), totalProcessed); + + } catch (Exception e) { + LOG.error("Error while processing replication log file", e); + throw new IOException("Failed to process log file " + filePath, e); + } finally { + closeReader(logFileReader); + } + } + + protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException { + // Ensure that file exists. 
If checking the path itself throws an exception, + // it is propagated back to the caller unchanged + if (!fs.exists(filePath)) { + throw new IOException("Log file does not exist: " + filePath); + } + LogFileReader logFileReader = new LogFileReader(); + try { + LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf) + .setFileSystem(fs).setFilePath(filePath); + logFileReader.init(logFileReaderContext); + } catch (IOException exception) { + LOG.error("Failed to initialize new LogFileReader for path {}", + filePath, exception); + throw exception; + } + return logFileReader; + } + + /** + * Closes the given reader, logging any errors that occur during close. + */ + protected void closeReader(LogFileReader logFileReader) { + if (logFileReader == null) { + return; + } + try { + logFileReader.close(); + } catch (IOException exception) { + LOG.error("Error while closing LogFileReader: {}", + logFileReader, exception); + } + } + + protected void processReplicationLogBatch( + Map<String, List<Mutation>> tableMutationMap) throws IOException { + + if (tableMutationMap == null || tableMutationMap.isEmpty()) { + return; + } + + List<Future<?>> futures = new ArrayList<>(); + for (Map.Entry<String, List<Mutation>> entry : tableMutationMap.entrySet()) { + String tableName = entry.getKey(); + List<Mutation> mutations = entry.getValue(); + AsyncTable<?> table = getAsyncConnection() + .getTable(TableName.valueOf(tableName), executorService); Review Comment: How about `Map<TableName, List<Mutation>>` instead?
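A minimal sketch of that change, assuming `LogFile.Record` keeps exposing the table name as a `String`; loop and variable names follow the current code:

```java
// Key the batch map by TableName, parsing the String form once per record.
Map<TableName, List<Mutation>> tableToMutationsMap = new HashMap<>();

for (LogFile.Record record : logFileReader) {
    TableName tableName = TableName.valueOf(record.getHBaseTableName());
    tableToMutationsMap.computeIfAbsent(tableName, k -> new ArrayList<>())
            .add(record.getMutation());
}

// processReplicationLogBatch(Map<TableName, List<Mutation>>) then passes the
// key straight to the client without re-parsing it:
List<Future<?>> futures = new ArrayList<>();
for (Map.Entry<TableName, List<Mutation>> entry : tableToMutationsMap.entrySet()) {
    AsyncTable<?> table = getAsyncConnection()
            .getTable(entry.getKey(), executorService);
    futures.add(table.batchAll(entry.getValue()));
}
```

`TableName.valueOf()` also caches instances, so repeated records for the same table should not pay the parse cost every time.

########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java: ########## @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.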
+ */ +package org.apache.phoenix.replication.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.log.LogFileReader; +import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.apache.phoenix.replication.log.LogFileTestUtil; +import org.apache.phoenix.replication.log.LogFileWriter; +import org.apache.phoenix.replication.log.LogFileWriterContext; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.*; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class); + + private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " + + "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)"; + + private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)"; + + private static final String PRINCIPAL = "replicationLogProcessor"; + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static Configuration conf; + private static FileSystem localFs; + private static ExecutorService executorService; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = getUtility().getConfiguration(); + localFs = FileSystem.getLocal(conf); + executorService = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void tearDownAfterClass() { + if(executorService != null) { + executorService.shutdown(); + } 
+ } + + /** + * Tests successful creation of LogFileReader with a properly formatted log file. + */ + @Test + public void testCreateLogFileReaderWithValidLogFile() throws IOException { + // Test with valid log file + Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(validFilePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Verify file exists and has content + assertTrue("Valid log file should exist", localFs.exists(validFilePath)); + assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0); + + // Test createLogFileReader with valid file - should succeed + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath); + + // Verify reader is created successfully + assertNotNull("Reader should not be null for valid file", reader); + assertNotNull("Reader context should not be null", reader.getContext()); + assertEquals("File path should match", validFilePath, reader.getContext().getFilePath()); + assertEquals("File system should match", localFs, reader.getContext().getFileSystem()); + + // Verify we can read from the reader + assertTrue("Reader should have records", reader.iterator().hasNext()); + + // Clean up + reader.close(); + } + + /** + * Tests error handling when attempting to create LogFileReader with a non-existent file. + */ + @Test + public void testCreateLogFileReaderWithNonExistentFile() { + Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file"); + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, nonExistentPath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + assertTrue("Error message should mention file does not exist and file path name", + e.getMessage().contains("Log file does not exist: " + nonExistentPath)); + } + } + + /** + * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file. + */ + @Test + public void testCreateLogFileReaderWithInvalidLogFile() throws IOException { + Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI()); + localFs.create(invalidFilePath).close(); // Create empty file + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); + fail("Should throw IOException for invalid file"); + } catch (IOException e) { + // Should throw some kind of IOException when trying to read header + assertTrue("Should throw IOException", true); + } finally { + // Delete the invalid file + localFs.delete(invalidFilePath); + } + } + + /** + * Tests the closeReader method with both null and valid LogFileReader instances. 
+ */ + @Test + public void testCloseReader() throws IOException { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.closeReader(null); + Path filePath = new Path(testFolder.newFile("testCloseReader").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(filePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Test with valid reader + LogFileReader reader = Mockito.spy(new LogFileReader()); + + reader.init(new LogFileReaderContext(conf) + .setFileSystem(localFs) + .setFilePath(filePath)); + + replicationLogProcessor.closeReader(reader); + + // Ensure reader's close method is called only once + Mockito.verify(reader, Mockito.times(1)).close(); + } + + /** + * Tests processing an empty mutation map - should complete without errors. + */ + @Test + public void testProcessReplicationLogBatchWithEmptyMap() { + Map<String, List<Mutation>> emptyMap = new HashMap<>(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Process empty batch - should not throw any exceptions and should return immediately + try { + replicationLogProcessor.processReplicationLogBatch(emptyMap); + // If we reach here, the empty map was processed successfully + assertTrue("Processing empty map should complete without errors", true); + } catch (Exception e) { + fail("Processing empty map should not throw exception: " + e.getMessage()); + } + } + + /** + * Tests exception handling when attempting to process mutations for non-existent tables. + */ + @Test + public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() { + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5); + tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation)); + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + try { + replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw TableNotFoundException for non-existent table"); + } catch (IOException exception) { + assertTrue("Error message should mention file does not exist and file path name", + exception.getMessage().contains("TableNotFoundException")); + } + } + + /** + * Tests behavior when HBase operations fail (simulated by disabling table). + */ + @Test + public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception { Review Comment: It would also be useful to have a test case where only part of a batch fails. This means making a table with multiple regions, building a set of mutations that would be applied to all regions, and taking one of those regions offline.
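A rough sketch of such a test, assuming `getUtility()` exposes the cluster's `HBaseTestingUtility`, that `Admin.unassign()` keeps the region offline long enough for the batch to fail, and imports for `Arrays` and `RegionInfo`; the method name, split point, and row keys are illustrative:

```java
// Hypothetical test: the table is pre-split into two regions and one region
// is taken offline, so only part of the batch can be applied.
@Test
public void testProcessReplicationLogBatchWithPartialFailure() throws Exception {
    final TableName tableName = TableName.valueOf("T_" + generateUniqueName());
    final byte[] family = Bytes.toBytes("0");
    // Two regions: [start, "m") and ["m", end)
    getUtility().createTable(tableName, family, new byte[][] { Bytes.toBytes("m") });

    // One mutation per region so the batch spans both regions
    Put first = new Put(Bytes.toBytes("a_row"));
    first.addColumn(family, Bytes.toBytes("q"), Bytes.toBytes("v1"));
    Put second = new Put(Bytes.toBytes("z_row"));
    second.addColumn(family, Bytes.toBytes("q"), Bytes.toBytes("v2"));
    Map<String, List<Mutation>> batch = new HashMap<>();
    batch.put(tableName.getNameAsString(), Arrays.asList(first, second));

    try (Admin admin = getUtility().getAdmin()) {
        // Take the second region offline to simulate a partial outage; the
        // master may re-assign it, so a real test would block re-assignment
        RegionInfo region = admin.getRegions(tableName).get(1);
        admin.unassign(region.getRegionName(), true);
    }

    ReplicationLogProcessor processor = new ReplicationLogProcessor(conf, executorService);
    try {
        processor.processReplicationLogBatch(batch);
        fail("Batch should fail while one region is offline");
    } catch (IOException e) {
        // Expected today: the whole batch surfaces as one IOException; with
        // granular handling only the "z_row" mutation would need resubmission
    }
}
```

########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java: ########## @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.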
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.log.LogFileReader; +import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.apache.phoenix.replication.log.LogFileTestUtil; +import org.apache.phoenix.replication.log.LogFileWriter; +import org.apache.phoenix.replication.log.LogFileWriterContext; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.*; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class); + + private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " + + "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)"; + + private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)"; + + private static final String PRINCIPAL = "replicationLogProcessor"; + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static Configuration conf; + private 
static FileSystem localFs; + private static ExecutorService executorService; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = getUtility().getConfiguration(); + localFs = FileSystem.getLocal(conf); + executorService = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void tearDownAfterClass() { + if(executorService != null) { + executorService.shutdown(); + } + } + + /** + * Tests successful creation of LogFileReader with a properly formatted log file. + */ + @Test + public void testCreateLogFileReaderWithValidLogFile() throws IOException { + // Test with valid log file + Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(validFilePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Verify file exists and has content + assertTrue("Valid log file should exist", localFs.exists(validFilePath)); + assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0); + + // Test createLogFileReader with valid file - should succeed + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath); + + // Verify reader is created successfully + assertNotNull("Reader should not be null for valid file", reader); + assertNotNull("Reader context should not be null", reader.getContext()); + assertEquals("File path should match", validFilePath, reader.getContext().getFilePath()); + assertEquals("File system should match", localFs, reader.getContext().getFileSystem()); + + // Verify we can read from the reader + assertTrue("Reader should have records", reader.iterator().hasNext()); + + // Clean up + reader.close(); + } + + /** + * Tests error handling when attempting to create LogFileReader with a non-existent file. + */ + @Test + public void testCreateLogFileReaderWithNonExistentFile() { + Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file"); + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, nonExistentPath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + assertTrue("Error message should mention file does not exist and file path name", + e.getMessage().contains("Log file does not exist: " + nonExistentPath)); + } + } + + /** + * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file. 
+ */ + @Test + public void testCreateLogFileReaderWithInvalidLogFile() throws IOException { + Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI()); + localFs.create(invalidFilePath).close(); // Create empty file + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); + fail("Should throw IOException for invalid file"); + } catch (IOException e) { + // Should throw some kind of IOException when trying to read header + assertTrue("Should throw IOException", true); + } finally { + // Delete the invalid file + localFs.delete(invalidFilePath); + } + } + + /** + * Tests the closeReader method with both null and valid LogFileReader instances. + */ + @Test + public void testCloseReader() throws IOException { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.closeReader(null); + Path filePath = new Path(testFolder.newFile("testCloseReader").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(filePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Test with valid reader + LogFileReader reader = Mockito.spy(new LogFileReader()); + + reader.init(new LogFileReaderContext(conf) + .setFileSystem(localFs) + .setFilePath(filePath)); + + replicationLogProcessor.closeReader(reader); + + // Ensure reader's close method is called only once + Mockito.verify(reader, Mockito.times(1)).close(); + } + + /** + * Tests processing an empty mutation map - should complete without errors. + */ + @Test + public void testProcessReplicationLogBatchWithEmptyMap() { + Map<String, List<Mutation>> emptyMap = new HashMap<>(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Process empty batch - should not throw any exceptions and should return immediately + try { + replicationLogProcessor.processReplicationLogBatch(emptyMap); + // If we reach here, the empty map was processed successfully + assertTrue("Processing empty map should complete without errors", true); + } catch (Exception e) { + fail("Processing empty map should not throw exception: " + e.getMessage()); + } + } + + /** + * Tests exception handling when attempting to process mutations for non-existent tables. + */ + @Test + public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() { + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5); + tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation)); + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + try { + replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw TableNotFoundException for non-existent table"); + } catch (IOException exception) { + assertTrue("Error message should mention file does not exist and file path name", + exception.getMessage().contains("TableNotFoundException")); + } + } + + /** + * Tests behavior when HBase operations fail (simulated by disabling table). 
+ */ + @Test + public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception { Review Comment: Right now we'd have an equivalent result... a thrown IOException. However as the implementation evolves we might have a more granular response. Resubmission of only the mutations involved with failed futures, etc. Do you want to try to tackle this now, even? It's much more likely a regionserver is down than a table is disabled, so batches are likely to only partially fail.
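One possible shape for that more granular handling, sketched with per-mutation futures from `AsyncTable.batch()` instead of one `batchAll()` future per table, so only the mutations behind failed futures are kept for resubmission. It assumes the `Map<TableName, ...>` keying suggested earlier and imports for `CompletableFuture` and `CompletionException`; the retry hand-off is illustrative:

```java
// Submit per table, but keep one future per mutation so a partial failure
// (e.g. one regionserver down) does not discard the rest of the batch.
Map<TableName, List<Mutation>> retryMap = new HashMap<>();
for (Map.Entry<TableName, List<Mutation>> entry : tableMutationMap.entrySet()) {
    List<Mutation> mutations = entry.getValue();
    AsyncTable<?> table = getAsyncConnection()
            .getTable(entry.getKey(), executorService);
    List<CompletableFuture<Object>> perMutation = table.batch(mutations);
    for (int i = 0; i < perMutation.size(); i++) {
        try {
            perMutation.get(i).join();
        } catch (CompletionException e) {
            // Only this mutation failed; keep it for resubmission
            retryMap.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                    .add(mutations.get(i));
        }
    }
}
// retryMap now holds exactly the mutations behind failed futures; it can be
// resubmitted with backoff or handed to a recovery queue.
```

########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.phoenix.replication.log.LogFile; +import org.apache.phoenix.replication.log.LogFileReader; +import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationLogProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessor.class); + + /** + * The maximum count of mutations to process in single batch while reading replication log file + */ + public static final String REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = + "phoenix.replication.log.standby.replay.batch.size"; + + /** + * The default batch size for reading the replication log file. + * Assuming each log record to be 10 KB (un-compressed) and allow at max 64 MB of + * in-memory records to be processed + */ + public static final int DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE = 6400; + + /** + * The maximum number of retries for HBase client operations while applying the mutations + */ + public static final String REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = + "phoenix.replication.standby.hbase.client.retries.number"; + + /** + * The default number of retries for HBase client operations while applying the mutations.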
+ */ + public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT = 4; + + /** + * The timeout for HBase client operations while applying the mutations. + */ + public static final String REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = + "phoenix.replication.standby.hbase.client.operations.timeout"; + + /** + * The default timeout for HBase client operations while applying the mutations. + */ + public static final int DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS = 10000; + + private final Configuration conf; + + private final ExecutorService executorService; + + /** + * This {@link AsyncConnection} is used for handling mutations + */ + private volatile AsyncConnection asyncConnection; + + private final Object asyncConnectionLock = new Object(); + + private final int batchSize; + + /** + * Creates a new ReplicationLogProcessor with the given configuration and executor service. + * @param conf The configuration to use + * @param executorService The executor service for processing mutations + * @throws IOException if initialization fails + */ + public ReplicationLogProcessor(final Configuration conf, + final ExecutorService executorService) { + // Create a copy of configuration as some of the properties would be + // overridden + this.conf = HBaseConfiguration.create(conf); + this.executorService = executorService; + this.batchSize = this.conf.getInt(REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, + DEFAULT_REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE); + decorateConf(); + } + + /** + * Decorate the Configuration object to make replication more receptive to delays by + * reducing the timeout and number of retries. + */ + private void decorateConf() { + this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT, + DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_RETRIES_COUNT)); + this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + this.conf.getInt(REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS, + DEFAULT_REPLICATION_STANDBY_HBASE_CLIENT_OPERATION_TIMEOUT_MS)); + } + + public void processLogFile(FileSystem fs, Path filePath) throws IOException { + + // Map from Table Name to List of Mutations + Map<String, List<Mutation>> tableToMutationsMap = new HashMap<>(); + + // Track the total number of processed records from input log file + long totalProcessed = 0; + + // Track the current batch size as records will be processed in batch size of + // {@link REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE} + long currentBatchSize = 0; + + LogFileReader logFileReader = null; + + try { + // Create the LogFileReader for given path + logFileReader = createLogFileReader(fs, filePath); + + for (LogFile.Record record : logFileReader) { + + final String tableName = record.getHBaseTableName(); + final Mutation mutation = record.getMutation(); + + tableToMutationsMap.computeIfAbsent(tableName, k -> new ArrayList<>()) + .add(mutation); + currentBatchSize++; + + // Process when we reach the batch size and reset the batch size and + // table to mutations map + if (currentBatchSize >= getBatchSize()) { + processReplicationLogBatch(tableToMutationsMap); + totalProcessed += currentBatchSize; + tableToMutationsMap.clear(); + currentBatchSize = 0; + } + } + + // Process any remaining mutations + if (currentBatchSize > 0) { + processReplicationLogBatch(tableToMutationsMap); + totalProcessed += currentBatchSize; + } + + LOG.info("Completed processing log file {}. 
Total mutations processed: {}", + logFileReader.getContext().getFilePath(), totalProcessed); + + } catch (Exception e) { + LOG.error("Error while processing replication log file", e); + throw new IOException("Failed to process log file " + filePath, e); + } finally { + closeReader(logFileReader); + } + } + + protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException { + // Ensure that file exists. If checking the path itself throws an exception, + // it is propagated back to the caller unchanged + if (!fs.exists(filePath)) { + throw new IOException("Log file does not exist: " + filePath); + } + LogFileReader logFileReader = new LogFileReader(); + try { + LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf) + .setFileSystem(fs).setFilePath(filePath); + logFileReader.init(logFileReaderContext); + } catch (IOException exception) { + LOG.error("Failed to initialize new LogFileReader for path {}", + filePath, exception); + throw exception; + } + return logFileReader; + } + + /** + * Closes the given reader, logging any errors that occur during close. + */ + protected void closeReader(LogFileReader logFileReader) { + if (logFileReader == null) { + return; + } + try { + logFileReader.close(); + } catch (IOException exception) { + LOG.error("Error while closing LogFileReader: {}", + logFileReader, exception); + } + } + + protected void processReplicationLogBatch( + Map<String, List<Mutation>> tableMutationMap) throws IOException { + + if (tableMutationMap == null || tableMutationMap.isEmpty()) { + return; + } + + List<Future<?>> futures = new ArrayList<>(); + for (Map.Entry<String, List<Mutation>> entry : tableMutationMap.entrySet()) { + String tableName = entry.getKey(); + List<Mutation> mutations = entry.getValue(); + AsyncTable<?> table = getAsyncConnection() + .getTable(TableName.valueOf(tableName), executorService); + futures.add(table.batchAll(mutations)); + } + + IOException error = null; + + for (Future<?> future : futures) { + try { + FutureUtils.get(future); + } catch (RetriesExhaustedException e) { Review Comment: Is this the only exception type `get()` might throw here? Should we be catching `IOException` instead?
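`FutureUtils.get()` unwraps the `ExecutionException` and rethrows the cause, so other `IOException` subtypes (for example `TableNotFoundException`, or an `InterruptedIOException` on interrupt) can surface here as well. A sketch of the broader catch, assuming the existing `error` aggregation local:

```java
for (Future<?> future : futures) {
    try {
        FutureUtils.get(future);
    } catch (IOException e) {
        // Not only RetriesExhaustedException: FutureUtils.get() rethrows
        // whatever IOException caused the future to fail (table not found,
        // retries exhausted, interruption, ...).
        LOG.error("Failed to apply a mutation batch", e);
        if (error == null) {
            error = e;
        }
    }
}
// Assumed ending, as the error local suggests: rethrow the first failure
if (error != null) {
    throw error;
}
```

########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java: ########## @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.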
+ */ +package org.apache.phoenix.replication.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.log.LogFileReader; +import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.apache.phoenix.replication.log.LogFileTestUtil; +import org.apache.phoenix.replication.log.LogFileWriter; +import org.apache.phoenix.replication.log.LogFileWriterContext; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.*; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class); + + private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " + + "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)"; + + private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)"; + + private static final String PRINCIPAL = "replicationLogProcessor"; + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static Configuration conf; + private static FileSystem localFs; + private static ExecutorService executorService; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = getUtility().getConfiguration(); + localFs = FileSystem.getLocal(conf); + executorService = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void tearDownAfterClass() { + if(executorService != null) { + executorService.shutdown(); + } 
+ } + + /** + * Tests successful creation of LogFileReader with a properly formatted log file. + */ + @Test + public void testCreateLogFileReaderWithValidLogFile() throws IOException { + // Test with valid log file + Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(validFilePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Verify file exists and has content + assertTrue("Valid log file should exist", localFs.exists(validFilePath)); + assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0); + + // Test createLogFileReader with valid file - should succeed + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath); + + // Verify reader is created successfully + assertNotNull("Reader should not be null for valid file", reader); + assertNotNull("Reader context should not be null", reader.getContext()); + assertEquals("File path should match", validFilePath, reader.getContext().getFilePath()); + assertEquals("File system should match", localFs, reader.getContext().getFileSystem()); + + // Verify we can read from the reader + assertTrue("Reader should have records", reader.iterator().hasNext()); + + // Clean up + reader.close(); + } + + /** + * Tests error handling when attempting to create LogFileReader with a non-existent file. + */ + @Test + public void testCreateLogFileReaderWithNonExistentFile() { + Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file"); + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, nonExistentPath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + assertTrue("Error message should mention file does not exist and file path name", + e.getMessage().contains("Log file does not exist: " + nonExistentPath)); + } + } + + /** + * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file. + */ + @Test + public void testCreateLogFileReaderWithInvalidLogFile() throws IOException { + Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI()); + localFs.create(invalidFilePath).close(); // Create empty file + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); + fail("Should throw IOException for invalid file"); + } catch (IOException e) { + // Should throw some kind of IOException when trying to read header + assertTrue("Should throw IOException", true); + } finally { + // Delete the invalid file + localFs.delete(invalidFilePath); + } + } + + /** + * Tests the closeReader method with both null and valid LogFileReader instances. 
+ */ + @Test + public void testCloseReader() throws IOException { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.closeReader(null); + Path filePath = new Path(testFolder.newFile("testCloseReader").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(filePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Test with valid reader + LogFileReader reader = Mockito.spy(new LogFileReader()); + + reader.init(new LogFileReaderContext(conf) + .setFileSystem(localFs) + .setFilePath(filePath)); + + replicationLogProcessor.closeReader(reader); + + // Ensure reader's close method is called only once + Mockito.verify(reader, Mockito.times(1)).close(); + } + + /** + * Tests processing an empty mutation map - should complete without errors. + */ + @Test + public void testProcessReplicationLogBatchWithEmptyMap() { + Map<String, List<Mutation>> emptyMap = new HashMap<>(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Process empty batch - should not throw any exceptions and should return immediately + try { + replicationLogProcessor.processReplicationLogBatch(emptyMap); + // If we reach here, the empty map was processed successfully + assertTrue("Processing empty map should complete without errors", true); + } catch (Exception e) { + fail("Processing empty map should not throw exception: " + e.getMessage()); + } + } + + /** + * Tests exception handling when attempting to process mutations for non-existent tables. + */ + @Test + public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() { + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5); + tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation)); + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + try { + replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw TableNotFoundException for non-existent table"); + } catch (IOException exception) { + assertTrue("Error message should mention file does not exist and file path name", + exception.getMessage().contains("TableNotFoundException")); + } + } + + /** + * Tests behavior when HBase operations fail (simulated by disabling table). 
+ */ + @Test + public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception { + final String tableName = "T_" + generateUniqueName(); + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + // Create table first + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, tableName)); + PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class); + // Generate some mutations for the table + List<Mutation> mutations = generateHBaseMutations(phoenixConnection, 2, tableName, 10L); + tableMutationsMap.put(tableName, mutations); + TableName hbaseTableName = TableName.valueOf(tableName); + try (Admin admin = phoenixConnection.getQueryServices().getAdmin()) { + // Disable the table to simulate HBase failure + admin.disableTable(hbaseTableName); + LOG.info("Disabled table {} to simulate HBase failure", tableName); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Attempt to process mutations on disabled table - should fail + try { + replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw IOException when trying to apply mutations to disabled table"); + } catch (IOException e) { + // Expected behavior - disabled table should cause IOException + assertTrue("Should throw IOException for disabled table", true); + LOG.info("Expected IOException caught when processing mutations on disabled table: " + e.getMessage()); + } + } + } + } + + /** + * Tests end-to-end processing of a valid log file with mutations for multiple tables. + */ + @Test + public void testProcessLogFileForValidLogFile() throws Exception { + final String table1Name = "T_" + generateUniqueName(); + final String table2Name = "T_" + generateUniqueName(); + final Path filePath = new Path(testFolder.newFile("testProcessLogFileEnd2End").toURI()); + LogFileWriter writer = initLogFileWriter(filePath); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table1Name)); + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table2Name)); + PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class); + + List<Mutation> table1Mutations = generateHBaseMutations(phoenixConnection, 2, table1Name, 100L); + List<Mutation> table2Mutations = generateHBaseMutations(phoenixConnection, 5, table2Name, 101L); + table1Mutations.forEach(mutation -> { + try { + writer.append(table1Name, mutation.hashCode(), mutation); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + table2Mutations.forEach(mutation -> { + try { + writer.append(table2Name, mutation.hashCode(), mutation); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + writer.sync(); + writer.close(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.processLogFile(localFs, filePath); + + validate(table1Name, table1Mutations); + validate(table2Name, table2Mutations); + } + } + + /** + * Tests error handling when attempting to process a non-existent log file. 
+ */ + @Test + public void testProcessLogFileWithNonExistentFile() throws Exception { + // Create a path to a file that doesn't exist + Path nonExistentFilePath = new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log"); + // Verify the file doesn't exist + assertFalse("Non-existent file should not exist", localFs.exists(nonExistentFilePath)); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Attempt to process non-existent file - should throw IOException + try { + replicationLogProcessor.processLogFile(localFs, nonExistentFilePath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + // Expected behavior - non-existent file should cause IOException + assertTrue("Should throw IOException for non-existent file", true); + } + } + + /** + * Tests batching logic when processing log files with mutations for multiple tables. + */ + @Test + public void testProcessLogFileBatchingWithMultipleTables() throws Exception { + final Path multiTableBatchFilePath = new Path(testFolder.newFile("testMultiTableBatch").toURI()); + final String table1Name = "T1_" + generateUniqueName(); + final String table2Name = "T2_" + generateUniqueName(); + final int batchSize = 4; + final int recordsPerTable = 3; + final int totalRecords = recordsPerTable * 2; // 6 total records + final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; // 2 calls + // Create log file with mutations for multiple tables + LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath); + // Add mutations alternating between tables using LogFileTestUtil + for (int i = 0; i < recordsPerTable; i++) { + // Add mutation for table1 + Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) + 1); + writer.append(table1Name, (i * 2) + 1, put1); + writer.sync(); + // Add mutation for table2 + Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) + 2); + writer.append(table2Name, (i * 2) + 2, put2); + writer.sync(); + } + writer.close(); + // Create processor with custom batch size and spy on it + Configuration testConf = new Configuration(conf); + testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(testConf, executorService); + // Validate that the batch size is correctly set + assertEquals("Batch size should be set correctly", batchSize, replicationLogProcessor.getBatchSize()); + ReplicationLogProcessor spyProcessor = Mockito.spy(replicationLogProcessor); + // Mock the processReplicationLogBatch method + Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class)); + // Process the log file + spyProcessor.processLogFile(localFs, multiTableBatchFilePath); + // Verify processReplicationLogBatch was called the expected number of times + Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls)) + .processReplicationLogBatch(Mockito.any(Map.class)); + } + + /** + * Tests processing of empty log files (files with header/trailer but no mutation records). 
+ */ + @Test + public void testProcessLogFileWithEmptyFile() throws Exception { + final Path emptyFilePath = new Path(testFolder.newFile("testProcessLogFileEmpty").toURI()); + LogFileWriter writer = initLogFileWriter(emptyFilePath); + + // Close the writer without adding any records - this creates a valid empty log file + writer.close(); + + // Verify file exists and has some content (header + trailer) + assertTrue("Empty log file should exist", localFs.exists(emptyFilePath)); + assertTrue("Empty log file should have header/trailer content", localFs.getFileStatus(emptyFilePath).getLen() > 0); + + // Process the empty log file - should not throw any exceptions + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + try { + replicationLogProcessor.processLogFile(localFs, emptyFilePath); + // If we reach here, the empty file was processed successfully + assertTrue("Processing empty log file should complete without errors", true); + } catch (Exception e) { + fail("Processing empty log file should not throw exception: " + e.getMessage()); + } + } + + /** + * Tests processing of log files that were not closed, ensuring it's successf. Review Comment: Spelling ########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java: ########## @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.replication.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.log.LogFileReader; +import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.apache.phoenix.replication.log.LogFileTestUtil; +import org.apache.phoenix.replication.log.LogFileWriter; +import org.apache.phoenix.replication.log.LogFileWriterContext; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.*; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class); + + private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " + + "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)"; + + private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)"; + + private static final String PRINCIPAL = "replicationLogProcessor"; + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static Configuration conf; + private static FileSystem localFs; + private static ExecutorService executorService; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = getUtility().getConfiguration(); + localFs = FileSystem.getLocal(conf); + executorService = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void tearDownAfterClass() { + if(executorService != null) { + executorService.shutdown(); + } 
+ } + + /** + * Tests successful creation of LogFileReader with a properly formatted log file. + */ + @Test + public void testCreateLogFileReaderWithValidLogFile() throws IOException { + // Test with valid log file + Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(validFilePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Verify file exists and has content + assertTrue("Valid log file should exist", localFs.exists(validFilePath)); + assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0); + + // Test createLogFileReader with valid file - should succeed + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath); + + // Verify reader is created successfully + assertNotNull("Reader should not be null for valid file", reader); + assertNotNull("Reader context should not be null", reader.getContext()); + assertEquals("File path should match", validFilePath, reader.getContext().getFilePath()); + assertEquals("File system should match", localFs, reader.getContext().getFileSystem()); + + // Verify we can read from the reader + assertTrue("Reader should have records", reader.iterator().hasNext()); + + // Clean up + reader.close(); + } + + /** + * Tests error handling when attempting to create LogFileReader with a non-existent file. + */ + @Test + public void testCreateLogFileReaderWithNonExistentFile() { + Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file"); + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, nonExistentPath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + assertTrue("Error message should mention file does not exist and file path name", + e.getMessage().contains("Log file does not exist: " + nonExistentPath)); + } + } + + /** + * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file. + */ + @Test + public void testCreateLogFileReaderWithInvalidLogFile() throws IOException { + Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI()); + localFs.create(invalidFilePath).close(); // Create empty file + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); + fail("Should throw IOException for invalid file"); + } catch (IOException e) { + // Should throw some kind of IOException when trying to read header + assertTrue("Should throw IOException", true); + } finally { + // Delete the invalid file + localFs.delete(invalidFilePath); + } + } + + /** + * Tests the closeReader method with both null and valid LogFileReader instances. 
+ */ + @Test + public void testCloseReader() throws IOException { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // closeReader should be a no-op for a null reader + replicationLogProcessor.closeReader(null); + Path filePath = new Path(testFolder.newFile("testCloseReader").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(filePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Test with valid reader + LogFileReader reader = Mockito.spy(new LogFileReader()); + + reader.init(new LogFileReaderContext(conf) + .setFileSystem(localFs) + .setFilePath(filePath)); + + replicationLogProcessor.closeReader(reader); + + // Ensure reader's close method is called only once + Mockito.verify(reader, Mockito.times(1)).close(); + } + + /** + * Tests processing an empty mutation map - should complete without errors. + */ + @Test + public void testProcessReplicationLogBatchWithEmptyMap() { + Map<String, List<Mutation>> emptyMap = new HashMap<>(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Process empty batch - should not throw any exceptions and should return immediately + try { + replicationLogProcessor.processReplicationLogBatch(emptyMap); + // Reaching here means the empty map was processed successfully + } catch (Exception e) { + fail("Processing empty map should not throw exception: " + e.getMessage()); + } + } + + /** + * Tests exception handling when attempting to process mutations for non-existent tables. + */ + @Test + public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() { + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5); + tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation)); + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + try { + replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw TableNotFoundException for non-existent table"); + } catch (IOException exception) { + assertTrue("Error message should mention TableNotFoundException", + exception.getMessage().contains("TableNotFoundException")); + } + } + + /** + * Tests behavior when HBase operations fail (simulated by disabling table).
+ */ + @Test + public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception { + final String tableName = "T_" + generateUniqueName(); + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + // Create table first + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, tableName)); + PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class); + // Generate some mutations for the table + List<Mutation> mutations = generateHBaseMutations(phoenixConnection, 2, tableName, 10L); + tableMutationsMap.put(tableName, mutations); + TableName hbaseTableName = TableName.valueOf(tableName); + try (Admin admin = phoenixConnection.getQueryServices().getAdmin()) { + // Disable the table to simulate HBase failure + admin.disableTable(hbaseTableName); + LOG.info("Disabled table {} to simulate HBase failure", tableName); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Attempt to process mutations on disabled table - should fail + try { + replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw IOException when trying to apply mutations to disabled table"); + } catch (IOException e) { + // Expected behavior - disabled table should cause IOException + LOG.info("Expected IOException caught when processing mutations on disabled table: {}", e.getMessage()); + } + } + } + } + + /** + * Tests end-to-end processing of a valid log file with mutations for multiple tables. + */ + @Test + public void testProcessLogFileForValidLogFile() throws Exception { + final String table1Name = "T_" + generateUniqueName(); + final String table2Name = "T_" + generateUniqueName(); + final Path filePath = new Path(testFolder.newFile("testProcessLogFileEnd2End").toURI()); + LogFileWriter writer = initLogFileWriter(filePath); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table1Name)); + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table2Name)); + PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class); + + List<Mutation> table1Mutations = generateHBaseMutations(phoenixConnection, 2, table1Name, 100L); + List<Mutation> table2Mutations = generateHBaseMutations(phoenixConnection, 5, table2Name, 101L); + table1Mutations.forEach(mutation -> { + try { + writer.append(table1Name, mutation.hashCode(), mutation); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + table2Mutations.forEach(mutation -> { + try { + writer.append(table2Name, mutation.hashCode(), mutation); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + writer.sync(); + writer.close(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.processLogFile(localFs, filePath); + + validate(table1Name, table1Mutations); + validate(table2Name, table2Mutations); + } + } + + /** + * Tests error handling when attempting to process a non-existent log file.
+ */ + @Test + public void testProcessLogFileWithNonExistentFile() throws Exception { + // Create a path to a file that doesn't exist + Path nonExistentFilePath = new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log"); + // Verify the file doesn't exist + assertFalse("Non-existent file should not exist", localFs.exists(nonExistentFilePath)); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Attempt to process non-existent file - should throw IOException + try { + replicationLogProcessor.processLogFile(localFs, nonExistentFilePath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + // Expected behavior - non-existent file should cause IOException + } + } + + /** + * Tests batching logic when processing log files with mutations for multiple tables. + */ + @Test + public void testProcessLogFileBatchingWithMultipleTables() throws Exception { + final Path multiTableBatchFilePath = new Path(testFolder.newFile("testMultiTableBatch").toURI()); + final String table1Name = "T1_" + generateUniqueName(); + final String table2Name = "T2_" + generateUniqueName(); + final int batchSize = 4; + final int recordsPerTable = 3; + final int totalRecords = recordsPerTable * 2; // 6 total records + final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; // ceil(6 / 4) = 2 calls + // Create log file with mutations for multiple tables + LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath); + // Add mutations alternating between tables using LogFileTestUtil + for (int i = 0; i < recordsPerTable; i++) { + // Add mutation for table1 + Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) + 1); + writer.append(table1Name, (i * 2) + 1, put1); + writer.sync(); + // Add mutation for table2 + Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) + 2); + writer.append(table2Name, (i * 2) + 2, put2); + writer.sync(); + } + writer.close(); + // Create processor with custom batch size and spy on it + Configuration testConf = new Configuration(conf); + testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(testConf, executorService); + // Validate that the batch size is correctly set + assertEquals("Batch size should be set correctly", batchSize, replicationLogProcessor.getBatchSize()); + ReplicationLogProcessor spyProcessor = Mockito.spy(replicationLogProcessor); + // Mock the processReplicationLogBatch method + Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class)); + // Process the log file + spyProcessor.processLogFile(localFs, multiTableBatchFilePath); + // Verify processReplicationLogBatch was called the expected number of times + Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls)) + .processReplicationLogBatch(Mockito.any(Map.class)); + } + + /** + * Tests processing of empty log files (files with header/trailer but no mutation records).
+ */ + @Test + public void testProcessLogFileWithEmptyFile() throws Exception { + final Path emptyFilePath = new Path(testFolder.newFile("testProcessLogFileEmpty").toURI()); + LogFileWriter writer = initLogFileWriter(emptyFilePath); + + // Close the writer without adding any records - this creates a valid empty log file + writer.close(); + + // Verify file exists and has some content (header + trailer) + assertTrue("Empty log file should exist", localFs.exists(emptyFilePath)); + assertTrue("Empty log file should have header/trailer content", localFs.getFileStatus(emptyFilePath).getLen() > 0); + + // Process the empty log file - should not throw any exceptions + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + try { + replicationLogProcessor.processLogFile(localFs, emptyFilePath); + // Reaching here means the empty file was processed successfully + } catch (Exception e) { + fail("Processing empty log file should not throw exception: " + e.getMessage()); + } + } + + /** + * Tests processing of log files that were not closed, ensuring processing still succeeds. + */ + @Test + public void testProcessLogFileForUnClosedFile() throws Exception { + final Path unclosedFilePath = new Path(testFolder.newFile("testProcessLogFileForUnClosedFile").toURI()); + LogFileWriter writer = initLogFileWriter(unclosedFilePath); + + // Add one mutation + Mutation put = LogFileTestUtil.newPut("row1", 3L, 4); + writer.append("table", 1, put); + writer.sync(); + + // Process the file without closing - should not throw any exceptions + ReplicationLogProcessor spyProcessor = Mockito.spy(new ReplicationLogProcessor(conf, executorService)); + Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class)); + + spyProcessor.processLogFile(localFs, unclosedFilePath); + + // Verify processReplicationLogBatch was called the expected number of times + Mockito.verify(spyProcessor, Mockito.times(1)) + .processReplicationLogBatch(Mockito.any(Map.class)); + } Review Comment: Can also verify here that the expected mutation was seen, `put` with sequence `3L`.
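One way to address that, sketched below: keep the `doNothing()` stub but capture the batch handed to `processReplicationLogBatch` with Mockito's `ArgumentCaptor` and assert its contents. This is only a sketch against the test above, not code from the PR; the captor name is arbitrary, and how `LogFileTestUtil.newPut` surfaces sequence `3L` (e.g. via cell timestamps) is an assumption, so the sketch only pins down the table key, mutation count, and row key.

```java
// Sketch of the suggested verification for testProcessLogFileForUnClosedFile.
// Assumes the spyProcessor, put, and "table" names from the test above, plus
// import org.mockito.ArgumentCaptor;
@SuppressWarnings("unchecked")
ArgumentCaptor<Map<String, List<Mutation>>> batchCaptor =
    ArgumentCaptor.forClass(Map.class);
Mockito.verify(spyProcessor, Mockito.times(1))
    .processReplicationLogBatch(batchCaptor.capture());

Map<String, List<Mutation>> batch = batchCaptor.getValue();
// The single appended put should be present under its table name
assertTrue("Batch should contain the table", batch.containsKey("table"));
List<Mutation> applied = batch.get("table");
assertEquals("Batch should contain exactly one mutation", 1, applied.size());
assertEquals("Row key should match the appended put",
    Bytes.toString(put.getRow()), Bytes.toString(applied.get(0).getRow()));
// If newPut encodes sequence 3L in the cell timestamps, that could be asserted
// too, e.g. by walking applied.get(0).getFamilyCellMap().
```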