Himanshu-g81 commented on code in PR #2188: URL: https://github.com/apache/phoenix/pull/2188#discussion_r2160359792
########## phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java: ########## @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import 
org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.log.LogFileReader; +import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.apache.phoenix.replication.log.LogFileTestUtil; +import org.apache.phoenix.replication.log.LogFileWriter; +import org.apache.phoenix.replication.log.LogFileWriterContext; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.*; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class); + + private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " + + "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)"; + + private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)"; + + private static final String PRINCIPAL = "replicationLogProcessor"; + + @ClassRule + 
public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static Configuration conf; + private static FileSystem localFs; + private static ExecutorService executorService; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = getUtility().getConfiguration(); + localFs = FileSystem.getLocal(conf); + executorService = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void tearDownAfterClass() { + if(executorService != null) { + executorService.shutdown(); + } + } + + /** + * Tests successful creation of LogFileReader with a properly formatted log file. + */ + @Test + public void testCreateLogFileReaderWithValidLogFile() throws IOException { + // Test with valid log file + Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(validFilePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Verify file exists and has content + assertTrue("Valid log file should exist", localFs.exists(validFilePath)); + assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0); + + // Test createLogFileReader with valid file - should succeed + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath); + + // Verify reader is created successfully + assertNotNull("Reader should not be null for valid file", reader); + assertNotNull("Reader context should not be null", reader.getContext()); + assertEquals("File path should match", validFilePath, reader.getContext().getFilePath()); + assertEquals("File system should 
match", localFs, reader.getContext().getFileSystem()); + + // Verify we can read from the reader + assertTrue("Reader should have records", reader.iterator().hasNext()); + + // Clean up + reader.close(); + } + + /** + * Tests error handling when attempting to create LogFileReader with a non-existent file. + */ + @Test + public void testCreateLogFileReaderWithNonExistentFile() { + Path nonExistentPath = new Path(testFolder.toString(), "non_existent_file"); + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, nonExistentPath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + assertTrue("Error message should mention file does not exist and file path name", + e.getMessage().contains("Log file does not exist: " + nonExistentPath)); + } + } + + /** + * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file. + */ + @Test + public void testCreateLogFileReaderWithInvalidLogFile() throws IOException { + Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI()); + localFs.create(invalidFilePath).close(); // Create empty file + try { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); + fail("Should throw IOException for invalid file"); + } catch (IOException e) { + // Should throw some kind of IOException when trying to read header + assertTrue("Should throw IOException", true); + } finally { + // Delete the invalid file + localFs.delete(invalidFilePath); + } + } + + /** + * Tests the closeReader method with both null and valid LogFileReader instances. 
+ */ + @Test + public void testCloseReader() throws IOException { + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.closeReader(null); + Path filePath = new Path(testFolder.newFile("testCloseReader").toURI()); + String tableName = "T_" + generateUniqueName(); + + // Create a valid log file with proper structure and one record + LogFileWriter writer = initLogFileWriter(filePath); + + // Add a mutation to make it a proper log file with data + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + writer.close(); + + // Test with valid reader + LogFileReader reader = Mockito.spy(new LogFileReader()); + + reader.init(new LogFileReaderContext(conf) + .setFileSystem(localFs) + .setFilePath(filePath)); + + replicationLogProcessor.closeReader(reader); + + // Ensure reader's close method is called only once + Mockito.verify(reader, Mockito.times(1)).close(); + } + + /** + * Tests processing an empty mutation map - should complete without errors. + */ + @Test + public void testProcessReplicationLogBatchWithEmptyMap() { + Map<String, List<Mutation>> emptyMap = new HashMap<>(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Process empty batch - should not throw any exceptions and should return immediately + try { + replicationLogProcessor.processReplicationLogBatch(emptyMap); + // If we reach here, the empty map was processed successfully + assertTrue("Processing empty map should complete without errors", true); + } catch (Exception e) { + fail("Processing empty map should not throw exception: " + e.getMessage()); + } + } + + /** + * Tests exception handling when attempting to process mutations for non-existent tables. 
+ */ + @Test + public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() { + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5); + tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation)); + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + try { + replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw TableNotFoundException for non-existent table"); + } catch (IOException exception) { + assertTrue("Error message should mention file does not exist and file path name", + exception.getMessage().contains("TableNotFoundException")); + } + } + + /** + * Tests behavior when HBase operations fail (simulated by disabling table). + */ + @Test + public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception { + final String tableName = "T_" + generateUniqueName(); + Map<String, List<Mutation>> tableMutationsMap = new HashMap<>(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + // Create table first + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, tableName)); + PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class); + // Generate some mutations for the table + List<Mutation> mutations = generateHBaseMutations(phoenixConnection, 2, tableName, 10L); + tableMutationsMap.put(tableName, mutations); + TableName hbaseTableName = TableName.valueOf(tableName); + try (Admin admin = phoenixConnection.getQueryServices().getAdmin()) { + // Disable the table to simulate HBase failure + admin.disableTable(hbaseTableName); + LOG.info("Disabled table {} to simulate HBase failure", tableName); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Attempt to process mutations on disabled table - should fail + try { + 
replicationLogProcessor.processReplicationLogBatch(tableMutationsMap); + fail("Should throw IOException when trying to apply mutations to disabled table"); + } catch (IOException e) { + // Expected behavior - disabled table should cause IOException + assertTrue("Should throw IOException for disabled table", true); + LOG.info("Expected IOException caught when processing mutations on disabled table: " + e.getMessage()); + } + } + } + } + + /** + * Tests end-to-end processing of a valid log file with mutations for multiple tables. + */ + @Test + public void testProcessLogFileForValidLogFile() throws Exception { + final String table1Name = "T_" + generateUniqueName(); + final String table2Name = "T_" + generateUniqueName(); + final Path filePath = new Path(testFolder.newFile("testProcessLogFileEnd2End").toURI()); + LogFileWriter writer = initLogFileWriter(filePath); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table1Name)); + conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table2Name)); + PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class); + + List<Mutation> table1Mutations = generateHBaseMutations(phoenixConnection, 2, table1Name, 100L); + List<Mutation> table2Mutations = generateHBaseMutations(phoenixConnection, 5, table2Name, 101L); + table1Mutations.forEach(mutation -> { + try { + writer.append(table1Name, mutation.hashCode(), mutation); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + table2Mutations.forEach(mutation -> { + try { + writer.append(table2Name, mutation.hashCode(), mutation); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + writer.sync(); + writer.close(); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + replicationLogProcessor.processLogFile(localFs, filePath); + + validate(table1Name, table1Mutations); 
+ validate(table2Name, table2Mutations); + } + } + + /** + * Tests error handling when attempting to process a non-existent log file. + */ + @Test + public void testProcessLogFileWithNonExistentFile() throws Exception { + // Create a path to a file that doesn't exist + Path nonExistentFilePath = new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log"); + // Verify the file doesn't exist + assertFalse("Non-existent file should not exist", localFs.exists(nonExistentFilePath)); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService); + // Attempt to process non-existent file - should throw IOException + try { + replicationLogProcessor.processLogFile(localFs, nonExistentFilePath); + fail("Should throw IOException for non-existent file"); + } catch (IOException e) { + // Expected behavior - non-existent file should cause IOException + assertTrue("Should throw IOException for non-existent file", true); + } + } + + /** + * Tests batching logic when processing log files with mutations for multiple tables. 
+ */ + @Test + public void testProcessLogFileBatchingWithMultipleTables() throws Exception { + final Path multiTableBatchFilePath = new Path(testFolder.newFile("testMultiTableBatch").toURI()); + final String table1Name = "T1_" + generateUniqueName(); + final String table2Name = "T2_" + generateUniqueName(); + final int batchSize = 4; + final int recordsPerTable = 3; + final int totalRecords = recordsPerTable * 2; // 6 total records + final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; // 2 calls + // Create log file with mutations for multiple tables + LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath); + // Add mutations alternating between tables using LogFileTestUtil + for (int i = 0; i < recordsPerTable; i++) { + // Add mutation for table1 + Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) + 1); + writer.append(table1Name, (i * 2) + 1, put1); + writer.sync(); + // Add mutation for table2 + Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) + 2); + writer.append(table2Name, (i * 2) + 2, put2); + writer.sync(); + } + writer.close(); + // Create processor with custom batch size and spy on it + Configuration testConf = new Configuration(conf); + testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize); + + ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(testConf, executorService); + // Validate that the batch size is correctly set + assertEquals("Batch size should be set correctly", batchSize, replicationLogProcessor.getBatchSize()); + ReplicationLogProcessor spyProcessor = Mockito.spy(replicationLogProcessor); + // Mock the processReplicationLogBatch method + Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class)); + // Process the log file + spyProcessor.processLogFile(localFs, multiTableBatchFilePath); + // Verify processReplicationLogBatch was called the expected number of times 
+ Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls)) + .processReplicationLogBatch(Mockito.any(Map.class)); Review Comment: Sure, added in the latest commit ([6b3fc83](https://github.com/apache/phoenix/pull/2188/commits/6b3fc83b0d626ea35a416bd172353d0784d26acd)). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org