This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 37de6ecbf3 Phoenix-7672 Adding failover implementation via replay 
(Addendum) (#2351)
37de6ecbf3 is described below

commit 37de6ecbf366371793b583af57b6238a474c0034
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Fri Jan 23 04:29:25 2026 +0530

    Phoenix-7672 Adding failover implementation via replay (Addendum) (#2351)
---
 .../replication/ReplicationLogDiscovery.java       |  50 +-
 .../phoenix/replication/ReplicationLogTracker.java |   2 +-
 .../phoenix/replication/ReplicationRound.java      |   3 -
 .../ReplicationShardDirectoryManager.java          |  13 +-
 .../reader/ReplicationLogDiscoveryReplay.java      |  70 ++-
 .../reader/ReplicationLogProcessor.java            |   2 +
 .../replication/reader/ReplicationLogReplay.java   |   2 +
 .../reader/ReplicationLogReplayService.java        |   2 +
 .../ReplicationLogDiscoveryReplayTestIT.java       | 595 ++++++++++++++++++++-
 .../reader/ReplicationLogProcessorTestIT.java      |  10 +-
 .../ReplicationShardDirectoryManagerTest.java      | 167 +++++-
 11 files changed, 837 insertions(+), 79 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 1123c20564..37ef9084c7 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -103,6 +103,7 @@ public abstract class ReplicationLogDiscovery {
   }
 
   public void init() throws IOException {
+    LOG.info("Initializing ReplicationLogDiscovery for haGroup: {}", 
haGroupName);
     initializeLastRoundProcessed();
     this.metrics = createMetricsSource();
   }
@@ -122,7 +123,7 @@ public abstract class ReplicationLogDiscovery {
   public void start() throws IOException {
     synchronized (this) {
       if (isRunning) {
-        LOG.warn("ReplicationLogDiscovery is already running for group: {}", 
haGroupName);
+        LOG.warn("ReplicationLogDiscovery is already running for haGroup: {}", 
haGroupName);
         return;
       }
       // Initialize and schedule the executors
@@ -137,7 +138,7 @@ public abstract class ReplicationLogDiscovery {
       }, 0, getReplayIntervalSeconds(), TimeUnit.SECONDS);
 
       isRunning = true;
-      LOG.info("ReplicationLogDiscovery started for group: {}", haGroupName);
+      LOG.info("ReplicationLogDiscovery started for haGroup: {}", haGroupName);
     }
   }
 
@@ -151,7 +152,7 @@ public abstract class ReplicationLogDiscovery {
 
     synchronized (this) {
       if (!isRunning) {
-        LOG.warn("ReplicationLogDiscovery is not running for group: {}", 
haGroupName);
+        LOG.warn("ReplicationLogDiscovery is not running for haGroup: {}", 
haGroupName);
         return;
       }
 
@@ -171,7 +172,7 @@ public abstract class ReplicationLogDiscovery {
       }
     }
 
-    LOG.info("ReplicationLogDiscovery stopped for group: {}", haGroupName);
+    LOG.info("ReplicationLogDiscovery stopped for haGroup: {}", haGroupName);
   }
 
   /**
@@ -190,8 +191,8 @@ public abstract class ReplicationLogDiscovery {
       try {
         processRound(replicationRound);
       } catch (IOException e) {
-        LOG.error("Failed processing replication round {}. Will retry in next 
" + "scheduled run.",
-          replicationRound, e);
+        LOG.error("Failed processing replication round {} for haGroup {}. Will 
retry"
+          + "in next scheduled run.", replicationRound, haGroupName, e);
         break; // stop this run, retry later
       }
       setLastRoundProcessed(replicationRound);
@@ -223,7 +224,7 @@ public abstract class ReplicationLogDiscovery {
    * @throws IOException if there's an error during round processing
    */
   protected void processRound(ReplicationRound replicationRound) throws 
IOException {
-    LOG.info("Starting to process round: {}", replicationRound);
+    LOG.info("Starting to process round: {} for haGroup: {}", 
replicationRound, haGroupName);
     // Increment the number of rounds processed
     getMetrics().incrementNumRoundsProcessed();
 
@@ -233,7 +234,7 @@ public abstract class ReplicationLogDiscovery {
       // Conditionally process the in progress files for the round
       processInProgressDirectory();
     }
-    LOG.info("Finished processing round: {}", replicationRound);
+    LOG.info("Finished processing round: {} for haGroup: {}", 
replicationRound, haGroupName);
   }
 
   /**
@@ -253,7 +254,8 @@ public abstract class ReplicationLogDiscovery {
    * @throws IOException if there's an error during file processing
    */
   protected void processNewFilesForRound(ReplicationRound replicationRound) 
throws IOException {
-    LOG.info("Starting new files processing for round: {}", replicationRound);
+    LOG.info("Starting new files processing for round: {} for haGroup: {}", 
replicationRound,
+      haGroupName);
     long startTime = EnvironmentEdgeManager.currentTime();
     List<Path> files = 
replicationLogTracker.getNewFilesForRound(replicationRound);
     LOG.info("Number of new files for round {} is {}", replicationRound, 
files.size());
@@ -262,7 +264,8 @@ public abstract class ReplicationLogDiscovery {
       files = replicationLogTracker.getNewFilesForRound(replicationRound);
     }
     long duration = EnvironmentEdgeManager.currentTime() - startTime;
-    LOG.info("Finished new files processing for round: {} in {}ms", 
replicationRound, duration);
+    LOG.info("Finished new files processing for round: {} in {}ms for haGroup: 
{}",
+      replicationRound, duration, haGroupName);
     getMetrics().updateTimeToProcessNewFiles(duration);
   }
 
@@ -272,24 +275,25 @@ public abstract class ReplicationLogDiscovery {
    * @throws IOException if there's an error during file processing
    */
   protected void processInProgressDirectory() throws IOException {
+    LOG.info("Starting {} directory processing for haGroup: {}",
+      replicationLogTracker.getInProgressLogSubDirectoryName(), haGroupName);
     // Increase the count for number of times in progress directory is 
processed
     getMetrics().incrementNumInProgressDirectoryProcessed();
-    LOG.info("Starting {} directory processing",
-      replicationLogTracker.getInProgressLogSubDirectoryName());
     long startTime = EnvironmentEdgeManager.currentTime();
     long oldestTimestampToProcess =
       
replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp(
         EnvironmentEdgeManager.currentTime()) - getReplayIntervalSeconds() * 
1000L;
     List<Path> files = 
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
-    LOG.info("Number of {} files with oldestTimestampToProcess {} is {}",
+    LOG.info("Number of {} files with oldestTimestampToProcess {} is {} for 
haGroup: {}",
       replicationLogTracker.getInProgressLogSubDirectoryName(), 
oldestTimestampToProcess,
-      files.size());
+      files.size(), haGroupName);
     while (!files.isEmpty()) {
       processOneRandomFile(files);
       files = 
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
     }
     long duration = EnvironmentEdgeManager.currentTime() - startTime;
-    LOG.info("Finished in-progress files processing in {}ms", duration);
+    LOG.info("Finished in-progress files processing in {}ms for haGroup: {}", 
duration,
+      haGroupName);
     getMetrics().updateTimeToProcessInProgressFiles(duration);
   }
 
@@ -339,21 +343,27 @@ public abstract class ReplicationLogDiscovery {
   protected void initializeLastRoundProcessed() throws IOException {
     Optional<Long> minTimestampFromInProgressFiles = 
getMinTimestampFromInProgressFiles();
     if (minTimestampFromInProgressFiles.isPresent()) {
-      LOG.info("Initializing lastRoundProcessed from {} files with minimum " + 
"timestamp as {}",
-        replicationLogTracker.getInProgressLogSubDirectoryName(),
+      LOG.info(
+        "Initializing lastRoundProcessed for haGroup: {} from {} files with 
minimum "
+          + "timestamp as {}",
+        haGroupName, replicationLogTracker.getInProgressLogSubDirectoryName(),
         minTimestampFromInProgressFiles.get());
       this.lastRoundProcessed = 
replicationLogTracker.getReplicationShardDirectoryManager()
         .getReplicationRoundFromEndTime(minTimestampFromInProgressFiles.get());
     } else {
       Optional<Long> minTimestampFromNewFiles = getMinTimestampFromNewFiles();
       if (minTimestampFromNewFiles.isPresent()) {
-        LOG.info("Initializing lastRoundProcessed from {} files with minimum 
timestamp " + "as {}",
-          replicationLogTracker.getInSubDirectoryName(), 
minTimestampFromNewFiles.get());
+        LOG.info(
+          "Initializing lastRoundProcessed for haGroup: {} from {}"
+            + "files with minimum timestamp as {}",
+          haGroupName, replicationLogTracker.getInSubDirectoryName(),
+          minTimestampFromNewFiles.get());
         this.lastRoundProcessed = 
replicationLogTracker.getReplicationShardDirectoryManager()
           .getReplicationRoundFromEndTime(minTimestampFromNewFiles.get());
       } else {
         long currentTime = EnvironmentEdgeManager.currentTime();
-        LOG.info("Initializing lastRoundProcessed from current time {}", 
currentTime);
+        LOG.info("Initializing lastRoundProcessed for haGroup: {} from current 
time {}",
+          haGroupName, currentTime);
         this.lastRoundProcessed = 
replicationLogTracker.getReplicationShardDirectoryManager()
           
.getReplicationRoundFromEndTime(EnvironmentEdgeManager.currentTime());
       }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index de12f81fee..f7cb1f2cab 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -116,7 +116,7 @@ public class ReplicationLogTracker {
    * @return List of valid log file paths that belong to the specified 
replication round
    * @throws IOException if there's an error accessing the file system
    */
-  protected List<Path> getNewFilesForRound(ReplicationRound replicationRound) 
throws IOException {
+  public List<Path> getNewFilesForRound(ReplicationRound replicationRound) 
throws IOException {
     Path roundDirectory = 
replicationShardDirectoryManager.getShardDirectory(replicationRound);
     LOG.info("Getting new files for round {} from shard {}", replicationRound, 
roundDirectory);
     if (!fileSystem.exists(roundDirectory)) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java
index fe5d38143b..64dc3f1db8 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java
@@ -19,8 +19,6 @@ package org.apache.phoenix.replication;
 
 import java.util.Objects;
 
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
-
 /**
  * Represents a replication round with defined start and end timestamp. Used 
for grouping
  * replication log files into time-based processing windows and managing file 
distribution across
@@ -32,7 +30,6 @@ public class ReplicationRound {
   private final long endTime;
 
   public ReplicationRound(long startTime, long endTime) {
-    Preconditions.checkArgument(startTime < endTime);
     this.startTime = startTime;
     this.endTime = endTime;
   }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
index 76c2afcb19..cbbe5fc3a0 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
@@ -150,7 +150,8 @@ public class ReplicationShardDirectoryManager {
    */
   public ReplicationRound getReplicationRoundFromEndTime(long roundEndTime) {
     long validRoundEndTime = getNearestRoundStartTimestamp(roundEndTime);
-    long validRoundStartTime = validRoundEndTime - 
replicationRoundDurationSeconds * 1000L;
+    long validRoundStartTime =
+      Math.max(0L, validRoundEndTime - replicationRoundDurationSeconds * 
1000L);
     return new ReplicationRound(validRoundStartTime, validRoundEndTime);
   }
 
@@ -172,7 +173,7 @@ public class ReplicationShardDirectoryManager {
    * @param timestamp The timestamp in milliseconds since epoch
    * @return The nearest replication round start timestamp
    */
-  protected long getNearestRoundStartTimestamp(long timestamp) {
+  public long getNearestRoundStartTimestamp(long timestamp) {
     // Convert round time from seconds to milliseconds
     long roundTimeMs = replicationRoundDurationSeconds * 1000L;
 
@@ -181,6 +182,14 @@ public class ReplicationShardDirectoryManager {
     return (timestamp / roundTimeMs) * roundTimeMs;
   }
 
+  public ReplicationRound getPreviousRound(final ReplicationRound 
replicationRound) {
+    return getReplicationRoundFromEndTime(replicationRound.getStartTime());
+  }
+
+  public ReplicationRound getNextRound(final ReplicationRound 
replicationRound) {
+    return getReplicationRoundFromStartTime(replicationRound.getEndTime());
+  }
+
   public int getReplicationRoundDurationSeconds() {
     return this.replicationRoundDurationSeconds;
   }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index f1292519a5..36daca8b97 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -127,6 +127,9 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
 
   @Override
   public void init() throws IOException {
+
+    LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}", 
haGroupName);
+
     HAGroupStateListener degradedListener =
       (groupName, fromState, toState, modifiedTime, clusterType, 
lastSyncStateTimeInMs) -> {
         if (
@@ -220,7 +223,10 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
    */
   @Override
   protected void initializeLastRoundProcessed() throws IOException {
+    LOG.info("Initializing last round processed for haGroup: {}", haGroupName);
     HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord();
+    LOG.info("Found HA Group state during initialization as {} for haGroup: 
{}",
+      haGroupStoreRecord.getHAGroupState(), haGroupName);
     if (
       
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState())
     ) {
@@ -285,6 +291,7 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
     LOG.info("Starting replay with lastRoundProcessed={}, lastRoundInSync={}", 
lastRoundProcessed,
       lastRoundInSync);
     Optional<ReplicationRound> optionalNextRound = getFirstRoundToProcess();
+    LOG.info("Found first round to process as {} for haGroup: {}", 
optionalNextRound, haGroupName);
     while (optionalNextRound.isPresent()) {
       ReplicationRound replicationRound = optionalNextRound.get();
       try {
@@ -301,8 +308,12 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
       switch (currentState) {
         case SYNCED_RECOVERY:
           // Rewind to last in-sync round
-          LOG.info("SYNCED_RECOVERY detected, rewinding to 
lastRoundInSync={}", lastRoundInSync);
-          setLastRoundProcessed(lastRoundInSync);
+          LOG.info("SYNCED_RECOVERY detected, rewinding with 
lastRoundInSync={}", lastRoundInSync);
+          Optional<ReplicationRound> firstRoundToProcess = 
getFirstRoundToProcess();
+          LOG.info("Calculated first round to process after SYNCED_RECOVERY 
as" + "{}",
+            firstRoundToProcess);
+          firstRoundToProcess.ifPresent(round -> setLastRoundProcessed(
+            
replicationLogTracker.getReplicationShardDirectoryManager().getPreviousRound(round)));
           // Only reset to NORMAL if state hasn't been flipped to DEGRADED
           
replicationReplayState.compareAndSet(ReplicationReplayState.SYNCED_RECOVERY,
             ReplicationReplayState.SYNC);
@@ -336,16 +347,9 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
       LOG.info(
         "No more rounds to process, lastRoundInSync={}, lastRoundProcessed={}. 
"
           + "Failover is triggered & in progress directory is empty. "
-          + "Marking cluster state as {}",
+          + "Attempting to mark cluster state as {}",
         lastRoundInSync, lastRoundProcessed, 
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
-      try {
-        triggerFailover();
-        LOG.info("Successfully updated the cluster state");
-        failoverPending.set(false);
-      } catch (InvalidClusterRoleTransitionException exception) {
-        LOG.warn("Failed to update the cluster state.", exception);
-        failoverPending.set(false);
-      }
+      triggerFailover();
     }
   }
 
@@ -356,8 +360,15 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
    * scenarios where lastRoundProcessed may be ahead of lastRoundInSync.
    * @return Optional containing the first round to process, or empty if not 
enough time has passed
    */
-  private Optional<ReplicationRound> getFirstRoundToProcess() {
-    long lastRoundEndTimestamp = getLastRoundInSync().getEndTime();
+  private Optional<ReplicationRound> getFirstRoundToProcess() throws 
IOException {
+    ReplicationRound lastRoundInSync = getLastRoundInSync();
+    long lastRoundEndTimestamp = lastRoundInSync.getEndTime();
+    if (lastRoundInSync.getStartTime() == 0) {
+      Optional<Long> optionalMinimumNewFilesTimestamp = 
getMinTimestampFromNewFiles();
+      lastRoundEndTimestamp =
+        
replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp(
+          
optionalMinimumNewFilesTimestamp.orElseGet(EnvironmentEdgeManager::currentTime));
+    }
     long currentTime = EnvironmentEdgeManager.currentTime();
     if (currentTime - lastRoundEndTimestamp < roundTimeMills + bufferMillis) {
       // nothing more to process
@@ -440,17 +451,34 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
     return optionalHAGroupStateRecord.get();
   }
 
+  /**
+   * Determines whether failover should be triggered based on completion 
criteria. Failover is safe
+   * to trigger when all of the following conditions are met: 1. A failover 
has been requested
+   * (failoverPending is true) 2. No files are currently in the in-progress 
directory 3. No new
+   * files exist for ongoing round. These conditions ensure all replication 
logs have been processed
+   * before transitioning the cluster from STANDBY to ACTIVE state.
+   * @return true if all conditions are met and failover should be triggered, 
false otherwise
+   * @throws IOException if there's an error checking file status
+   */
   protected boolean shouldTriggerFailover() throws IOException {
-    return failoverPending.get() && lastRoundInSync.equals(lastRoundProcessed)
-      && replicationLogTracker.getInProgressFiles().isEmpty();
-    // TODO: Check for in files of lastRoundProcessed, lastRoundProcessed + 
roundTime as well -
-    // because there can be new files for upcoming round
+    return failoverPending.get() && 
replicationLogTracker.getInProgressFiles().isEmpty()
+      && replicationLogTracker.getNewFilesForRound(replicationLogTracker
+        
.getReplicationShardDirectoryManager().getNextRound(getLastRoundProcessed())).isEmpty();
   }
 
-  protected void triggerFailover() throws IOException, 
InvalidClusterRoleTransitionException {
-    // TODO: Update cluster state to ACTIVE_IN_SYNC within try block
-    // (once API is supported in HA Store)
-    // Throw any exception back to caller.
+  protected void triggerFailover() {
+    try {
+      
HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName);
+      failoverPending.set(false);
+    } catch (InvalidClusterRoleTransitionException 
invalidClusterRoleTransitionException) {
+      LOG.warn(
+        "Failed to update the cluster state due to"
+          + "InvalidClusterRoleTransitionException. Setting failoverPending" + 
"to false.",
+        invalidClusterRoleTransitionException);
+      failoverPending.set(false);
+    } catch (Exception exception) {
+      LOG.error("Failed to update the cluster state.", exception);
+    }
   }
 
   public enum ReplicationReplayState {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index be744a5f7e..84d4f1ba42 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -213,6 +213,8 @@ public class ReplicationLogProcessor implements Closeable {
 
   public void processLogFile(FileSystem fs, Path filePath) throws IOException {
 
+    LOG.info("Starting to process file {}", filePath);
+
     // Map from Table Name to List of Mutations
     Map<TableName, List<Mutation>> tableToMutationsMap = new HashMap<>();
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 89c05ce63c..e0e3811ed3 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -105,6 +105,7 @@ public class ReplicationLogReplay {
    * @throws IOException if there's an error during initialization
    */
   protected void init() throws IOException {
+    LOG.info("Initializing ReplicationLogReplay for haGroup: {}", haGroupName);
     initializeFileSystem();
     Path newFilesDirectory =
       new Path(new Path(rootURI.getPath(), haGroupName), 
ReplicationLogReplay.IN_DIRECTORY_NAME);
@@ -120,6 +121,7 @@ public class ReplicationLogReplay {
   }
 
   public void close() {
+    LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
     replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
     replicationLogDiscoveryReplay.close();
     // Remove the instance from cache
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index 1485675050..fe380dea85 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -191,6 +191,7 @@ public class ReplicationLogReplayService {
    */
   protected void startReplicationReplay() throws IOException, SQLException {
     List<String> replicationGroups = getReplicationGroups();
+    LOG.info("{} number of HA Groups found to start Replication Replay", 
replicationGroups.size());
     for (String replicationGroup : replicationGroups) {
       ReplicationLogReplay.get(conf, replicationGroup).startReplay();
     }
@@ -201,6 +202,7 @@ public class ReplicationLogReplayService {
    */
   protected void stopReplicationReplay() throws IOException, SQLException {
     List<String> replicationGroups = getReplicationGroups();
+    LOG.info("{} number of HA Groups found to stop Replication Replay", 
replicationGroups.size());
     for (String replicationGroup : replicationGroups) {
       ReplicationLogReplay replicationLogReplay = 
ReplicationLogReplay.get(conf, replicationGroup);
       replicationLogReplay.stopReplay();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 6eb8cd8b3e..68045bf5e0 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -26,9 +26,11 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.URI;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord;
 import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
 import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
@@ -97,8 +100,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
   public void setUp() throws Exception {
     zkUrl = getLocalZkUrl(config);
     peerZkUrl = CLUSTERS.getZkUrl2();
-    
HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), 
zkUrl,
-      peerZkUrl, CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, 
peerZkUrl,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
       ClusterRoleRecord.ClusterRole.ACTIVE, 
ClusterRoleRecord.ClusterRole.STANDBY, null);
     localFs = FileSystem.getLocal(config);
     standbyUri = testFolder.getRoot().toURI();
@@ -1293,6 +1296,396 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
     }
   }
 
+  /**
+   * Tests replay method when lastRoundInSync is not set, i.e. has start time 
0. Validates that
+   * getFirstRoundToProcess uses the minimum timestamp from the new files.
+   */
+  @Test
+  public void testReplay_LastRoundInSync_NotInitialized() throws IOException {
+    TestableReplicationLogTracker fileTracker =
+      createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+    try {
+      long roundTimeMills =
+        
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
+          * 1000L;
+      long bufferMillis = (long) (roundTimeMills * 0.15);
+
+      // Create new files with specific timestamps
+      // The minimum timestamp should be used as the basis for the first round
+      long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) + (30 * 
1000L); // 2024-01-01
+                                                                               
     // 00:05:30
+      long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) + (45 * 
1000L); // 2024-01-01
+                                                                               
     // 00:08:45
+      long middleFileTimestamp = 1704067200000L + (7 * roundTimeMills) + (15 * 
1000L); // 2024-01-01
+                                                                               
        // 00:07:15
+
+      // Create files in different shard directories
+      ReplicationShardDirectoryManager shardManager =
+        fileTracker.getReplicationShardDirectoryManager();
+
+      // Create file with minimum timestamp
+      Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+      localFs.mkdirs(shardPath1);
+      Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+      localFs.create(file1, true).close();
+
+      // Create file with middle timestamp
+      Path shardPath2 = shardManager.getShardDirectory(middleFileTimestamp);
+      localFs.mkdirs(shardPath2);
+      Path file2 = new Path(shardPath2, middleFileTimestamp + "_rs-2.plog");
+      localFs.create(file2, true).close();
+
+      // Create file with maximum timestamp
+      Path shardPath3 = shardManager.getShardDirectory(maxFileTimestamp);
+      localFs.mkdirs(shardPath3);
+      Path file3 = new Path(shardPath3, maxFileTimestamp + "_rs-3.plog");
+      localFs.create(file3, true).close();
+
+      // Create HAGroupStoreRecord for STANDBY state
+      HAGroupStoreRecord mockRecord =
+        new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+          HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+          peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+      // Calculate expected first round start time (minimum timestamp rounded 
down to nearest round
+      // start)
+      long expectedFirstRoundStart = 
shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+      long expectedFirstRoundEnd = expectedFirstRoundStart + roundTimeMills;
+
+      // Set current time to allow processing exactly 5 rounds
+      // Round 1 ends at expectedFirstRoundEnd
+      // Round 2 ends at expectedFirstRoundEnd + roundTimeMills
+      // Round 3 ends at expectedFirstRoundEnd + 2*roundTimeMills
+      // Round 4 ends at expectedFirstRoundEnd + 3*roundTimeMills
+      // Round 5 ends at expectedFirstRoundEnd + 4*roundTimeMills
+      // Round 6 starts at expectedFirstRoundEnd + 4*roundTimeMills
+      // For round 6 to be processable: currentTime - (expectedFirstRoundEnd + 
4*roundTimeMills) >=
+      // roundTimeMills + bufferMillis
+      // So: currentTime >= expectedFirstRoundEnd + 5*roundTimeMills + 
bufferMillis
+      // For round 6 NOT to be processable: currentTime - 
(expectedFirstRoundEnd + 4*roundTimeMills)
+      // < roundTimeMills + bufferMillis
+      // So exactly 5 rounds should be processed with currentTime = 
expectedFirstRoundEnd +
+      // 5*roundTimeMills + bufferMillis - 1
+      // To be safe, use: currentTime = expectedFirstRoundEnd + 
5*roundTimeMills + bufferMillis -
+      // 1000
+      long currentTime = expectedFirstRoundEnd + (5 * roundTimeMills) + 
bufferMillis - 1000;
+      EnvironmentEdge edge = () -> currentTime;
+      EnvironmentEdgeManager.injectEdge(edge);
+
+      // Calculate exact number of rounds that should be processed
+      // Round 1: from getFirstRoundToProcess (based on lastRoundInSync with 
start time 0)
+      // - Uses minFileTimestamp rounded down = expectedFirstRoundStart
+      // - Round 1: expectedFirstRoundStart to expectedFirstRoundEnd
+      // Round 2+: from getNextRoundToProcess (based on previous round's end 
time)
+      // - Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd + 
roundTimeMills
+      // - Round 3: expectedFirstRoundEnd + roundTimeMills to 
expectedFirstRoundEnd +
+      // 2*roundTimeMills
+      // - Round 4: expectedFirstRoundEnd + 2*roundTimeMills to 
expectedFirstRoundEnd +
+      // 3*roundTimeMills
+      // - Round 5: expectedFirstRoundEnd + 3*roundTimeMills to 
expectedFirstRoundEnd +
+      // 4*roundTimeMills
+      int expectedRoundCount = 5;
+      long[] expectedRoundStarts = new long[expectedRoundCount];
+      long[] expectedRoundEnds = new long[expectedRoundCount];
+      expectedRoundStarts[0] = expectedFirstRoundStart;
+      expectedRoundEnds[0] = expectedFirstRoundEnd;
+      for (int i = 1; i < expectedRoundCount; i++) {
+        expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 1) * 
roundTimeMills);
+        expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+      }
+
+      try {
+        TestableReplicationLogDiscoveryReplay discovery =
+          new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+        // Set lastRoundInSync with start time 0 (the new case being tested)
+        ReplicationRound lastRoundInSyncWithZeroStart = new 
ReplicationRound(0L, roundTimeMills);
+        discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+        // Set lastRoundProcessed to some initial value
+        ReplicationRound initialLastRoundProcessed = new ReplicationRound(0L, 
roundTimeMills);
+        discovery.setLastRoundProcessed(initialLastRoundProcessed);
+        discovery
+          
.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+        // Verify initial state
+        ReplicationRound lastRoundInSyncBefore = 
discovery.getLastRoundInSync();
+        assertEquals("lastRoundInSync should have start time 0 before replay", 
0L,
+          lastRoundInSyncBefore.getStartTime());
+
+        // Call replay - should start from minimum timestamp of new files
+        discovery.replay();
+
+        // Verify exact count of processRound calls
+        int actualProcessRoundCallCount = discovery.getProcessRoundCallCount();
+        assertEquals("processRound should be called exactly " + 
expectedRoundCount + " times",
+          expectedRoundCount, actualProcessRoundCallCount);
+
+        // Get all processed rounds
+        List<ReplicationRound> processedRounds = 
discovery.getProcessedRounds();
+        assertEquals("Should have processed exactly " + expectedRoundCount + " 
rounds",
+          expectedRoundCount, processedRounds.size());
+
+        // Verify each round's exact parameters
+        for (int i = 0; i < expectedRoundCount; i++) {
+          ReplicationRound actualRound = processedRounds.get(i);
+          assertEquals("Round " + (i + 1) + " should have correct start time",
+            expectedRoundStarts[i], actualRound.getStartTime());
+          assertEquals("Round " + (i + 1) + " should have correct end time", 
expectedRoundEnds[i],
+            actualRound.getEndTime());
+        }
+
+        // Verify lastRoundProcessed was updated to the last processed round
+        ReplicationRound lastRoundAfterReplay = 
discovery.getLastRoundProcessed();
+        assertNotNull("Last round processed should not be null", 
lastRoundAfterReplay);
+        assertEquals("Last round processed should match the last processed 
round start time",
+          expectedRoundStarts[expectedRoundCount - 1], 
lastRoundAfterReplay.getStartTime());
+        assertEquals("Last round processed should match the last processed 
round end time",
+          expectedRoundEnds[expectedRoundCount - 1], 
lastRoundAfterReplay.getEndTime());
+
+        // Verify lastRoundInSync was updated (in SYNC state, both should 
advance together)
+        ReplicationRound lastRoundInSyncAfter = discovery.getLastRoundInSync();
+        assertNotNull("Last round in sync should not be null", 
lastRoundInSyncAfter);
+        assertEquals("Last round in sync should match last round processed in 
SYNC state",
+          lastRoundAfterReplay, lastRoundInSyncAfter);
+        assertEquals("Last round in sync should have correct start time",
+          expectedRoundStarts[expectedRoundCount - 1], 
lastRoundInSyncAfter.getStartTime());
+        assertEquals("Last round in sync should have correct end time",
+          expectedRoundEnds[expectedRoundCount - 1], 
lastRoundInSyncAfter.getEndTime());
+
+        // Verify state remains SYNC
+        assertEquals("State should remain SYNC",
+          ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+          discovery.getReplicationReplayState());
+
+      } finally {
+        EnvironmentEdgeManager.reset();
+      }
+    } finally {
+      fileTracker.close();
+    }
+  }
+
+  /**
+   * Tests replay method when state changes from DEGRADED to SYNC between 
round processing and
+   * lastRoundInSync has start time 0. Expected behavior: - 1st round is 
processed in DEGRADED state
+   * - 2nd round is processed in DEGRADED state, but then state transitions to 
SYNCED_RECOVERY -
+   * When state becomes SYNCED_RECOVERY, lastRoundProcessed is rewound to the 
previous round of
+   * getFirstRoundToProcess() (which uses minimum file timestamp when 
lastRoundInSync.startTime ==
+   * 0) - Then 5 rounds are processed starting from the first round (based on 
minimum file
+   * timestamp) in SYNC state Total: 7 rounds processed (2 in DEGRADED, then 
rewind, then 5 in SYNC)
+   */
+  @Test
+  public void testReplay_DegradedToSync_LastRoundInSyncStartTimeZero() throws 
IOException {
+    TestableReplicationLogTracker fileTracker =
+      createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+    try {
+      long roundTimeMills =
+        
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
+          * 1000L;
+      long bufferMillis = (long) (roundTimeMills * 0.15);
+
+      // Create new files with specific timestamps
+      // The minimum timestamp should be used as the basis for the first round
+      long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) + (30 * 
1000L); // 2024-01-01
+                                                                               
     // 00:05:30
+      long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) + (45 * 
1000L); // 2024-01-01
+                                                                               
     // 00:08:45
+
+      // Create files in different shard directories
+      ReplicationShardDirectoryManager shardManager =
+        fileTracker.getReplicationShardDirectoryManager();
+
+      // Create file with minimum timestamp
+      Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+      localFs.mkdirs(shardPath1);
+      Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+      localFs.create(file1, true).close();
+
+      // Create file with maximum timestamp
+      Path shardPath2 = shardManager.getShardDirectory(maxFileTimestamp);
+      localFs.mkdirs(shardPath2);
+      Path file2 = new Path(shardPath2, maxFileTimestamp + "_rs-2.plog");
+      localFs.create(file2, true).close();
+
+      // Create HAGroupStoreRecord for STANDBY state
+      HAGroupStoreRecord mockRecord =
+        new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+          HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+          peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+      // Calculate expected first round start time (minimum timestamp rounded 
down to nearest round
+      // start)
+      long expectedFirstRoundStart = 
shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+      long expectedFirstRoundEnd = expectedFirstRoundStart + roundTimeMills;
+
+      // After rewind, lastRoundProcessed will be set to previous round of 
firstRoundToProcess
+      // firstRoundToProcess = expectedFirstRoundStart to expectedFirstRoundEnd
+      // previous round = expectedFirstRoundStart - roundTimeMills to 
expectedFirstRoundStart
+      // Then getNextRoundToProcess will return the first round starting from
+      // expectedFirstRoundStart
+      // So the first SYNC round after rewind will be expectedFirstRoundStart 
to
+      // expectedFirstRoundEnd
+
+      // Set current time to allow processing 7 rounds total (2 in DEGRADED, 
then rewind, then 5 in
+      // SYNC)
+      // After rewind, we need time for 5 more rounds starting from 
expectedFirstRoundStart
+      // Round 7 (the 5th round after rewind) will end at 
expectedFirstRoundEnd + 4*roundTimeMills
+      // = expectedFirstRoundStart + roundTimeMills + 4*roundTimeMills = 
expectedFirstRoundStart +
+      // 5*roundTimeMills
+      // For round 7 to be processable: currentTime - (expectedFirstRoundEnd + 
3*roundTimeMills) >=
+      // roundTimeMills + bufferMillis
+      // So: currentTime >= expectedFirstRoundStart + 5*roundTimeMills + 
bufferMillis
+      // Set it slightly above to ensure round 7 is processable
+      long currentTime = expectedFirstRoundStart + (5 * roundTimeMills) + 
bufferMillis + 1000;
+      EnvironmentEdge edge = () -> currentTime;
+      EnvironmentEdgeManager.injectEdge(edge);
+
+      // Calculate expected rounds
+      // Rounds 1-2: In DEGRADED state
+      // Round 1: expectedFirstRoundStart to expectedFirstRoundEnd (DEGRADED)
+      // Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd + 
roundTimeMills (DEGRADED,
+      // transitions to SYNCED_RECOVERY at end)
+      // After round 2, state becomes SYNCED_RECOVERY and causes rewind
+      // - getFirstRoundToProcess returns: expectedFirstRoundStart to 
expectedFirstRoundEnd
+      // - lastRoundProcessed is set to: expectedFirstRoundStart - 
roundTimeMills to
+      // expectedFirstRoundStart
+      // Rounds 3-7: In SYNC state (starting from first round based on minimum 
file timestamp)
+      // Round 3: expectedFirstRoundStart to expectedFirstRoundEnd (SYNC, 
after rewind - first
+      // round)
+      // Round 4: expectedFirstRoundEnd to expectedFirstRoundEnd + 
roundTimeMills (SYNC)
+      // Round 5: expectedFirstRoundEnd + roundTimeMills to 
expectedFirstRoundEnd + 2*roundTimeMills
+      // (SYNC)
+      // Round 6: expectedFirstRoundEnd + 2*roundTimeMills to 
expectedFirstRoundEnd +
+      // 3*roundTimeMills (SYNC)
+      // Round 7: expectedFirstRoundEnd + 3*roundTimeMills to 
expectedFirstRoundEnd +
+      // 4*roundTimeMills (SYNC)
+      int expectedRoundCount = 7;
+      int degradedRoundCount = 2; // First 2 rounds are in DEGRADED state
+      int syncRoundCount = 5; // Last 5 rounds are in SYNC state (after rewind)
+      long[] expectedRoundStarts = new long[expectedRoundCount];
+      long[] expectedRoundEnds = new long[expectedRoundCount];
+
+      // Rounds 1-2: DEGRADED
+      expectedRoundStarts[0] = expectedFirstRoundStart;
+      expectedRoundEnds[0] = expectedFirstRoundEnd;
+      expectedRoundStarts[1] = expectedFirstRoundEnd;
+      expectedRoundEnds[1] = expectedFirstRoundEnd + roundTimeMills;
+
+      // Rounds 3-7: SYNC (after rewind, starting from expectedFirstRoundStart 
- the first round
+      // based on minimum file timestamp)
+      expectedRoundStarts[2] = expectedFirstRoundStart; // Round 3: first 
round after rewind
+      expectedRoundEnds[2] = expectedFirstRoundEnd;
+      for (int i = 3; i < expectedRoundCount; i++) {
+        expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 3) * 
roundTimeMills);
+        expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+      }
+
+      try {
+        // Create discovery that transitions from DEGRADED to SYNCED_RECOVERY 
after 2 rounds
+        // SYNCED_RECOVERY triggers a rewind, then transitions to SYNC
+        TestableReplicationLogDiscoveryReplay discovery =
+          new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord) {
+            private int roundCount = 0;
+
+            @Override
+            protected void processRound(ReplicationRound replicationRound) 
throws IOException {
+              super.processRound(replicationRound);
+              roundCount++;
+
+              // Transition to SYNCED_RECOVERY after 2 rounds (after 
processing round 2)
+              // This triggers a rewind similar to when coming back from 
DEGRADED to SYNC
+              if (roundCount == degradedRoundCount) {
+                setReplicationReplayState(
+                  
ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY);
+              }
+            }
+          };
+
+        // Set lastRoundInSync with start time 0
+        ReplicationRound lastRoundInSyncWithZeroStart = new 
ReplicationRound(0L, roundTimeMills);
+        discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+        // Set lastRoundProcessed to some initial value
+        ReplicationRound initialLastRoundProcessed = new ReplicationRound(0L, 
roundTimeMills);
+        discovery.setLastRoundProcessed(initialLastRoundProcessed);
+        discovery
+          
.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED);
+
+        // Verify initial state
+        ReplicationRound lastRoundInSyncBefore = 
discovery.getLastRoundInSync();
+        assertEquals("lastRoundInSync should have start time 0 before replay", 
0L,
+          lastRoundInSyncBefore.getStartTime());
+        assertEquals("Initial state should be DEGRADED",
+          ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED,
+          discovery.getReplicationReplayState());
+
+        // Call replay - should start from minimum timestamp of new files
+        discovery.replay();
+
+        // Verify exact count of processRound calls
+        int actualProcessRoundCallCount = discovery.getProcessRoundCallCount();
+        assertEquals("processRound should be called exactly " + 
expectedRoundCount + " times",
+          expectedRoundCount, actualProcessRoundCallCount);
+
+        // Get all processed rounds
+        List<ReplicationRound> processedRounds = 
discovery.getProcessedRounds();
+        assertEquals("Should have processed exactly " + expectedRoundCount + " 
rounds",
+          expectedRoundCount, processedRounds.size());
+
+        // Verify each round's exact parameters
+        for (int i = 0; i < expectedRoundCount; i++) {
+          ReplicationRound actualRound = processedRounds.get(i);
+          assertEquals("Round " + (i + 1) + " should have correct start time",
+            expectedRoundStarts[i], actualRound.getStartTime());
+          assertEquals("Round " + (i + 1) + " should have correct end time", 
expectedRoundEnds[i],
+            actualRound.getEndTime());
+        }
+
+        // Verify lastRoundProcessed was updated to the last processed round
+        ReplicationRound lastRoundAfterReplay = 
discovery.getLastRoundProcessed();
+        assertNotNull("Last round processed should not be null", 
lastRoundAfterReplay);
+        assertEquals("Last round processed should match the last processed 
round start time",
+          expectedRoundStarts[expectedRoundCount - 1], 
lastRoundAfterReplay.getStartTime());
+        assertEquals("Last round processed should match the last processed 
round end time",
+          expectedRoundEnds[expectedRoundCount - 1], 
lastRoundAfterReplay.getEndTime());
+
+        // Verify lastRoundInSync behavior:
+        // - During first 2 rounds (DEGRADED), lastRoundInSync should NOT be 
updated (remains with
+        // start time 0)
+        // - After transition to SYNCED_RECOVERY and then SYNC (rounds 3-7), 
lastRoundInSync should
+        // be updated
+        // - Final lastRoundInSync should match lastRoundProcessed (because 
we're in SYNC state at
+        // the end)
+        ReplicationRound lastRoundInSyncAfter = discovery.getLastRoundInSync();
+        assertNotNull("Last round in sync should not be null", 
lastRoundInSyncAfter);
+
+        // After processing, we're in SYNC state, so lastRoundInSync should be 
updated
+        // It should match the last processed round because state is SYNC
+        assertEquals("Last round in sync should match last round processed in 
SYNC state",
+          lastRoundAfterReplay, lastRoundInSyncAfter);
+        assertEquals("Last round in sync should have correct start time",
+          expectedRoundStarts[expectedRoundCount - 1], 
lastRoundInSyncAfter.getStartTime());
+        assertEquals("Last round in sync should have correct end time",
+          expectedRoundEnds[expectedRoundCount - 1], 
lastRoundInSyncAfter.getEndTime());
+        assertTrue("Last round in sync start time should not be 0 after 
processing in SYNC state",
+          lastRoundInSyncAfter.getStartTime() > 0);
+
+        // Verify state transitioned to SYNC
+        assertEquals("State should be SYNC after transition",
+          ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+          discovery.getReplicationReplayState());
+
+      } finally {
+        EnvironmentEdgeManager.reset();
+      }
+    } finally {
+      fileTracker.close();
+    }
+  }
+
   /**
    * Tests replay method when failoverPending becomes true during processing 
and triggers failover
    * after all rounds. Validates that triggerFailover is called exactly once 
when all conditions are
@@ -1423,7 +1816,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
 
   /**
    * Tests the shouldTriggerFailover method with various combinations of 
failoverPending,
-   * lastRoundInSync, lastRoundProcessed and in-progress files state.
+   * in-progress files, and new files for next round.
    */
   @Test
   public void testShouldTriggerFailover() throws IOException {
@@ -1445,11 +1838,13 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
     try {
       // Create test rounds
       ReplicationRound testRound = new ReplicationRound(1704153600000L, 
1704153660000L);
-      ReplicationRound differentRound = new ReplicationRound(1704153540000L, 
1704153600000L);
+      ReplicationRound nextRound =
+        tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
 
       // Test Case 1: All conditions true - should return true
       {
         when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
@@ -1463,6 +1858,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       // Test Case 2: failoverPending is false - should return false
       {
         when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
@@ -1473,22 +1869,25 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
           discovery.shouldTriggerFailover());
       }
 
-      // Test Case 3: lastRoundInSync not equals lastRoundProcessed - should 
return false
+      // Test Case 3: in-progress files not empty - should return false
       {
-        when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+        when(tracker.getInProgressFiles())
+          .thenReturn(Collections.singletonList(new Path("test.plog")));
+        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
-        discovery.setLastRoundProcessed(differentRound);
+        discovery.setLastRoundProcessed(testRound);
         discovery.setFailoverPending(true);
 
-        assertFalse("Should not trigger failover when lastRoundInSync != 
lastRoundProcessed",
+        assertFalse("Should not trigger failover when in-progress files are 
not empty",
           discovery.shouldTriggerFailover());
       }
 
-      // Test Case 4: in-progress files not empty - should return false
+      // Test Case 4: new files exist for next round - should return false
       {
-        when(tracker.getInProgressFiles())
+        when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+        when(tracker.getNewFilesForRound(nextRound))
           .thenReturn(Collections.singletonList(new Path("test.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
@@ -1496,27 +1895,30 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
         discovery.setLastRoundProcessed(testRound);
         discovery.setFailoverPending(true);
 
-        assertFalse("Should not trigger failover when in-progress files are 
not empty",
+        assertFalse("Should not trigger failover when new files exist for next 
round",
           discovery.shouldTriggerFailover());
       }
 
-      // Test Case 5: failoverPending false AND lastRoundInSync != 
lastRoundProcessed - should
-      // return false
+      // Test Case 5: failoverPending false AND in-progress files not empty - 
should return false
       {
-        when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+        when(tracker.getInProgressFiles())
+          .thenReturn(Collections.singletonList(new Path("test.plog")));
+        
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
-        discovery.setLastRoundProcessed(differentRound);
+        discovery.setLastRoundProcessed(testRound);
         discovery.setFailoverPending(false);
 
-        assertFalse("Should not trigger failover when failoverPending is false 
and rounds differ",
+        assertFalse(
+          "Should not trigger failover when failoverPending is false and 
in-progress files exist",
           discovery.shouldTriggerFailover());
       }
 
-      // Test Case 6: failoverPending false AND in-progress files not empty - 
should return false
+      // Test Case 6: failoverPending false AND new files exist - should 
return false
       {
-        when(tracker.getInProgressFiles())
+        when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+        when(tracker.getNewFilesForRound(nextRound))
           .thenReturn(Collections.singletonList(new Path("test.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
@@ -1524,22 +1926,23 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
         discovery.setLastRoundProcessed(testRound);
         discovery.setFailoverPending(false);
 
-        assertFalse("Should not trigger failover when failoverPending is false 
and files exist",
+        assertFalse("Should not trigger failover when failoverPending is false 
and new files exist",
           discovery.shouldTriggerFailover());
       }
 
-      // Test Case 7: lastRoundInSync != lastRoundProcessed AND in-progress 
files not empty - should
-      // return false
+      // Test Case 7: in-progress files not empty AND new files exist - should 
return false
       {
         when(tracker.getInProgressFiles())
-          .thenReturn(Collections.singletonList(new Path("test.plog")));
+          .thenReturn(Collections.singletonList(new Path("test1.plog")));
+        when(tracker.getNewFilesForRound(nextRound))
+          .thenReturn(Collections.singletonList(new Path("test2.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
-        discovery.setLastRoundProcessed(differentRound);
+        discovery.setLastRoundProcessed(testRound);
         discovery.setFailoverPending(true);
 
-        assertFalse("Should not trigger failover when rounds differ and files 
exist",
+        assertFalse("Should not trigger failover when both in-progress and new 
files exist",
           discovery.shouldTriggerFailover());
       }
 
@@ -1547,10 +1950,12 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
       {
         when(tracker.getInProgressFiles())
           .thenReturn(Collections.singletonList(new Path("test.plog")));
+        when(tracker.getNewFilesForRound(nextRound))
+          .thenReturn(Collections.singletonList(new Path("test2.plog")));
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(tracker, 
haGroupStoreRecord);
         discovery.setLastRoundInSync(testRound);
-        discovery.setLastRoundProcessed(differentRound);
+        discovery.setLastRoundProcessed(testRound);
         discovery.setFailoverPending(false);
 
         assertFalse("Should not trigger failover when all conditions are 
false",
@@ -1562,6 +1967,142 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
     }
   }
 
+  /**
+   * Tests the success path of triggerFailover: verifies that the HA group moves from
+   * STANDBY_TO_ACTIVE to ACTIVE_IN_SYNC and that failoverPending is reset to false.
+   */
+  @Test
+  public void testTriggerFailover() throws IOException, SQLException {
+    final String haGroupName = "testTriggerFailoverHAGroup";
+    // Set up HA group store record with STANDBY_TO_ACTIVE state for 
successful transition
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, 
peerZkUrl,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.STANDBY_TO_ACTIVE,
+      ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, null);
+
+    TestableReplicationLogTracker fileTracker = null;
+    try {
+      fileTracker = createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri);
+
+      // Create a spy of ReplicationLogDiscoveryReplay to test actual 
implementation
+      ReplicationLogDiscoveryReplay discovery =
+        Mockito.spy(new ReplicationLogDiscoveryReplay(fileTracker));
+
+      // Initialize the discovery
+      discovery.init();
+
+      // Set failoverPending to true
+      discovery.setFailoverPending(true);
+      assertTrue("failoverPending should be true before triggerFailover",
+        discovery.getFailoverPending());
+
+      // Verify initial state is STANDBY_TO_ACTIVE
+      Optional<HAGroupStoreRecord> recordBefore =
+        
HAGroupStoreManager.getInstance(config).getHAGroupStoreRecord(haGroupName);
+      assertTrue("HA group record should exist", recordBefore.isPresent());
+      assertEquals("Initial state should be STANDBY_TO_ACTIVE",
+        HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE, 
recordBefore.get().getHAGroupState());
+
+      // Call triggerFailover - should succeed and update cluster state to 
ACTIVE_IN_SYNC
+      discovery.triggerFailover();
+
+      // Verify triggerFailover was called once
+      Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+      // Verify cluster state is updated to ACTIVE_IN_SYNC after failover
+      Optional<HAGroupStoreRecord> recordAfter =
+        
HAGroupStoreManager.getInstance(config).getHAGroupStoreRecord(haGroupName);
+      assertTrue("HA group record should exist after failover", 
recordAfter.isPresent());
+      assertEquals("Cluster state should be ACTIVE_IN_SYNC after successful 
failover",
+        HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 
recordAfter.get().getHAGroupState());
+
+      // Verify failoverPending is set to false after successful failover
+      assertFalse("failoverPending should be set to false after successful 
triggerFailover",
+        discovery.getFailoverPending());
+
+    } finally {
+      // Clean up
+      if (fileTracker != null) {
+        fileTracker.close();
+      }
+      try {
+        HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, 
zkUrl);
+      } catch (Exception e) {
+        LOG.warn("Failed to clean up HA group store record", e);
+      }
+    }
+  }
+
+  /**
+   * Tests triggerFailover when
+   * HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName) 
throws
+   * InvalidClusterRoleTransitionException. Verifies that failoverPending is 
set to false even when
+   * the exception occurs. This test sets up the HA group in STANDBY state 
instead of
+   * STANDBY_TO_ACTIVE. Calling setHAGroupStatusToSync from STANDBY state will 
throw
+   * InvalidClusterRoleTransitionException because the transition from STANDBY 
directly to
+   * ACTIVE_IN_SYNC is not allowed.
+   */
+  @Test
+  public void 
testTriggerFailover_InvalidClusterRoleTransitionExceptionFromHAGroupStoreManager()
+    throws IOException, SQLException {
+    final String haGroupName = "testTriggerFailoverInvalidTransitionHAGroup";
+
+    // Set up HA group store record in STANDBY state (not STANDBY_TO_ACTIVE)
+    // This will cause setHAGroupStatusToSync to throw 
InvalidClusterRoleTransitionException
+    // because transitioning from STANDBY directly to ACTIVE_IN_SYNC is invalid
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, 
peerZkUrl,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.STANDBY, 
ClusterRoleRecord.ClusterRole.ACTIVE, null);
+
+    TestableReplicationLogTracker fileTracker = null;
+    try {
+      fileTracker = createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri);
+
+      // Create a spy of ReplicationLogDiscoveryReplay to test actual 
implementation
+      ReplicationLogDiscoveryReplay discovery =
+        Mockito.spy(new ReplicationLogDiscoveryReplay(fileTracker));
+
+      // Initialize the discovery
+      discovery.init();
+
+      // Set failoverPending to true
+      discovery.setFailoverPending(true);
+      assertTrue("failoverPending should be true before triggerFailover",
+        discovery.getFailoverPending());
+
+      // Verify initial state is DEGRADED_STANDBY (not STANDBY_TO_ACTIVE)
+      Optional<HAGroupStoreRecord> recordBefore =
+        
HAGroupStoreManager.getInstance(config).getHAGroupStoreRecord(haGroupName);
+      assertTrue("HA group record should exist", recordBefore.isPresent());
+      assertEquals("Initial state should be DEGRADED_STANDBY",
+        HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
recordBefore.get().getHAGroupState());
+
+      // Call triggerFailover - should handle 
InvalidClusterRoleTransitionException
+      // because transitioning from DEGRADED_STANDBY directly to 
ACTIVE_IN_SYNC is invalid
+      discovery.triggerFailover();
+
+      // Verify triggerFailover was called (implicitly through spy)
+      Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+      // Verify failoverPending is set to false even when 
InvalidClusterRoleTransitionException
+      // occurs
+      assertFalse(
+        "failoverPending should be set to false when 
InvalidClusterRoleTransitionException occurs",
+        discovery.getFailoverPending());
+
+    } finally {
+      // Clean up
+      if (fileTracker != null) {
+        fileTracker.close();
+      }
+      try {
+        HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, 
zkUrl);
+      } catch (Exception e) {
+        LOG.warn("Failed to clean up HA group store record", e);
+      }
+    }
+  }
+
   /**
    * Testable implementation of ReplicationLogDiscoveryReplay for unit 
testing. Provides dependency
    * injection for HAGroupStoreRecord, tracks processed rounds, and supports 
simulating state
@@ -1632,9 +2173,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
     protected void triggerFailover() {
       // Track calls to triggerFailover for validation
       triggerFailoverCallCount++;
-
-      // Simulate the real behavior: set failoverPending to false after 
triggering failover
-      setFailoverPending(false);
+      super.triggerFailover();
     }
 
     public int getTriggerFailoverCallCount() {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 22075604ee..8a985f3ead 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -29,7 +29,15 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
index 9bb27e356f..189b7f0ba5 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
@@ -237,7 +237,7 @@ public class ReplicationShardDirectoryManagerTest {
 
     // Test 7: Multiple rounds later
     long multipleRoundsLater = dayStart + (300 * 1000L); // 00:05:00 (5 
minutes)
-    long expectedRoundStart = dayStart + (300 * 1000L); // Should be exact
+    long expectedRoundStart = dayStart + (300 * 1000L); // Should be exact
     long result7 = manager.getNearestRoundStartTimestamp(multipleRoundsLater);
     assertEquals("Multiple rounds later should return exact round start", 
expectedRoundStart,
       result7);
@@ -410,7 +410,7 @@ public class ReplicationShardDirectoryManagerTest {
 
   @Test
   public void testGetReplicationRoundFromStartTimeConsistency() {
-    // Test that the same input always produces the same output
+    // Test that the same input always produces the same output
     long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
 
     ReplicationRound result1 = 
manager.getReplicationRoundFromStartTime(timestamp);
@@ -593,7 +593,7 @@ public class ReplicationShardDirectoryManagerTest {
 
   @Test
   public void testGetReplicationRoundFromEndTimeConsistency() {
-    // Test that the same input always produces the same output
+    // Test that the same input always produces the same output
     long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
 
     ReplicationRound result1 = 
manager.getReplicationRoundFromEndTime(timestamp);
@@ -654,4 +654,165 @@ public class ReplicationShardDirectoryManagerTest {
         shardPath.getName());
     }
   }
+
+  @Test
+  public void testGetPreviousRound() {
+    // Use a specific day for consistent testing
+    // 2024-01-01 00:00:00 UTC = 1704067200000L
+    long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+    // Default configuration: 60-second rounds
+    long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+    // Test 1: First round (00:00:00 to 00:01:00)
+    long firstRoundStart = dayStart; // 00:00:00
+    long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+    ReplicationRound firstRound = new ReplicationRound(firstRoundStart, 
firstRoundEnd);
+    ReplicationRound previousRound = manager.getPreviousRound(firstRound);
+
+    // Previous round should end at firstRoundStart (00:00:00), which rounds 
down to 00:00:00
+    // Start time should be end time - round duration, i.e. 23:59:00 of the previous day
+    long expectedEnd = dayStart; // 00:00:00
+    long expectedStart = expectedEnd - roundDurationMs; // -60 seconds
+    assertEquals("Previous round end time should be start time of current 
round rounded down",
+      expectedEnd, previousRound.getEndTime());
+    assertEquals("Previous round start time should be end time - round 
duration", expectedStart,
+      previousRound.getStartTime());
+
+    // Test 2: Second round (00:01:00 to 00:02:00)
+    long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+    long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+    ReplicationRound secondRound = new ReplicationRound(secondRoundStart, 
secondRoundEnd);
+    ReplicationRound previousRound2 = manager.getPreviousRound(secondRound);
+
+    // Previous round should end at secondRoundStart (00:01:00), which rounds 
down to 00:01:00
+    // Start time should be 00:01:00 - 60s = 00:00:00
+    long expectedEnd2 = secondRoundStart; // 00:01:00
+    long expectedStart2 = expectedEnd2 - roundDurationMs; // 00:00:00
+    assertEquals("Previous round end time should be start time of current 
round rounded down",
+      expectedEnd2, previousRound2.getEndTime());
+    assertEquals("Previous round start time should be end time - round 
duration", expectedStart2,
+      previousRound2.getStartTime());
+
+    // Test 3: Round with mid-timestamp start (00:01:30 would round to 
00:01:00)
+    long midRoundStart = dayStart + (90 * 1000L); // 00:01:30 (not aligned to 
round boundary)
+    long midRoundEnd = midRoundStart + roundDurationMs; // 00:02:30
+    ReplicationRound midRound = new ReplicationRound(midRoundStart, 
midRoundEnd);
+    ReplicationRound previousRound3 = manager.getPreviousRound(midRound);
+
+    // Previous round should end at midRoundStart rounded down = 00:01:00
+    // Start time should be 00:01:00 - 60s = 00:00:00
+    long expectedEnd3 = dayStart + roundDurationMs; // 00:01:00 (rounded down 
from 00:01:30)
+    long expectedStart3 = expectedEnd3 - roundDurationMs; // 00:00:00
+    assertEquals("Previous round end time should round down start time of 
current round",
+      expectedEnd3, previousRound3.getEndTime());
+    assertEquals("Previous round start time should be end time - round 
duration", expectedStart3,
+      previousRound3.getStartTime());
+
+    // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+    long multipleRoundsStart = dayStart + (5 * roundDurationMs); // 00:05:00
+    long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; // 00:06:00
+    ReplicationRound multipleRounds = new 
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+    ReplicationRound previousRound4 = manager.getPreviousRound(multipleRounds);
+
+    // Previous round should end at multipleRoundsStart = 00:05:00
+    // Start time should be 00:05:00 - 60s = 00:04:00
+    long expectedEnd4 = multipleRoundsStart; // 00:05:00
+    long expectedStart4 = expectedEnd4 - roundDurationMs; // 00:04:00
+    assertEquals("Previous round end time should be start time of current 
round", expectedEnd4,
+      previousRound4.getEndTime());
+    assertEquals("Previous round start time should be end time - round 
duration", expectedStart4,
+      previousRound4.getStartTime());
+
+    // Test 5: Verify round duration is consistent
+    long previousRoundDuration = previousRound4.getEndTime() - 
previousRound4.getStartTime();
+    assertEquals("Previous round duration should be 60 seconds", 
roundDurationMs,
+      previousRoundDuration);
+  }
+
+  @Test
+  public void testGetNextRound() {
+    // Use a specific day for consistent testing
+    // 2024-01-01 00:00:00 UTC = 1704067200000L
+    long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+    // Default configuration: 60-second rounds
+    long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+    // Test 1: First round (00:00:00 to 00:01:00)
+    long firstRoundStart = dayStart; // 00:00:00
+    long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+    ReplicationRound firstRound = new ReplicationRound(firstRoundStart, 
firstRoundEnd);
+    ReplicationRound nextRound = manager.getNextRound(firstRound);
+
+    // Next round should start at firstRoundEnd (00:01:00), which rounds down 
to 00:01:00
+    // End time should be 00:01:00 + 60s = 00:02:00
+    long expectedStart = firstRoundEnd; // 00:01:00
+    long expectedEnd = expectedStart + roundDurationMs; // 00:02:00
+    assertEquals("Next round start time should be end time of current round 
rounded down",
+      expectedStart, nextRound.getStartTime());
+    assertEquals("Next round end time should be start time + round duration", 
expectedEnd,
+      nextRound.getEndTime());
+
+    // Test 2: Second round (00:01:00 to 00:02:00)
+    long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+    long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+    ReplicationRound secondRound = new ReplicationRound(secondRoundStart, 
secondRoundEnd);
+    ReplicationRound nextRound2 = manager.getNextRound(secondRound);
+
+    // Next round should start at secondRoundEnd (00:02:00), which rounds down 
to 00:02:00
+    // End time should be 00:02:00 + 60s = 00:03:00
+    long expectedStart2 = secondRoundEnd; // 00:02:00
+    long expectedEnd2 = expectedStart2 + roundDurationMs; // 00:03:00
+    assertEquals("Next round start time should be end time of current round 
rounded down",
+      expectedStart2, nextRound2.getStartTime());
+    assertEquals("Next round end time should be start time + round duration", 
expectedEnd2,
+      nextRound2.getEndTime());
+
+    // Test 3: Round with mid-timestamp end (00:02:30 would round to 00:02:00)
+    long midRoundStart = dayStart + (120 * 1000L); // 00:02:00
+    long midRoundEnd = dayStart + (150 * 1000L); // 00:02:30 (not aligned to 
round boundary)
+    ReplicationRound midRound = new ReplicationRound(midRoundStart, 
midRoundEnd);
+    ReplicationRound nextRound3 = manager.getNextRound(midRound);
+
+    // Next round should start at midRoundEnd rounded down = 00:02:00
+    // End time should be 00:02:00 + 60s = 00:03:00
+    long expectedStart3 = dayStart + (120 * 1000L); // 00:02:00 (rounded down 
from 00:02:30)
+    long expectedEnd3 = expectedStart3 + roundDurationMs; // 00:03:00
+    assertEquals("Next round start time should round down end time of current 
round",
+      expectedStart3, nextRound3.getStartTime());
+    assertEquals("Next round end time should be start time + round duration", 
expectedEnd3,
+      nextRound3.getEndTime());
+
+    // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+    long multipleRoundsStart = dayStart + (5 * roundDurationMs); // 00:05:00
+    long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; // 00:06:00
+    ReplicationRound multipleRounds = new 
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+    ReplicationRound nextRound4 = manager.getNextRound(multipleRounds);
+
+    // Next round should start at multipleRoundsEnd = 00:06:00
+    // End time should be 00:06:00 + 60s = 00:07:00
+    long expectedStart4 = multipleRoundsEnd; // 00:06:00
+    long expectedEnd4 = expectedStart4 + roundDurationMs; // 00:07:00
+    assertEquals("Next round start time should be end time of current round", 
expectedStart4,
+      nextRound4.getStartTime());
+    assertEquals("Next round end time should be start time + round duration", 
expectedEnd4,
+      nextRound4.getEndTime());
+
+    // Test 5: Verify round duration is consistent
+    long nextRoundDuration = nextRound4.getEndTime() - 
nextRound4.getStartTime();
+    assertEquals("Next round duration should be 60 seconds", roundDurationMs, 
nextRoundDuration);
+
+    // Test 6: Verify continuity - next round of previous round should equal 
original round
+    ReplicationRound originalRound =
+      new ReplicationRound(dayStart + (3 * roundDurationMs), dayStart + (4 * 
roundDurationMs)); // 00:03:00
+                                                                               
                 // to
+                                                                               
                 // 00:04:00
+    ReplicationRound prevRound = manager.getPreviousRound(originalRound);
+    ReplicationRound nextOfPrev = manager.getNextRound(prevRound);
+    assertEquals("Next round of previous round should equal original round 
start time",
+      originalRound.getStartTime(), nextOfPrev.getStartTime());
+    assertEquals("Next round of previous round should equal original round end 
time",
+      originalRound.getEndTime(), nextOfPrev.getEndTime());
+  }
 }

Reply via email to