Himanshu-g81 commented on code in PR #2188:
URL: https://github.com/apache/phoenix/pull/2188#discussion_r2160359792


##########
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogProcessorTest.class);
+
+    private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s (ID VARCHAR PRIMARY KEY, " +
+            "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
+
+    private static final String UPSERT_SQL_STATEMENT = "upsert into %s values ('%s', '%s', '%s', %s)";
+
+    private static final String PRINCIPAL = "replicationLogProcessor";
+
+    @ClassRule
+    public static TemporaryFolder testFolder = new TemporaryFolder();
+
+    private static Configuration conf;
+    private static FileSystem localFs;
+    private static ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupBeforeClass() throws Exception {
+        conf = getUtility().getConfiguration();
+        localFs = FileSystem.getLocal(conf);
+        executorService = Executors.newSingleThreadExecutor();
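+        // One worker thread is sufficient for these tests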
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Tests successful creation of LogFileReader with a properly formatted log file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithValidLogFile() throws IOException {
+        // Test with valid log file
+        Path validFilePath = new Path(testFolder.newFile("valid_log_file").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(validFilePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Verify file exists and has content
+        assertTrue("Valid log file should exist", localFs.exists(validFilePath));
+        assertTrue("Valid log file should have content", localFs.getFileStatus(validFilePath).getLen() > 0);
+
+        // Test createLogFileReader with valid file - should succeed
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+
+        // Verify reader is created successfully
+        assertNotNull("Reader should not be null for valid file", reader);
+        assertNotNull("Reader context should not be null", reader.getContext());
+        assertEquals("File path should match", validFilePath, reader.getContext().getFilePath());
+        assertEquals("File system should match", localFs, reader.getContext().getFileSystem());
+
+        // Verify we can read from the reader
+        assertTrue("Reader should have records", reader.iterator().hasNext());
+
+        // Clean up
+        reader.close();
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with a non-existent file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithNonExistentFile() {
+        Path nonExistentPath = new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_file");
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, nonExistentPath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            assertTrue("Error message should mention that the file does not exist and include its path",
+                    e.getMessage().contains("Log file does not exist: " + nonExistentPath));
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
+     */
+    @Test
+    public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
+        Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
+        localFs.create(invalidFilePath).close(); // Create empty file
+        try {
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
+            fail("Should throw IOException for invalid file");
+        } catch (IOException e) {
+            // Should throw some kind of IOException when trying to read header
+            assertTrue("Should throw IOException", true);
+        } finally {
+            // Delete the invalid file
+            localFs.delete(invalidFilePath, false);
+        }
+    }
+
+    /**
+     * Tests the closeReader method with both null and valid LogFileReader instances.
+     */
+    @Test
+    public void testCloseReader() throws IOException {
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        replicationLogProcessor.closeReader(null);
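+        // The null call above must be a silent no-op; a valid reader is exercised next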
+        Path filePath = new Path(testFolder.newFile("testCloseReader").toURI());
+        String tableName = "T_" + generateUniqueName();
+
+        // Create a valid log file with proper structure and one record
+        LogFileWriter writer = initLogFileWriter(filePath);
+
+        // Add a mutation to make it a proper log file with data
+        Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
+        writer.append(tableName, 1, put);
+        writer.sync();
+        writer.close();
+
+        // Test with valid reader
+        LogFileReader reader = Mockito.spy(new LogFileReader());
+
+        reader.init(new LogFileReaderContext(conf)
+                .setFileSystem(localFs)
+                .setFilePath(filePath));
+
+        replicationLogProcessor.closeReader(reader);
+
+        // Ensure reader's close method is called only once
+        Mockito.verify(reader, Mockito.times(1)).close();
+    }
+
+    /**
+     * Tests processing an empty mutation map - should complete without errors.
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithEmptyMap() {
+        Map<String, List<Mutation>> emptyMap = new HashMap<>();
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        // Process empty batch - should not throw any exceptions and should return immediately
+        try {
+            replicationLogProcessor.processReplicationLogBatch(emptyMap);
+            // If we reach here, the empty map was processed successfully
+            assertTrue("Processing empty map should complete without errors", true);
+        } catch (Exception e) {
+            fail("Processing empty map should not throw exception: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Tests exception handling when attempting to process mutations for non-existent tables.
+     */
+    @Test
+    public void testProcessReplicationLogBatchExceptionsMessageIsCorrect() {
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        Mutation mutation = LogFileTestUtil.newPut("abc", 6L, 5);
+        tableMutationsMap.put("NON_EXISTENT_TABLE", Collections.singletonList(mutation));
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        try {
+            replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+            fail("Should throw TableNotFoundException for non-existent table");
+        } catch (IOException exception) {
+            assertTrue("Error message should mention TableNotFoundException",
+                    exception.getMessage().contains("TableNotFoundException"));
+        }
+    }
+
+    /**
+     * Tests behavior when HBase operations fail (simulated by disabling the table).
+     */
+    @Test
+    public void testProcessReplicationLogBatchWithHBaseFailure() throws Exception {
+        final String tableName = "T_" + generateUniqueName();
+        Map<String, List<Mutation>> tableMutationsMap = new HashMap<>();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create table first
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, tableName));
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+            // Generate some mutations for the table
+            List<Mutation> mutations = generateHBaseMutations(phoenixConnection, 2, tableName, 10L);
+            tableMutationsMap.put(tableName, mutations);
+            TableName hbaseTableName = TableName.valueOf(tableName);
+            try (Admin admin = phoenixConnection.getQueryServices().getAdmin()) {
+                // Disable the table to simulate HBase failure
+                admin.disableTable(hbaseTableName);
+                LOG.info("Disabled table {} to simulate HBase failure", tableName);
+
+                ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+                // Attempt to process mutations on disabled table - should fail
+                try {
+                    replicationLogProcessor.processReplicationLogBatch(tableMutationsMap);
+                    fail("Should throw IOException when trying to apply mutations to disabled table");
+                } catch (IOException e) {
+                    // Expected behavior - disabled table should cause IOException
+                    assertTrue("Should throw IOException for disabled table", true);
+                    LOG.info("Expected IOException caught when processing mutations on disabled table: {}", e.getMessage());
+                }
+                }
+            }
+        }
+    }
+
+    /**
+     * Tests end-to-end processing of a valid log file with mutations for multiple tables.
+     */
+    @Test
+    public void testProcessLogFileForValidLogFile() throws Exception {
+        final String table1Name = "T_" + generateUniqueName();
+        final String table2Name = "T_" + generateUniqueName();
+        final Path filePath = new Path(testFolder.newFile("testProcessLogFileEnd2End").toURI());
+        LogFileWriter writer = initLogFileWriter(filePath);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table1Name));
+            conn.createStatement().execute(String.format(CREATE_TABLE_SQL_STATEMENT, table2Name));
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+
+            List<Mutation> table1Mutations = generateHBaseMutations(phoenixConnection, 2, table1Name, 100L);
+            List<Mutation> table2Mutations = generateHBaseMutations(phoenixConnection, 5, table2Name, 101L);
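+            // hashCode() serves as an arbitrary commit ID for the test; checked IOExceptions
+            // are wrapped so they can escape the forEach lambdas below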
+            table1Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table1Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            table2Mutations.forEach(mutation -> {
+                try {
+                    writer.append(table2Name, mutation.hashCode(), mutation);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            writer.sync();
+            writer.close();
+
+            ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+            replicationLogProcessor.processLogFile(localFs, filePath);
+
+            validate(table1Name, table1Mutations);
+            validate(table2Name, table2Mutations);
+        }
+    }
+
+    /**
+     * Tests error handling when attempting to process a non-existent log file.
+     */
+    @Test
+    public void testProcessLogFileWithNonExistentFile() throws Exception {
+        // Create a path to a file that doesn't exist
+        Path nonExistentFilePath = new Path(testFolder.getRoot().getAbsolutePath(), "non_existent_log_file.log");
+        // Verify the file doesn't exist
+        assertFalse("Non-existent file should not exist", localFs.exists(nonExistentFilePath));
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, executorService);
+        // Attempt to process non-existent file - should throw IOException
+        try {
+            replicationLogProcessor.processLogFile(localFs, nonExistentFilePath);
+            fail("Should throw IOException for non-existent file");
+        } catch (IOException e) {
+            // Expected behavior - non-existent file should cause IOException
+            assertTrue("Should throw IOException for non-existent file", true);
+        }
+    }
+
+    /**
+     * Tests batching logic when processing log files with mutations for multiple tables.
+     */
+    @Test
+    public void testProcessLogFileBatchingWithMultipleTables() throws Exception {
+        final Path multiTableBatchFilePath = new Path(testFolder.newFile("testMultiTableBatch").toURI());
+        final String table1Name = "T1_" + generateUniqueName();
+        final String table2Name = "T2_" + generateUniqueName();
+        final int batchSize = 4;
+        final int recordsPerTable = 3;
+        final int totalRecords = recordsPerTable * 2; // 6 total records
+        final int expectedBatchCalls = (totalRecords + batchSize - 1) / batchSize; // 2 calls
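+        // Ceiling division: (6 + 4 - 1) / 4 = 9 / 4 = 2 with integer division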
+        // Create log file with mutations for multiple tables
+        LogFileWriter writer = initLogFileWriter(multiTableBatchFilePath);
+        // Add mutations alternating between tables using LogFileTestUtil
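+        // Commit IDs interleave as 1,2,3,...: table1 takes the odd IDs, table2 the even ones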
+        for (int i = 0; i < recordsPerTable; i++) {
+            // Add mutation for table1
+            Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) + 1);
+            writer.append(table1Name, (i * 2) + 1, put1);
+            writer.sync();
+            // Add mutation for table2
+            Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) + 2);
+            writer.append(table2Name, (i * 2) + 2, put2);
+            writer.sync();
+        }
+        writer.close();
+        // Create processor with custom batch size and spy on it
+        Configuration testConf = new Configuration(conf);
+        testConf.setInt(ReplicationLogProcessor.REPLICATION_STANDBY_LOG_REPLAY_BATCH_SIZE, batchSize);
+
+        ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(testConf, executorService);
+        // Validate that the batch size is correctly set
+        assertEquals("Batch size should be set correctly", batchSize, replicationLogProcessor.getBatchSize());
+        ReplicationLogProcessor spyProcessor = Mockito.spy(replicationLogProcessor);
+        // Stub out processReplicationLogBatch so invocations are counted without side effects
+        Mockito.doNothing().when(spyProcessor).processReplicationLogBatch(Mockito.any(Map.class));
+        // Process the log file
+        spyProcessor.processLogFile(localFs, multiTableBatchFilePath);
+        // Verify processReplicationLogBatch was called the expected number of times
+        Mockito.verify(spyProcessor, Mockito.times(expectedBatchCalls))
+            .processReplicationLogBatch(Mockito.any(Map.class));

Review Comment:
   Sure, added in the latest commit ([6b3fc83](https://github.com/apache/phoenix/pull/2188/commits/6b3fc83b0d626ea35a416bd172353d0784d26acd)).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
