This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this 
push:
     new 11a1f5a680 Phoenix-7672 Adding failover implementation via replay 
(Addendum) (#2312)
11a1f5a680 is described below

commit 11a1f5a68005cc0247c9ba4b466b82b31e3700c5
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Wed Nov 12 03:58:28 2025 +0530

    Phoenix-7672 Adding failover implementation via replay (Addendum) (#2312)
---
 .../replication/ReplicationLogDiscovery.java       |  48 +-
 .../phoenix/replication/ReplicationLogTracker.java |   2 +-
 .../ReplicationShardDirectoryManager.java          |  10 +-
 .../reader/ReplicationLogDiscoveryReplay.java      |  79 ++-
 .../reader/ReplicationLogProcessor.java            |   2 +
 .../replication/reader/ReplicationLogReplay.java   |   2 +
 .../reader/ReplicationLogReplayService.java        |   4 +
 .../reader/RecoverLeaseFSUtilsTestIT.java}         |   5 +-
 .../ReplicationLogDiscoveryReplayTestIT.java}      | 546 ++++++++++++++++++++-
 .../reader/ReplicationLogProcessorTestIT.java}     |   5 +-
 .../reader/ReplicationLogReplayTestIT.java}        |   5 +-
 .../replication/ReplicationLogDiscoveryTest.java   |   2 +-
 .../ReplicationShardDirectoryManagerTest.java      | 166 ++++++-
 13 files changed, 796 insertions(+), 80 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 76051edef4..01e9e13651 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -115,6 +115,7 @@ public abstract class ReplicationLogDiscovery {
     }
 
     public void init() throws IOException {
+        LOG.info("Initializing ReplicationLogDiscovery for haGroup: {}", 
haGroupName);
         initializeLastRoundProcessed();
         this.metrics = createMetricsSource();
     }
@@ -134,7 +135,7 @@ public abstract class ReplicationLogDiscovery {
     public void start() throws IOException {
         synchronized (this) {
             if (isRunning) {
-                LOG.warn("ReplicationLogDiscovery is already running for 
group: {}", haGroupName);
+                LOG.warn("ReplicationLogDiscovery is already running for 
haGroup: {}", haGroupName);
                 return;
             }
             // Initialize and schedule the executors
@@ -150,7 +151,7 @@ public abstract class ReplicationLogDiscovery {
             }, 0, getReplayIntervalSeconds(), TimeUnit.SECONDS);
 
             isRunning = true;
-            LOG.info("ReplicationLogDiscovery started for group: {}", 
haGroupName);
+            LOG.info("ReplicationLogDiscovery started for haGroup: {}", 
haGroupName);
         }
     }
 
@@ -164,7 +165,7 @@ public abstract class ReplicationLogDiscovery {
 
         synchronized (this) {
             if (!isRunning) {
-                LOG.warn("ReplicationLogDiscovery is not running for group: 
{}", haGroupName);
+                LOG.warn("ReplicationLogDiscovery is not running for haGroup: 
{}", haGroupName);
                 return;
             }
 
@@ -185,7 +186,7 @@ public abstract class ReplicationLogDiscovery {
             }
         }
 
-        LOG.info("ReplicationLogDiscovery stopped for group: {}", haGroupName);
+        LOG.info("ReplicationLogDiscovery stopped for haGroup: {}", 
haGroupName);
     }
 
     /**
@@ -209,8 +210,8 @@ public abstract class ReplicationLogDiscovery {
             try {
                 processRound(replicationRound);
             } catch (IOException e) {
-                LOG.error("Failed processing replication round {}. Will retry 
in next "
-                        + "scheduled run.", replicationRound, e);
+                LOG.error("Failed processing replication round {} for haGroup 
{}. Will retry"
+                        + "in next scheduled run.", replicationRound, 
haGroupName, e);
                 break; // stop this run, retry later
             }
             setLastRoundProcessed(replicationRound);
@@ -244,7 +245,7 @@ public abstract class ReplicationLogDiscovery {
      * @throws IOException if there's an error during round processing
      */
     protected void processRound(ReplicationRound replicationRound) throws 
IOException {
-        LOG.info("Starting to process round: {}", replicationRound);
+        LOG.info("Starting to process round: {} for haGroup: {}", 
replicationRound, haGroupName);
         // Increment the number of rounds processed
         getMetrics().incrementNumRoundsProcessed();
 
@@ -254,7 +255,7 @@ public abstract class ReplicationLogDiscovery {
             // Conditionally process the in progress files for the round
             processInProgressDirectory();
         }
-        LOG.info("Finished processing round: {}", replicationRound);
+        LOG.info("Finished processing round: {} for haGroup: {}", 
replicationRound, haGroupName);
     }
 
     /**
@@ -275,7 +276,8 @@ public abstract class ReplicationLogDiscovery {
      * @throws IOException if there's an error during file processing
      */
     protected void processNewFilesForRound(ReplicationRound replicationRound) 
throws IOException {
-        LOG.info("Starting new files processing for round: {}", 
replicationRound);
+        LOG.info("Starting new files processing for round: {} for haGroup: {}",
+                replicationRound, haGroupName);
         long startTime = EnvironmentEdgeManager.currentTime();
         List<Path> files = 
replicationLogTracker.getNewFilesForRound(replicationRound);
         LOG.info("Number of new files for round {} is {}", replicationRound, 
files.size());
@@ -284,7 +286,8 @@ public abstract class ReplicationLogDiscovery {
             files = 
replicationLogTracker.getNewFilesForRound(replicationRound);
         }
         long duration = EnvironmentEdgeManager.currentTime() - startTime;
-        LOG.info("Finished new files processing for round: {} in {}ms", 
replicationRound, duration);
+        LOG.info("Finished new files processing for round: {} in {}ms for 
haGroup: {}",
+                replicationRound, duration, haGroupName);
         getMetrics().updateTimeToProcessNewFiles(duration);
     }
 
@@ -294,23 +297,25 @@ public abstract class ReplicationLogDiscovery {
      * @throws IOException if there's an error during file processing
      */
     protected void processInProgressDirectory() throws IOException {
+        LOG.info("Starting {} directory processing for haGroup: {}",
+                replicationLogTracker.getInProgressLogSubDirectoryName(), 
haGroupName);
         // Increase the count for number of times in progress directory is 
processed
         getMetrics().incrementNumInProgressDirectoryProcessed();
-        LOG.info("Starting {} directory processing", 
replicationLogTracker.getInProgressLogSubDirectoryName());
         long startTime = EnvironmentEdgeManager.currentTime();
         long oldestTimestampToProcess = 
replicationLogTracker.getReplicationShardDirectoryManager()
                 
.getNearestRoundStartTimestamp(EnvironmentEdgeManager.currentTime())
                 - getReplayIntervalSeconds() * 1000L;
         List<Path> files = 
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
-        LOG.info("Number of {} files with oldestTimestampToProcess {} is {}",
+        LOG.info("Number of {} files with oldestTimestampToProcess {} is {} 
for haGroup: {}",
                 replicationLogTracker.getInProgressLogSubDirectoryName(), 
oldestTimestampToProcess,
-                files.size());
+                files.size(), haGroupName);
         while (!files.isEmpty()) {
             processOneRandomFile(files);
             files = 
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
         }
         long duration = EnvironmentEdgeManager.currentTime() - startTime;
-        LOG.info("Finished in-progress files processing in {}ms", duration);
+        LOG.info("Finished in-progress files processing in {}ms for haGroup: 
{}",
+                duration, haGroupName);
         getMetrics().updateTimeToProcessInProgressFiles(duration);
     }
 
@@ -364,21 +369,26 @@ public abstract class ReplicationLogDiscovery {
         Optional<Long> minTimestampFromInProgressFiles =
                 getMinTimestampFromInProgressFiles();
         if (minTimestampFromInProgressFiles.isPresent()) {
-            LOG.info("Initializing lastRoundProcessed from {} files with 
minimum "
-                    + "timestamp as {}", 
replicationLogTracker.getInProgressLogSubDirectoryName(), 
minTimestampFromInProgressFiles.get());
+            LOG.info("Initializing lastRoundProcessed for haGroup: {} from {} 
files with minimum "
+                    + "timestamp as {}", haGroupName,
+                    replicationLogTracker.getInProgressLogSubDirectoryName(),
+                    minTimestampFromInProgressFiles.get());
             this.lastRoundProcessed = 
replicationLogTracker.getReplicationShardDirectoryManager()
                     
.getReplicationRoundFromEndTime(minTimestampFromInProgressFiles.get());
         } else {
             Optional<Long> minTimestampFromNewFiles = 
getMinTimestampFromNewFiles();
             if (minTimestampFromNewFiles.isPresent()) {
-                LOG.info("Initializing lastRoundProcessed from {} files with 
minimum timestamp "
-                        + "as {}", 
replicationLogTracker.getInSubDirectoryName(), minTimestampFromNewFiles.get());
+                LOG.info("Initializing lastRoundProcessed for haGroup: {} from 
{}"
+                        + "files with minimum timestamp as {}", haGroupName,
+                        replicationLogTracker.getInSubDirectoryName(),
+                        minTimestampFromNewFiles.get());
                 this.lastRoundProcessed = replicationLogTracker
                         .getReplicationShardDirectoryManager()
                         
.getReplicationRoundFromEndTime(minTimestampFromNewFiles.get());
             } else {
                 long currentTime = EnvironmentEdgeManager.currentTime();
-                LOG.info("Initializing lastRoundProcessed from current time 
{}", currentTime);
+                LOG.info("Initializing lastRoundProcessed for haGroup: {} from 
current time {}",
+                        haGroupName, currentTime);
                 this.lastRoundProcessed = replicationLogTracker
                         .getReplicationShardDirectoryManager()
                         
.getReplicationRoundFromEndTime(EnvironmentEdgeManager.currentTime());
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index 6d16d9ed8a..5838a9de4b 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -116,7 +116,7 @@ public class ReplicationLogTracker {
      * @return List of valid log file paths that belong to the specified 
replication round
      * @throws IOException if there's an error accessing the file system
      */
-    protected List<Path> getNewFilesForRound(ReplicationRound 
replicationRound) throws IOException {
+    public List<Path> getNewFilesForRound(ReplicationRound replicationRound) 
throws IOException {
         Path roundDirectory = 
replicationShardDirectoryManager.getShardDirectory(replicationRound);
         LOG.info("Getting new files for round {} from shard {}", 
replicationRound, roundDirectory);
         if (!fileSystem.exists(roundDirectory)) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
index e5fd495534..7010131b36 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
@@ -175,7 +175,7 @@ public class ReplicationShardDirectoryManager {
      * @param timestamp The timestamp in milliseconds since epoch
      * @return The nearest replication round start timestamp
      */
-    protected long getNearestRoundStartTimestamp(long timestamp) {
+    public long getNearestRoundStartTimestamp(long timestamp) {
         // Convert round time from seconds to milliseconds
         long roundTimeMs = replicationRoundDurationSeconds * 1000L;
 
@@ -184,6 +184,14 @@ public class ReplicationShardDirectoryManager {
         return (timestamp / roundTimeMs) * roundTimeMs;
     }
 
+    public ReplicationRound getPreviousRound(final ReplicationRound 
replicationRound) {
+        return getReplicationRoundFromEndTime(replicationRound.getStartTime());
+    }
+
+    public ReplicationRound getNextRound(final ReplicationRound 
replicationRound) {
+        return getReplicationRoundFromStartTime(replicationRound.getEndTime());
+    }
+
     public int getReplicationRoundDurationSeconds() {
         return this.replicationRoundDurationSeconds;
     }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index 93fd18c947..faf078fa7f 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -18,8 +18,6 @@
 package org.apache.phoenix.replication.reader;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -136,6 +134,9 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
 
     @Override
     public void init() throws IOException {
+
+        LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}", 
haGroupName);
+
         HAGroupStateListener degradedListener = (groupName, fromState, 
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
             if (clusterType == ClusterType.LOCAL && 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
                 replicationReplayState.set(ReplicationReplayState.DEGRADED);
@@ -223,7 +224,10 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
      */
     @Override
     protected void initializeLastRoundProcessed() throws IOException {
+        LOG.info("Initializing last round processed for haGroup: {}", 
haGroupName);
         HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord();
+        LOG.info("Found HA Group state during initialization as {} for 
haGroup: {}",
+                haGroupStoreRecord.getHAGroupState(), haGroupName);
         if 
(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState()))
 {
             
replicationReplayState.compareAndSet(ReplicationReplayState.NOT_INITIALIZED,
                     ReplicationReplayState.DEGRADED);
@@ -298,6 +302,8 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
         LOG.info("Starting replay with lastRoundProcessed={}, 
lastRoundInSync={}",
                 lastRoundProcessed, lastRoundInSync);
         Optional<ReplicationRound> optionalNextRound = 
getFirstRoundToProcess();
+        LOG.info("Found first round to process as {} for haGroup: {}",
+                optionalNextRound, haGroupName);
         while (optionalNextRound.isPresent()) {
             ReplicationRound replicationRound = optionalNextRound.get();
             try {
@@ -314,9 +320,14 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
             switch (currentState) {
             case SYNCED_RECOVERY:
                 // Rewind to last in-sync round
-                LOG.info("SYNCED_RECOVERY detected, rewinding to 
lastRoundInSync={}",
+                LOG.info("SYNCED_RECOVERY detected, rewinding with 
lastRoundInSync={}",
                         lastRoundInSync);
-                setLastRoundProcessed(lastRoundInSync);
+                Optional<ReplicationRound> firstRoundToProcess = 
getFirstRoundToProcess();
+                LOG.info("Calculated first round to process after 
SYNCED_RECOVERY as"
+                        + "{}", firstRoundToProcess);
+                firstRoundToProcess.ifPresent(round -> setLastRoundProcessed(
+                        
replicationLogTracker.getReplicationShardDirectoryManager()
+                                .getPreviousRound(round)));
                 // Only reset to NORMAL if state hasn't been flipped to 
DEGRADED
                 
replicationReplayState.compareAndSet(ReplicationReplayState.SYNCED_RECOVERY,
                         ReplicationReplayState.SYNC);
@@ -348,17 +359,10 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
         if (!optionalNextRound.isPresent() && shouldTriggerFailover()) {
             LOG.info("No more rounds to process, lastRoundInSync={}, 
lastRoundProcessed={}. "
                             + "Failover is triggered & in progress directory 
is empty. "
-                            + "Marking cluster state as {}",
+                            + "Attempting to mark cluster state as {}",
                     lastRoundInSync, lastRoundProcessed,
                     HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
-            try {
-                triggerFailover();
-                LOG.info("Successfully updated the cluster state");
-                failoverPending.set(false);
-            } catch (InvalidClusterRoleTransitionException exception) {
-                LOG.warn("Failed to update the cluster state.", exception);
-                failoverPending.set(false);
-            }
+            triggerFailover();
         }
     }
 
@@ -372,8 +376,15 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
      * @return Optional containing the first round to process, or empty if not 
enough time
      *         has passed
      */
-    private Optional<ReplicationRound> getFirstRoundToProcess() {
-        long lastRoundEndTimestamp = getLastRoundInSync().getEndTime();
+    private Optional<ReplicationRound> getFirstRoundToProcess() throws 
IOException {
+        ReplicationRound lastRoundInSync = getLastRoundInSync();
+        long lastRoundEndTimestamp = lastRoundInSync.getEndTime();
+        if (lastRoundInSync.getStartTime() == 0) {
+            Optional<Long> optionalMinimumNewFilesTimestamp = 
getMinTimestampFromNewFiles();
+            lastRoundEndTimestamp = 
replicationLogTracker.getReplicationShardDirectoryManager()
+                    
.getNearestRoundStartTimestamp(optionalMinimumNewFilesTimestamp
+                            .orElseGet(EnvironmentEdgeManager::currentTime));
+        }
         long currentTime = EnvironmentEdgeManager.currentTime();
         if (currentTime - lastRoundEndTimestamp < roundTimeMills + 
bufferMillis) {
             // nothing more to process
@@ -457,17 +468,39 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
         return optionalHAGroupStateRecord.get();
     }
 
+    /**
+     * Determines whether failover should be triggered based on completion 
criteria.
+     *
+     * Failover is safe to trigger when all of the following conditions are 
met:
+     * 1. A failover has been requested (failoverPending is true)
+     * 2. No files are currently in the in-progress directory
+     * 3. No new files exist for ongoing round
+     *
+     * These conditions ensure all replication logs have been processed before 
transitioning
+     * the cluster from STANDBY to ACTIVE state.
+     *
+     * @return true if all conditions are met and failover should be 
triggered, false otherwise
+     * @throws IOException if there's an error checking file status
+     */
     protected boolean shouldTriggerFailover() throws IOException {
-        return failoverPending.get() && 
lastRoundInSync.equals(lastRoundProcessed)
-                && replicationLogTracker.getInProgressFiles().isEmpty()
-                ;
-        // TODO: Check for in files of lastRoundProcessed, lastRoundProcessed 
+ roundTime as well - because there can be new files for upcoming round
+        return failoverPending.get() && 
replicationLogTracker.getInProgressFiles().isEmpty()
+                && 
replicationLogTracker.getNewFilesForRound(replicationLogTracker
+                .getReplicationShardDirectoryManager()
+                .getNextRound(getLastRoundProcessed())).isEmpty();
     }
 
-    protected void triggerFailover() throws IOException, 
InvalidClusterRoleTransitionException {
-        // TODO: Update cluster state to ACTIVE_IN_SYNC within try block
-        // (once API is supported in HA Store)
-        // Throw any exception back to caller.
+    protected void triggerFailover() {
+        try {
+            
HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName);
+            failoverPending.set(false);
+        } catch (InvalidClusterRoleTransitionException 
invalidClusterRoleTransitionException) {
+            LOG.warn("Failed to update the cluster state due to"
+                    + "InvalidClusterRoleTransitionException. Setting 
failoverPending"
+                    + "to false.", invalidClusterRoleTransitionException);
+            failoverPending.set(false);
+        } catch (Exception exception) {
+            LOG.error("Failed to update the cluster state.", exception);
+        }
     }
 
     public enum ReplicationReplayState {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 4a47779d70..e44bcc5ba9 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -217,6 +217,8 @@ public class ReplicationLogProcessor implements Closeable {
 
     public void processLogFile(FileSystem fs, Path filePath) throws 
IOException {
 
+        LOG.info("Starting to process file {}", filePath);
+
         // Map from Table Name to List of Mutations
         Map<TableName, List<Mutation>> tableToMutationsMap = new HashMap<>();
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 5f12573403..587f9bdb02 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -107,6 +107,7 @@ public class ReplicationLogReplay {
      * @throws IOException if there's an error during initialization
      */
     protected void init() throws IOException {
+        LOG.info("Initializing ReplicationLogReplay for haGroup: {}", 
haGroupName);
         initializeFileSystem();
         Path newFilesDirectory = new Path(new Path(rootURI.getPath(), 
haGroupName), ReplicationLogReplay.IN_DIRECTORY_NAME);
         ReplicationShardDirectoryManager replicationShardDirectoryManager =
@@ -121,6 +122,7 @@ public class ReplicationLogReplay {
     }
 
     public void close() {
+        LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
         replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
         replicationLogDiscoveryReplay.close();
         // Remove the instance from cache
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index 7df307f762..fef11daec6 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -195,6 +195,8 @@ public class ReplicationLogReplayService {
      */
     protected void startReplicationReplay() throws IOException, SQLException {
         List<String> replicationGroups = getReplicationGroups();
+        LOG.info("{} number of HA Groups found to start Replication Replay",
+                replicationGroups.size());
         for (String replicationGroup : replicationGroups) {
             ReplicationLogReplay.get(conf, replicationGroup).startReplay();
         }
@@ -205,6 +207,8 @@ public class ReplicationLogReplayService {
      */
     protected void stopReplicationReplay() throws IOException, SQLException {
         List<String> replicationGroups = getReplicationGroups();
+        LOG.info("{} number of HA Groups found to stop Replication Replay",
+                replicationGroups.size());
         for (String replicationGroup : replicationGroups) {
             ReplicationLogReplay replicationLogReplay =
                     ReplicationLogReplay.get(conf, replicationGroup);
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTestIT.java
similarity index 96%
rename from 
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
rename to 
phoenix-core/src/it/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTestIT.java
index 6df8577095..d688d55747 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTestIT.java
@@ -30,18 +30,21 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
 /**
  * Test our recoverLease loop against mocked up filesystem.
  */
-public class RecoverLeaseFSUtilsTest extends ParallelStatsDisabledIT {
+@Category(NeedsOwnMiniClusterTest.class)
+public class RecoverLeaseFSUtilsTestIT extends ParallelStatsDisabledIT {
 
     @ClassRule
     public static TemporaryFolder testFolder = new TemporaryFolder();
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTest.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
similarity index 72%
rename from 
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTest.java
rename to 
phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index b0bb1d7e98..ca1171d76f 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTest.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -19,9 +19,11 @@ package org.apache.phoenix.replication.reader;
 
 import java.io.IOException;
 import java.net.URI;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord;
 import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
 import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
@@ -65,9 +68,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.when;
 
 @Category(NeedsOwnMiniClusterTest.class)
-public class ReplicationLogDiscoveryReplayTest extends BaseTest {
+public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogDiscoveryReplayTest.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogDiscoveryReplayTestIT.class);
 
     private static final 
HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new 
HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
     private String zkUrl;
@@ -94,7 +97,7 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
     public void setUp() throws Exception {
         zkUrl = getLocalZkUrl(config);
         peerZkUrl = CLUSTERS.getZkUrl2();
-        
HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), 
zkUrl, peerZkUrl,
+        HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, 
zkUrl, peerZkUrl,
                 CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
                 ClusterRoleRecord.ClusterRole.ACTIVE, 
ClusterRoleRecord.ClusterRole.STANDBY,
                 null);
@@ -1191,6 +1194,354 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
         }
     }
 
+    /**
+     * Tests replay method when lastRoundInSync is not set, i.e. has start 
time 0.
+     * Validates that getFirstRoundToProcess uses minimum timestamp from new 
files
+     */
+    @Test
+    public void testReplay_LastRoundInSync_NotInitialized() throws IOException 
{
+        TestableReplicationLogTracker fileTracker = 
createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+        try {
+            long roundTimeMills = 
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
 * 1000L;
+            long bufferMillis = (long) (roundTimeMills * 0.15);
+
+            // Create new files with specific timestamps
+            // The minimum timestamp should be used as the basis for the first 
round
+            long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) + 
(30 * 1000L); // 2024-01-01 00:05:30
+            long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) + 
(45 * 1000L); // 2024-01-01 00:08:45
+            long middleFileTimestamp = 1704067200000L + (7 * roundTimeMills) + 
(15 * 1000L); // 2024-01-01 00:07:15
+
+            // Create files in different shard directories
+            ReplicationShardDirectoryManager shardManager = 
fileTracker.getReplicationShardDirectoryManager();
+
+            // Create file with minimum timestamp
+            Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+            localFs.mkdirs(shardPath1);
+            Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+            localFs.create(file1, true).close();
+
+            // Create file with middle timestamp
+            Path shardPath2 = 
shardManager.getShardDirectory(middleFileTimestamp);
+            localFs.mkdirs(shardPath2);
+            Path file2 = new Path(shardPath2, middleFileTimestamp + 
"_rs-2.plog");
+            localFs.create(file2, true).close();
+
+            // Create file with maximum timestamp
+            Path shardPath3 = shardManager.getShardDirectory(maxFileTimestamp);
+            localFs.mkdirs(shardPath3);
+            Path file3 = new Path(shardPath3, maxFileTimestamp + "_rs-3.plog");
+            localFs.create(file3, true).close();
+
+            // Create HAGroupStoreRecord for STANDBY state
+            HAGroupStoreRecord mockRecord = new 
HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+                    HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+            // Calculate expected first round start time (minimum timestamp 
rounded down to nearest round start)
+            long expectedFirstRoundStart = 
shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+            long expectedFirstRoundEnd = expectedFirstRoundStart + 
roundTimeMills;
+
+            // Set current time to allow processing exactly 5 rounds
+            // Round 1 ends at expectedFirstRoundEnd
+            // Round 2 ends at expectedFirstRoundEnd + roundTimeMills
+            // Round 3 ends at expectedFirstRoundEnd + 2*roundTimeMills
+            // Round 4 ends at expectedFirstRoundEnd + 3*roundTimeMills
+            // Round 5 ends at expectedFirstRoundEnd + 4*roundTimeMills
+            // Round 6 starts at expectedFirstRoundEnd + 4*roundTimeMills
+            // For round 6 to be processable: currentTime - 
(expectedFirstRoundEnd + 4*roundTimeMills) >= roundTimeMills + bufferMillis
+            // So: currentTime >= expectedFirstRoundEnd + 5*roundTimeMills + 
bufferMillis
+            // For round 6 NOT to be processable: currentTime - 
(expectedFirstRoundEnd + 5*roundTimeMills) < roundTimeMills + bufferMillis
+            // So exactly 5 rounds should be processed with currentTime = 
expectedFirstRoundEnd + 5*roundTimeMills + bufferMillis - 1
+            // To be safe, use: currentTime = expectedFirstRoundEnd + 
5*roundTimeMills + bufferMillis - 1000
+            long currentTime = expectedFirstRoundEnd + (5 * roundTimeMills) + 
bufferMillis - 1000;
+            EnvironmentEdge edge = () -> currentTime;
+            EnvironmentEdgeManager.injectEdge(edge);
+
+            // Calculate exact number of rounds that should be processed
+            // Round 1: from getFirstRoundToProcess (based on lastRoundInSync 
with start time 0)
+            //   - Uses minFileTimestamp rounded down = expectedFirstRoundStart
+            //   - Round 1: expectedFirstRoundStart to expectedFirstRoundEnd
+            // Round 2+: from getNextRoundToProcess (based on previous round's 
end time)
+            //   - Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd + 
roundTimeMills
+            //   - Round 3: expectedFirstRoundEnd + roundTimeMills to 
expectedFirstRoundEnd + 2*roundTimeMills
+            //   - Round 4: expectedFirstRoundEnd + 2*roundTimeMills to 
expectedFirstRoundEnd + 3*roundTimeMills
+            //   - Round 5: expectedFirstRoundEnd + 3*roundTimeMills to 
expectedFirstRoundEnd + 4*roundTimeMills
+            int expectedRoundCount = 5;
+            long[] expectedRoundStarts = new long[expectedRoundCount];
+            long[] expectedRoundEnds = new long[expectedRoundCount];
+            expectedRoundStarts[0] = expectedFirstRoundStart;
+            expectedRoundEnds[0] = expectedFirstRoundEnd;
+            for (int i = 1; i < expectedRoundCount; i++) {
+                expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 1) * 
roundTimeMills);
+                expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+            }
+
+            try {
+                TestableReplicationLogDiscoveryReplay discovery =
+                        new TestableReplicationLogDiscoveryReplay(fileTracker, 
mockRecord);
+
+                // Set lastRoundInSync with start time 0 (the new case being 
tested)
+                ReplicationRound lastRoundInSyncWithZeroStart = new 
ReplicationRound(0L, roundTimeMills);
+                discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+                // Set lastRoundProcessed to some initial value
+                ReplicationRound initialLastRoundProcessed = new 
ReplicationRound(0L, roundTimeMills);
+                discovery.setLastRoundProcessed(initialLastRoundProcessed);
+                
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+                // Verify initial state
+                ReplicationRound lastRoundInSyncBefore = 
discovery.getLastRoundInSync();
+                assertEquals("lastRoundInSync should have start time 0 before 
replay", 0L, lastRoundInSyncBefore.getStartTime());
+
+                // Call replay - should start from minimum timestamp of new 
files
+                discovery.replay();
+
+                // Verify exact count of processRound calls
+                int actualProcessRoundCallCount = 
discovery.getProcessRoundCallCount();
+                assertEquals("processRound should be called exactly " + 
expectedRoundCount + " times",
+                        expectedRoundCount, actualProcessRoundCallCount);
+
+                // Get all processed rounds
+                List<ReplicationRound> processedRounds = 
discovery.getProcessedRounds();
+                assertEquals("Should have processed exactly " + 
expectedRoundCount + " rounds",
+                        expectedRoundCount, processedRounds.size());
+
+                // Verify each round's exact parameters
+                for (int i = 0; i < expectedRoundCount; i++) {
+                    ReplicationRound actualRound = processedRounds.get(i);
+                    assertEquals("Round " + (i + 1) + " should have correct 
start time",
+                            expectedRoundStarts[i], 
actualRound.getStartTime());
+                    assertEquals("Round " + (i + 1) + " should have correct 
end time",
+                            expectedRoundEnds[i], actualRound.getEndTime());
+                }
+
+                // Verify lastRoundProcessed was updated to the last processed 
round
+                ReplicationRound lastRoundAfterReplay = 
discovery.getLastRoundProcessed();
+                assertNotNull("Last round processed should not be null", 
lastRoundAfterReplay);
+                assertEquals("Last round processed should match the last 
processed round start time",
+                        expectedRoundStarts[expectedRoundCount - 1], 
lastRoundAfterReplay.getStartTime());
+                assertEquals("Last round processed should match the last 
processed round end time",
+                        expectedRoundEnds[expectedRoundCount - 1], 
lastRoundAfterReplay.getEndTime());
+
+                // Verify lastRoundInSync was updated (in SYNC state, both 
should advance together)
+                ReplicationRound lastRoundInSyncAfter = 
discovery.getLastRoundInSync();
+                assertNotNull("Last round in sync should not be null", 
lastRoundInSyncAfter);
+                assertEquals("Last round in sync should match last round 
processed in SYNC state",
+                        lastRoundAfterReplay, lastRoundInSyncAfter);
+                assertEquals("Last round in sync should have correct start 
time",
+                        expectedRoundStarts[expectedRoundCount - 1], 
lastRoundInSyncAfter.getStartTime());
+                assertEquals("Last round in sync should have correct end time",
+                        expectedRoundEnds[expectedRoundCount - 1], 
lastRoundInSyncAfter.getEndTime());
+
+                // Verify state remains SYNC
+                assertEquals("State should remain SYNC",
+                        
ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+                        discovery.getReplicationReplayState());
+
+            } finally {
+                EnvironmentEdgeManager.reset();
+            }
+        } finally {
+            fileTracker.close();
+        }
+    }
+
+    /**
+     * Tests replay method when state changes from DEGRADED to SYNC between 
round processing
+     * and lastRoundInSync has start time 0.
+     *
+     * Expected behavior:
+     * - 1st round is processed in DEGRADED state
+     * - 2nd round is processed in DEGRADED state, but then state transitions 
to SYNCED_RECOVERY
+     * - When state becomes SYNCED_RECOVERY, lastRoundProcessed is rewound to 
the previous round of getFirstRoundToProcess()
+     *   (which uses minimum file timestamp when lastRoundInSync.startTime == 
0)
+     * - Then 5 rounds are processed starting from the first round (based on 
minimum file timestamp) in SYNC state
+     *
+     * Total: 7 rounds processed (2 in DEGRADED, then rewind, then 5 in SYNC)
+     */
+    @Test
+    public void testReplay_DegradedToSync_LastRoundInSyncStartTimeZero() 
throws IOException {
+        TestableReplicationLogTracker fileTracker = 
createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+        try {
+            long roundTimeMills = 
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
 * 1000L;
+            long bufferMillis = (long) (roundTimeMills * 0.15);
+
+            // Create new files with specific timestamps
+            // The minimum timestamp should be used as the basis for the first 
round
+            long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) + 
(30 * 1000L); // 2024-01-01 00:05:30
+            long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) + 
(45 * 1000L); // 2024-01-01 00:08:45
+
+            // Create files in different shard directories
+            ReplicationShardDirectoryManager shardManager = 
fileTracker.getReplicationShardDirectoryManager();
+
+            // Create file with minimum timestamp
+            Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+            localFs.mkdirs(shardPath1);
+            Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+            localFs.create(file1, true).close();
+
+            // Create file with maximum timestamp
+            Path shardPath2 = shardManager.getShardDirectory(maxFileTimestamp);
+            localFs.mkdirs(shardPath2);
+            Path file2 = new Path(shardPath2, maxFileTimestamp + "_rs-2.plog");
+            localFs.create(file2, true).close();
+
+            // Create HAGroupStoreRecord for STANDBY state
+            HAGroupStoreRecord mockRecord = new 
HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+                    HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+            // Calculate expected first round start time (minimum timestamp 
rounded down to nearest round start)
+            long expectedFirstRoundStart = 
shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+            long expectedFirstRoundEnd = expectedFirstRoundStart + 
roundTimeMills;
+
+            // After rewind, lastRoundProcessed will be set to previous round 
of firstRoundToProcess
+            // firstRoundToProcess = expectedFirstRoundStart to 
expectedFirstRoundEnd
+            // previous round = expectedFirstRoundStart - roundTimeMills to 
expectedFirstRoundStart
+            // Then getNextRoundToProcess will return the first round starting 
from expectedFirstRoundStart
+            // So the first SYNC round after rewind will be 
expectedFirstRoundStart to expectedFirstRoundEnd
+
+            // Set current time to allow processing 7 rounds total (2 in 
DEGRADED, then rewind, then 5 in SYNC)
+            // After rewind, we need time for 5 more rounds starting from 
expectedFirstRoundStart
+            // Round 7 (the 5th round after rewind) will end at 
expectedFirstRoundEnd + 4*roundTimeMills
+            //   = expectedFirstRoundStart + roundTimeMills + 4*roundTimeMills 
= expectedFirstRoundStart + 5*roundTimeMills
+            // For round 7 to be processable: currentTime - 
(expectedFirstRoundEnd + 4*roundTimeMills) >= roundTimeMills + bufferMillis
+            // So: currentTime >= expectedFirstRoundStart + 5*roundTimeMills + 
bufferMillis
+            // Set it slightly above to ensure round 7 is processable
+            long currentTime = expectedFirstRoundStart + (5 * roundTimeMills) 
+ bufferMillis + 1000;
+            EnvironmentEdge edge = () -> currentTime;
+            EnvironmentEdgeManager.injectEdge(edge);
+
+            // Calculate expected rounds
+            // Rounds 1-2: In DEGRADED state
+            //   Round 1: expectedFirstRoundStart to expectedFirstRoundEnd 
(DEGRADED)
+            //   Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd + 
roundTimeMills (DEGRADED, transitions to SYNCED_RECOVERY at end)
+            // After round 2, state becomes SYNCED_RECOVERY and causes rewind
+            //   - getFirstRoundToProcess returns: expectedFirstRoundStart to 
expectedFirstRoundEnd
+            //   - lastRoundProcessed is set to: expectedFirstRoundStart - 
roundTimeMills to expectedFirstRoundStart
+            // Rounds 3-7: In SYNC state (starting from first round based on 
minimum file timestamp)
+            //   Round 3: expectedFirstRoundStart to expectedFirstRoundEnd 
(SYNC, after rewind - first round)
+            //   Round 4: expectedFirstRoundEnd to expectedFirstRoundEnd + 
roundTimeMills (SYNC)
+            //   Round 5: expectedFirstRoundEnd + roundTimeMills to 
expectedFirstRoundEnd + 2*roundTimeMills (SYNC)
+            //   Round 6: expectedFirstRoundEnd + 2*roundTimeMills to 
expectedFirstRoundEnd + 3*roundTimeMills (SYNC)
+            //   Round 7: expectedFirstRoundEnd + 3*roundTimeMills to 
expectedFirstRoundEnd + 4*roundTimeMills (SYNC)
+            int expectedRoundCount = 7;
+            int degradedRoundCount = 2; // First 2 rounds are in DEGRADED state
+            int syncRoundCount = 5; // Last 5 rounds are in SYNC state (after 
rewind)
+            long[] expectedRoundStarts = new long[expectedRoundCount];
+            long[] expectedRoundEnds = new long[expectedRoundCount];
+
+            // Rounds 1-2: DEGRADED
+            expectedRoundStarts[0] = expectedFirstRoundStart;
+            expectedRoundEnds[0] = expectedFirstRoundEnd;
+            expectedRoundStarts[1] = expectedFirstRoundEnd;
+            expectedRoundEnds[1] = expectedFirstRoundEnd + roundTimeMills;
+
+            // Rounds 3-7: SYNC (after rewind, starting from 
expectedFirstRoundStart - the first round based on minimum file timestamp)
+            expectedRoundStarts[2] = expectedFirstRoundStart; // Round 3: 
first round after rewind
+            expectedRoundEnds[2] = expectedFirstRoundEnd;
+            for (int i = 3; i < expectedRoundCount; i++) {
+                expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 3) * 
roundTimeMills);
+                expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+            }
+
+            try {
+                // Create discovery that transitions from DEGRADED to 
SYNCED_RECOVERY after 2 rounds
+                // SYNCED_RECOVERY triggers a rewind, then transitions to SYNC
+                TestableReplicationLogDiscoveryReplay discovery =
+                        new TestableReplicationLogDiscoveryReplay(fileTracker, 
mockRecord) {
+                    private int roundCount = 0;
+
+                    @Override
+                    protected void processRound(ReplicationRound 
replicationRound) throws IOException {
+                        super.processRound(replicationRound);
+                        roundCount++;
+
+                        // Transition to SYNCED_RECOVERY after 2 rounds (after 
processing round 2)
+                        // This triggers a rewind similar to when coming back 
from DEGRADED to SYNC
+                        if (roundCount == degradedRoundCount) {
+                            
setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY);
+                        }
+                    }
+                };
+
+                // Set lastRoundInSync with start time 0
+                ReplicationRound lastRoundInSyncWithZeroStart = new 
ReplicationRound(0L, roundTimeMills);
+                discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+                // Set lastRoundProcessed to some initial value
+                ReplicationRound initialLastRoundProcessed = new 
ReplicationRound(0L, roundTimeMills);
+                discovery.setLastRoundProcessed(initialLastRoundProcessed);
+                
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED);
+
+                // Verify initial state
+                ReplicationRound lastRoundInSyncBefore = 
discovery.getLastRoundInSync();
+                assertEquals("lastRoundInSync should have start time 0 before 
replay", 0L, lastRoundInSyncBefore.getStartTime());
+                assertEquals("Initial state should be DEGRADED",
+                        
ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED,
+                        discovery.getReplicationReplayState());
+
+                // Call replay - should start from minimum timestamp of new 
files
+                discovery.replay();
+
+                // Verify exact count of processRound calls
+                int actualProcessRoundCallCount = 
discovery.getProcessRoundCallCount();
+                assertEquals("processRound should be called exactly " + 
expectedRoundCount + " times",
+                        expectedRoundCount, actualProcessRoundCallCount);
+
+                // Get all processed rounds
+                List<ReplicationRound> processedRounds = 
discovery.getProcessedRounds();
+                assertEquals("Should have processed exactly " + 
expectedRoundCount + " rounds",
+                        expectedRoundCount, processedRounds.size());
+
+                // Verify each round's exact parameters
+                for (int i = 0; i < expectedRoundCount; i++) {
+                    ReplicationRound actualRound = processedRounds.get(i);
+                    assertEquals("Round " + (i + 1) + " should have correct 
start time",
+                            expectedRoundStarts[i], 
actualRound.getStartTime());
+                    assertEquals("Round " + (i + 1) + " should have correct 
end time",
+                            expectedRoundEnds[i], actualRound.getEndTime());
+                }
+
+                // Verify lastRoundProcessed was updated to the last processed 
round
+                ReplicationRound lastRoundAfterReplay = 
discovery.getLastRoundProcessed();
+                assertNotNull("Last round processed should not be null", 
lastRoundAfterReplay);
+                assertEquals("Last round processed should match the last 
processed round start time",
+                        expectedRoundStarts[expectedRoundCount - 1], 
lastRoundAfterReplay.getStartTime());
+                assertEquals("Last round processed should match the last 
processed round end time",
+                        expectedRoundEnds[expectedRoundCount - 1], 
lastRoundAfterReplay.getEndTime());
+
+                // Verify lastRoundInSync behavior:
+                // - During first 2 rounds (DEGRADED), lastRoundInSync should 
NOT be updated (remains with start time 0)
+                // - After transition to SYNCED_RECOVERY and then SYNC (rounds 
3-7), lastRoundInSync should be updated
+                // - Final lastRoundInSync should match lastRoundProcessed 
(because we're in SYNC state at the end)
+                ReplicationRound lastRoundInSyncAfter = 
discovery.getLastRoundInSync();
+                assertNotNull("Last round in sync should not be null", 
lastRoundInSyncAfter);
+
+                // After processing, we're in SYNC state, so lastRoundInSync 
should be updated
+                // It should match the last processed round because state is 
SYNC
+                assertEquals("Last round in sync should match last round 
processed in SYNC state",
+                        lastRoundAfterReplay, lastRoundInSyncAfter);
+                assertEquals("Last round in sync should have correct start 
time",
+                        expectedRoundStarts[expectedRoundCount - 1], 
lastRoundInSyncAfter.getStartTime());
+                assertEquals("Last round in sync should have correct end time",
+                        expectedRoundEnds[expectedRoundCount - 1], 
lastRoundInSyncAfter.getEndTime());
+                assertTrue("Last round in sync start time should not be 0 
after processing in SYNC state",
+                        lastRoundInSyncAfter.getStartTime() > 0);
+
+                // Verify state transitioned to SYNC
+                assertEquals("State should be SYNC after transition",
+                        
ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+                        discovery.getReplicationReplayState());
+
+            } finally {
+                EnvironmentEdgeManager.reset();
+            }
+        } finally {
+            fileTracker.close();
+        }
+    }
+
     /**
      * Tests replay method when failoverPending becomes true during processing 
and triggers failover after all rounds.
      * Validates that triggerFailover is called exactly once when all 
conditions are met.
@@ -1299,7 +1650,7 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
 
     /**
      * Tests the shouldTriggerFailover method with various combinations of 
failoverPending,
-     * lastRoundInSync, lastRoundProcessed and in-progress files state.
+     * in-progress files, and new files for next round.
      */
     @Test
     public void testShouldTriggerFailover() throws IOException {
@@ -1317,11 +1668,12 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
         try {
             // Create test rounds
             ReplicationRound testRound = new ReplicationRound(1704153600000L, 
1704153660000L);
-            ReplicationRound differentRound = new 
ReplicationRound(1704153540000L, 1704153600000L);
+            ReplicationRound nextRound = 
tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
 
             // Test Case 1: All conditions true - should return true
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1334,6 +1686,7 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
             // Test Case 2: failoverPending is false - should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1343,72 +1696,78 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
                         discovery.shouldTriggerFailover());
             }
 
-            // Test Case 3: lastRoundInSync not equals lastRoundProcessed - 
should return false
+            // Test Case 3: in-progress files not empty - should return false
             {
-                
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+                
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
-                discovery.setLastRoundProcessed(differentRound);
+                discovery.setLastRoundProcessed(testRound);
                 discovery.setFailoverPending(true);
 
-                assertFalse("Should not trigger failover when lastRoundInSync 
!= lastRoundProcessed",
+                assertFalse("Should not trigger failover when in-progress 
files are not empty",
                         discovery.shouldTriggerFailover());
             }
 
-            // Test Case 4: in-progress files not empty - should return false
+            // Test Case 4: new files exist for next round - should return 
false
             {
-                
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
+                
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
                 discovery.setFailoverPending(true);
 
-                assertFalse("Should not trigger failover when in-progress 
files are not empty",
+                assertFalse("Should not trigger failover when new files exist 
for next round",
                         discovery.shouldTriggerFailover());
             }
 
-            // Test Case 5: failoverPending false AND lastRoundInSync != 
lastRoundProcessed - should return false
+            // Test Case 5: failoverPending false AND in-progress files not 
empty - should return false
             {
-                
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+                
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
-                discovery.setLastRoundProcessed(differentRound);
+                discovery.setLastRoundProcessed(testRound);
                 discovery.setFailoverPending(false);
 
-                assertFalse("Should not trigger failover when failoverPending 
is false and rounds differ",
+                assertFalse("Should not trigger failover when failoverPending 
is false and in-progress files exist",
                         discovery.shouldTriggerFailover());
             }
 
-            // Test Case 6: failoverPending false AND in-progress files not 
empty - should return false
+            // Test Case 6: failoverPending false AND new files exist - should 
return false
             {
-                
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
+                
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
                 discovery.setFailoverPending(false);
 
-                assertFalse("Should not trigger failover when failoverPending 
is false and files exist",
+                assertFalse("Should not trigger failover when failoverPending 
is false and new files exist",
                         discovery.shouldTriggerFailover());
             }
 
-            // Test Case 7: lastRoundInSync != lastRoundProcessed AND 
in-progress files not empty - should return false
+            // Test Case 7: in-progress files not empty AND new files exist - 
should return false
             {
-                
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
+                
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test1.plog")));
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test2.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
-                discovery.setLastRoundProcessed(differentRound);
+                discovery.setLastRoundProcessed(testRound);
                 discovery.setFailoverPending(true);
 
-                assertFalse("Should not trigger failover when rounds differ 
and files exist",
+                assertFalse("Should not trigger failover when both in-progress 
and new files exist",
                         discovery.shouldTriggerFailover());
             }
 
             // Test Case 8: All conditions false - should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
+                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test2.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
-                discovery.setLastRoundProcessed(differentRound);
+                discovery.setLastRoundProcessed(testRound);
                 discovery.setFailoverPending(false);
 
                 assertFalse("Should not trigger failover when all conditions 
are false",
@@ -1420,6 +1779,141 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
         }
     }
 
+    /**
+     * Tests the triggerFailover method to verify it properly updates cluster 
state
+     * and handles exceptions correctly.
+     */
+    @Test
+    public void testTriggerFailover() throws IOException, SQLException {
+        final String haGroupName = "testTriggerFailoverHAGroup";
+        // Set up HA group store record with STANDBY_TO_ACTIVE state for 
successful transition
+        HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, 
zkUrl, peerZkUrl,
+                CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+                ClusterRoleRecord.ClusterRole.STANDBY_TO_ACTIVE, 
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                null);
+
+        TestableReplicationLogTracker fileTracker = null;
+        try {
+            fileTracker = createReplicationLogTracker(config, haGroupName, 
localFs, standbyUri);
+
+            // Create a spy of ReplicationLogDiscoveryReplay to test actual 
implementation
+            ReplicationLogDiscoveryReplay discovery = Mockito.spy(
+                    new ReplicationLogDiscoveryReplay(fileTracker));
+
+            // Initialize the discovery
+            discovery.init();
+
+            // Set failoverPending to true
+            discovery.setFailoverPending(true);
+            assertTrue("failoverPending should be true before 
triggerFailover", discovery.getFailoverPending());
+
+            // Verify initial state is STANDBY_TO_ACTIVE
+            Optional<HAGroupStoreRecord> recordBefore = 
HAGroupStoreManager.getInstance(config)
+                    .getHAGroupStoreRecord(haGroupName);
+            assertTrue("HA group record should exist", 
recordBefore.isPresent());
+            assertEquals("Initial state should be STANDBY_TO_ACTIVE",
+                    HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE,
+                    recordBefore.get().getHAGroupState());
+
+            // Call triggerFailover - should succeed and update cluster state 
to ACTIVE_IN_SYNC
+            discovery.triggerFailover();
+
+            // Verify triggerFailover was called once
+            Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+            // Verify cluster state is updated to ACTIVE_IN_SYNC after failover
+            Optional<HAGroupStoreRecord> recordAfter = 
HAGroupStoreManager.getInstance(config)
+                    .getHAGroupStoreRecord(haGroupName);
+            assertTrue("HA group record should exist after failover", 
recordAfter.isPresent());
+            assertEquals("Cluster state should be ACTIVE_IN_SYNC after 
successful failover",
+                    HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
+                    recordAfter.get().getHAGroupState());
+
+            // Verify failoverPending is set to false after successful failover
+            assertFalse("failoverPending should be set to false after 
successful triggerFailover",
+                    discovery.getFailoverPending());
+
+        } finally {
+            // Clean up
+            if (fileTracker != null) {
+                fileTracker.close();
+            }
+            try {
+                
HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl);
+            } catch (Exception e) {
+                LOG.warn("Failed to clean up HA group store record", e);
+            }
+        }
+    }
+
+    /**
+     * Tests triggerFailover when 
HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName)
+     * throws InvalidClusterRoleTransitionException.
+     * Verifies that failoverPending is set to false even when the exception 
occurs.
+     *
+     * This test sets up the HA group in STANDBY state instead of 
STANDBY_TO_ACTIVE.
+     * Calling setHAGroupStatusToSync from STANDBY state will throw 
InvalidClusterRoleTransitionException
+     * because the transition from STANDBY directly to ACTIVE_IN_SYNC is not 
allowed.
+     */
+    @Test
+    public void 
testTriggerFailover_InvalidClusterRoleTransitionExceptionFromHAGroupStoreManager()
 throws IOException, SQLException {
+        final String haGroupName = 
"testTriggerFailoverInvalidTransitionHAGroup";
+
+        // Set up HA group store record in STANDBY state (not 
STANDBY_TO_ACTIVE)
+        // This will cause setHAGroupStatusToSync to throw 
InvalidClusterRoleTransitionException
+        // because transitioning from STANDBY directly to ACTIVE_IN_SYNC is 
invalid
+        HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, 
zkUrl, peerZkUrl,
+                CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+                ClusterRoleRecord.ClusterRole.STANDBY, 
ClusterRoleRecord.ClusterRole.ACTIVE,
+                null);
+
+        TestableReplicationLogTracker fileTracker = null;
+        try {
+            fileTracker = createReplicationLogTracker(config, haGroupName, 
localFs, standbyUri);
+
+            // Create a spy of ReplicationLogDiscoveryReplay to test actual 
implementation
+            ReplicationLogDiscoveryReplay discovery = Mockito.spy(
+                    new ReplicationLogDiscoveryReplay(fileTracker));
+
+            // Initialize the discovery
+            discovery.init();
+
+            // Set failoverPending to true
+            discovery.setFailoverPending(true);
+            assertTrue("failoverPending should be true before 
triggerFailover", discovery.getFailoverPending());
+
+            // Verify initial state is STANDBY (not STANDBY_TO_ACTIVE)
+            Optional<HAGroupStoreRecord> recordBefore = 
HAGroupStoreManager.getInstance(config)
+                    .getHAGroupStoreRecord(haGroupName);
+            assertTrue("HA group record should exist", 
recordBefore.isPresent());
+            assertEquals("Initial state should be DEGRADED_STANDBY",
+                    HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY,
+                    recordBefore.get().getHAGroupState());
+
+            // Call triggerFailover - should handle 
InvalidClusterRoleTransitionException
+            // because transitioning from DEGRADED_STANDBY directly to 
ACTIVE_IN_SYNC is invalid
+            discovery.triggerFailover();
+
+            // Verify triggerFailover was called (implicitly through spy)
+            Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+            // Verify failoverPending is set to false even when 
InvalidClusterRoleTransitionException occurs
+            assertFalse("failoverPending should be set to false when 
InvalidClusterRoleTransitionException occurs",
+                    discovery.getFailoverPending());
+
+        } finally {
+            // Clean up
+            if (fileTracker != null) {
+                fileTracker.close();
+            }
+            try {
+                
HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl);
+            } catch (Exception e) {
+                LOG.warn("Failed to clean up HA group store record", e);
+            }
+        }
+    }
+
     /**
      * Testable implementation of ReplicationLogDiscoveryReplay for unit 
testing.
      * Provides dependency injection for HAGroupStoreRecord, tracks processed 
rounds,
@@ -1482,9 +1976,7 @@ public class ReplicationLogDiscoveryReplayTest extends 
BaseTest {
         protected void triggerFailover() {
             // Track calls to triggerFailover for validation
             triggerFailoverCallCount++;
-
-            // Simulate the real behavior: set failoverPending to false after 
triggering failover
-            setFailoverPending(false);
+            super.triggerFailover();
         }
 
         public int getTriggerFailoverCallCount() {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
similarity index 99%
rename from 
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
rename to 
phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 9540a8d20f..1ad72806f5 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
@@ -83,11 +84,13 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
 
     private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s 
(ID VARCHAR PRIMARY KEY, " +
             "COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTest.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTestIT.java
similarity index 97%
rename from 
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTest.java
rename to 
phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTestIT.java
index 3412966abd..37c79b8852 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTest.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTestIT.java
@@ -34,6 +34,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.ClusterRoleRecord;
 import org.apache.phoenix.jdbc.HAGroupStoreClient;
 import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
@@ -43,10 +44,12 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.HAGroupStoreTestUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.*;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
-public class ReplicationLogReplayTest extends BaseTest {
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogReplayTestIT extends BaseTest {
 
     @ClassRule
     public static TemporaryFolder testFolder = new TemporaryFolder();
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index f609a4aed2..0649b551b7 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -436,7 +436,7 @@ public class ReplicationLogDiscoveryTest {
     @Test
     public void testShouldProcessInProgressDirectory() {
         // Test multiple times to verify probability-based behavior
-        int totalTests = 1000;
+        int totalTests = 100000;
         int trueCount = 0;
 
         for (int i = 0; i < totalTests; i++) {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
index 0b413104fc..f26e8436e6 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
@@ -5,7 +5,7 @@
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * with the License.  You may obtain a copy of the License a
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -234,7 +234,7 @@ public class ReplicationShardDirectoryManagerTest {
 
         // Test 7: Multiple rounds later
         long multipleRoundsLater = dayStart + (300 * 1000L); // 00:05:00 (5 
minutes)
-        long expectedRoundStart = dayStart + (300 * 1000L); // Should be exact
+        long expectedRoundStart = dayStart + (300 * 1000L); // Should be exac
         long result7 = 
manager.getNearestRoundStartTimestamp(multipleRoundsLater);
         assertEquals("Multiple rounds later should return exact round start", 
expectedRoundStart, result7);
 
@@ -389,7 +389,7 @@ public class ReplicationShardDirectoryManagerTest {
 
     @Test
     public void testGetReplicationRoundFromStartTimeConsistency() {
-        // Test that the same input always produces the same output
+        // Test that the same input always produces the same outpu
         long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
 
         ReplicationRound result1 = 
manager.getReplicationRoundFromStartTime(timestamp);
@@ -543,7 +543,7 @@ public class ReplicationShardDirectoryManagerTest {
 
     @Test
     public void testGetReplicationRoundFromEndTimeConsistency() {
-        // Test that the same input always produces the same output
+        // Test that the same input always produces the same outpu
         long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
 
         ReplicationRound result1 = 
manager.getReplicationRoundFromEndTime(timestamp);
@@ -595,5 +595,161 @@ public class ReplicationShardDirectoryManagerTest {
             assertEquals("Shard " + i + " should have correct name", 
expectedShardName, shardPath.getName());
         }
     }
-}
 
+    @Test
+    public void testGetPreviousRound() {
+        // Use a specific day for consistent testing
+        // 2024-01-01 00:00:00 UTC = 1704067200000L
+        long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+        // Default configuration: 60-second rounds
+        long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+        // Test 1: First round (00:00:00 to 00:01:00)
+        long firstRoundStart = dayStart; // 00:00:00
+        long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+        ReplicationRound firstRound = new ReplicationRound(firstRoundStart, 
firstRoundEnd);
+        ReplicationRound previousRound = manager.getPreviousRound(firstRound);
+
+        // Previous round should end at firstRoundStart (00:00:00), which 
rounds down to 00:00:00
+        // Start time should be end time - round duration = 00:00:00 - 60s = 
-60s (edge case)
+        long expectedEnd = dayStart; // 00:00:00
+        long expectedStart = expectedEnd - roundDurationMs; // -60 seconds
+        assertEquals("Previous round end time should be start time of current 
round rounded down",
+                   expectedEnd, previousRound.getEndTime());
+        assertEquals("Previous round start time should be end time - round 
duration",
+                   expectedStart, previousRound.getStartTime());
+
+        // Test 2: Second round (00:01:00 to 00:02:00)
+        long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+        long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+        ReplicationRound secondRound = new ReplicationRound(secondRoundStart, 
secondRoundEnd);
+        ReplicationRound previousRound2 = 
manager.getPreviousRound(secondRound);
+
+        // Previous round should end at secondRoundStart (00:01:00), which 
rounds down to 00:01:00
+        // Start time should be 00:01:00 - 60s = 00:00:00
+        long expectedEnd2 = secondRoundStart; // 00:01:00
+        long expectedStart2 = expectedEnd2 - roundDurationMs; // 00:00:00
+        assertEquals("Previous round end time should be start time of current 
round rounded down",
+                   expectedEnd2, previousRound2.getEndTime());
+        assertEquals("Previous round start time should be end time - round 
duration",
+                   expectedStart2, previousRound2.getStartTime());
+
+        // Test 3: Round with mid-timestamp start (00:01:30 would round to 
00:01:00)
+        long midRoundStart = dayStart + (90 * 1000L); // 00:01:30 (not aligned 
to round boundary)
+        long midRoundEnd = midRoundStart + roundDurationMs; // 00:02:30
+        ReplicationRound midRound = new ReplicationRound(midRoundStart, 
midRoundEnd);
+        ReplicationRound previousRound3 = manager.getPreviousRound(midRound);
+
+        // Previous round should end at midRoundStart rounded down = 00:01:00
+        // Start time should be 00:01:00 - 60s = 00:00:00
+        long expectedEnd3 = dayStart + roundDurationMs; // 00:01:00 (rounded 
down from 00:01:30)
+        long expectedStart3 = expectedEnd3 - roundDurationMs; // 00:00:00
+        assertEquals("Previous round end time should round down start time of 
current round",
+                   expectedEnd3, previousRound3.getEndTime());
+        assertEquals("Previous round start time should be end time - round 
duration",
+                   expectedStart3, previousRound3.getStartTime());
+
+        // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+        long multipleRoundsStart = dayStart + (5 * roundDurationMs); // 
00:05:00
+        long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; // 
00:06:00
+        ReplicationRound multipleRounds = new 
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+        ReplicationRound previousRound4 = 
manager.getPreviousRound(multipleRounds);
+
+        // Previous round should end at multipleRoundsStart = 00:05:00
+        // Start time should be 00:05:00 - 60s = 00:04:00
+        long expectedEnd4 = multipleRoundsStart; // 00:05:00
+        long expectedStart4 = expectedEnd4 - roundDurationMs; // 00:04:00
+        assertEquals("Previous round end time should be start time of current 
round",
+                   expectedEnd4, previousRound4.getEndTime());
+        assertEquals("Previous round start time should be end time - round 
duration",
+                   expectedStart4, previousRound4.getStartTime());
+
+        // Test 5: Verify round duration is consistent
+        long previousRoundDuration = previousRound4.getEndTime() - 
previousRound4.getStartTime();
+        assertEquals("Previous round duration should be 60 seconds", 
roundDurationMs, previousRoundDuration);
+    }
+
+    @Test
+    public void testGetNextRound() {
+        // Use a specific day for consistent testing
+        // 2024-01-01 00:00:00 UTC = 1704067200000L
+        long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+        // Default configuration: 60-second rounds
+        long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+        // Test 1: First round (00:00:00 to 00:01:00)
+        long firstRoundStart = dayStart; // 00:00:00
+        long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+        ReplicationRound firstRound = new ReplicationRound(firstRoundStart, 
firstRoundEnd);
+        ReplicationRound nextRound = manager.getNextRound(firstRound);
+
+        // Next round should start at firstRoundEnd (00:01:00), which rounds 
down to 00:01:00
+        // End time should be 00:01:00 + 60s = 00:02:00
+        long expectedStart = firstRoundEnd; // 00:01:00
+        long expectedEnd = expectedStart + roundDurationMs; // 00:02:00
+        assertEquals("Next round start time should be end time of current 
round rounded down",
+                   expectedStart, nextRound.getStartTime());
+        assertEquals("Next round end time should be start time + round 
duration",
+                   expectedEnd, nextRound.getEndTime());
+
+        // Test 2: Second round (00:01:00 to 00:02:00)
+        long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+        long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+        ReplicationRound secondRound = new ReplicationRound(secondRoundStart, 
secondRoundEnd);
+        ReplicationRound nextRound2 = manager.getNextRound(secondRound);
+
+        // Next round should start at secondRoundEnd (00:02:00), which rounds 
down to 00:02:00
+        // End time should be 00:02:00 + 60s = 00:03:00
+        long expectedStart2 = secondRoundEnd; // 00:02:00
+        long expectedEnd2 = expectedStart2 + roundDurationMs; // 00:03:00
+        assertEquals("Next round start time should be end time of current 
round rounded down",
+                   expectedStart2, nextRound2.getStartTime());
+        assertEquals("Next round end time should be start time + round 
duration",
+                   expectedEnd2, nextRound2.getEndTime());
+
+        // Test 3: Round with mid-timestamp end (00:02:30 would round to 
00:02:00)
+        long midRoundStart = dayStart + (120 * 1000L); // 00:02:00
+        long midRoundEnd = dayStart + (150 * 1000L); // 00:02:30 (not aligned 
to round boundary)
+        ReplicationRound midRound = new ReplicationRound(midRoundStart, 
midRoundEnd);
+        ReplicationRound nextRound3 = manager.getNextRound(midRound);
+
+        // Next round should start at midRoundEnd rounded down = 00:02:00
+        // End time should be 00:02:00 + 60s = 00:03:00
+        long expectedStart3 = dayStart + (120 * 1000L); // 00:02:00 (rounded 
down from 00:02:30)
+        long expectedEnd3 = expectedStart3 + roundDurationMs; // 00:03:00
+        assertEquals("Next round start time should round down end time of 
current round",
+                   expectedStart3, nextRound3.getStartTime());
+        assertEquals("Next round end time should be start time + round 
duration",
+                   expectedEnd3, nextRound3.getEndTime());
+
+        // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+        long multipleRoundsStart = dayStart + (5 * roundDurationMs); // 
00:05:00
+        long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; // 
00:06:00
+        ReplicationRound multipleRounds = new 
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+        ReplicationRound nextRound4 = manager.getNextRound(multipleRounds);
+
+        // Next round should start at multipleRoundsEnd = 00:06:00
+        // End time should be 00:06:00 + 60s = 00:07:00
+        long expectedStart4 = multipleRoundsEnd; // 00:06:00
+        long expectedEnd4 = expectedStart4 + roundDurationMs; // 00:07:00
+        assertEquals("Next round start time should be end time of current 
round",
+                   expectedStart4, nextRound4.getStartTime());
+        assertEquals("Next round end time should be start time + round 
duration",
+                   expectedEnd4, nextRound4.getEndTime());
+
+        // Test 5: Verify round duration is consistent
+        long nextRoundDuration = nextRound4.getEndTime() - 
nextRound4.getStartTime();
+        assertEquals("Next round duration should be 60 seconds", 
roundDurationMs, nextRoundDuration);
+
+        // Test 6: Verify continuity - next round of previous round should 
equal original round
+        ReplicationRound originalRound = new ReplicationRound(dayStart + (3 * 
roundDurationMs), dayStart + (4 * roundDurationMs)); // 00:03:00 to 00:04:00
+        ReplicationRound prevRound = manager.getPreviousRound(originalRound);
+        ReplicationRound nextOfPrev = manager.getNextRound(prevRound);
+        assertEquals("Next round of previous round should equal original round 
start time",
+                   originalRound.getStartTime(), nextOfPrev.getStartTime());
+        assertEquals("Next round of previous round should equal original round 
end time",
+                   originalRound.getEndTime(), nextOfPrev.getEndTime());
+    }
+}

Reply via email to