This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
new 37de6ecbf3 Phoenix-7672 Adding failover implementation via replay (Addendum) (#2351)
37de6ecbf3 is described below
commit 37de6ecbf366371793b583af57b6238a474c0034
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Fri Jan 23 04:29:25 2026 +0530
Phoenix-7672 Adding failover implementation via replay (Addendum) (#2351)
---
.../replication/ReplicationLogDiscovery.java | 50 +-
.../phoenix/replication/ReplicationLogTracker.java | 2 +-
.../phoenix/replication/ReplicationRound.java | 3 -
.../ReplicationShardDirectoryManager.java | 13 +-
.../reader/ReplicationLogDiscoveryReplay.java | 70 ++-
.../reader/ReplicationLogProcessor.java | 2 +
.../replication/reader/ReplicationLogReplay.java | 2 +
.../reader/ReplicationLogReplayService.java | 2 +
.../ReplicationLogDiscoveryReplayTestIT.java | 595 ++++++++++++++++++++-
.../reader/ReplicationLogProcessorTestIT.java | 10 +-
.../ReplicationShardDirectoryManagerTest.java | 167 +++++-
11 files changed, 837 insertions(+), 79 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 1123c20564..37ef9084c7 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -103,6 +103,7 @@ public abstract class ReplicationLogDiscovery {
}
public void init() throws IOException {
+ LOG.info("Initializing ReplicationLogDiscovery for haGroup: {}",
haGroupName);
initializeLastRoundProcessed();
this.metrics = createMetricsSource();
}
@@ -122,7 +123,7 @@ public abstract class ReplicationLogDiscovery {
public void start() throws IOException {
synchronized (this) {
if (isRunning) {
- LOG.warn("ReplicationLogDiscovery is already running for group: {}",
haGroupName);
+ LOG.warn("ReplicationLogDiscovery is already running for haGroup: {}",
haGroupName);
return;
}
// Initialize and schedule the executors
@@ -137,7 +138,7 @@ public abstract class ReplicationLogDiscovery {
}, 0, getReplayIntervalSeconds(), TimeUnit.SECONDS);
isRunning = true;
- LOG.info("ReplicationLogDiscovery started for group: {}", haGroupName);
+ LOG.info("ReplicationLogDiscovery started for haGroup: {}", haGroupName);
}
}
@@ -151,7 +152,7 @@ public abstract class ReplicationLogDiscovery {
synchronized (this) {
if (!isRunning) {
- LOG.warn("ReplicationLogDiscovery is not running for group: {}",
haGroupName);
+ LOG.warn("ReplicationLogDiscovery is not running for haGroup: {}",
haGroupName);
return;
}
@@ -171,7 +172,7 @@ public abstract class ReplicationLogDiscovery {
}
}
- LOG.info("ReplicationLogDiscovery stopped for group: {}", haGroupName);
+ LOG.info("ReplicationLogDiscovery stopped for haGroup: {}", haGroupName);
}
/**
@@ -190,8 +191,8 @@ public abstract class ReplicationLogDiscovery {
try {
processRound(replicationRound);
} catch (IOException e) {
- LOG.error("Failed processing replication round {}. Will retry in next
" + "scheduled run.",
- replicationRound, e);
+ LOG.error("Failed processing replication round {} for haGroup {}. Will
retry"
+ + "in next scheduled run.", replicationRound, haGroupName, e);
break; // stop this run, retry later
}
setLastRoundProcessed(replicationRound);
@@ -223,7 +224,7 @@ public abstract class ReplicationLogDiscovery {
* @throws IOException if there's an error during round processing
*/
protected void processRound(ReplicationRound replicationRound) throws IOException {
- LOG.info("Starting to process round: {}", replicationRound);
+ LOG.info("Starting to process round: {} for haGroup: {}", replicationRound, haGroupName);
// Increment the number of rounds processed
getMetrics().incrementNumRoundsProcessed();
@@ -233,7 +234,7 @@ public abstract class ReplicationLogDiscovery {
// Conditionally process the in progress files for the round
processInProgressDirectory();
}
- LOG.info("Finished processing round: {}", replicationRound);
+ LOG.info("Finished processing round: {} for haGroup: {}",
replicationRound, haGroupName);
}
/**
@@ -253,7 +254,8 @@ public abstract class ReplicationLogDiscovery {
* @throws IOException if there's an error during file processing
*/
protected void processNewFilesForRound(ReplicationRound replicationRound) throws IOException {
- LOG.info("Starting new files processing for round: {}", replicationRound);
+ LOG.info("Starting new files processing for round: {} for haGroup: {}", replicationRound,
+ haGroupName);
long startTime = EnvironmentEdgeManager.currentTime();
List<Path> files = replicationLogTracker.getNewFilesForRound(replicationRound);
LOG.info("Number of new files for round {} is {}", replicationRound, files.size());
@@ -262,7 +264,8 @@ public abstract class ReplicationLogDiscovery {
files = replicationLogTracker.getNewFilesForRound(replicationRound);
}
long duration = EnvironmentEdgeManager.currentTime() - startTime;
- LOG.info("Finished new files processing for round: {} in {}ms",
replicationRound, duration);
+ LOG.info("Finished new files processing for round: {} in {}ms for haGroup:
{}",
+ replicationRound, duration, haGroupName);
getMetrics().updateTimeToProcessNewFiles(duration);
}
@@ -272,24 +275,25 @@ public abstract class ReplicationLogDiscovery {
* @throws IOException if there's an error during file processing
*/
protected void processInProgressDirectory() throws IOException {
+ LOG.info("Starting {} directory processing for haGroup: {}",
+ replicationLogTracker.getInProgressLogSubDirectoryName(), haGroupName);
// Increase the count for number of times in progress directory is processed
getMetrics().incrementNumInProgressDirectoryProcessed();
- LOG.info("Starting {} directory processing",
- replicationLogTracker.getInProgressLogSubDirectoryName());
long startTime = EnvironmentEdgeManager.currentTime();
long oldestTimestampToProcess =
replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp(
EnvironmentEdgeManager.currentTime()) - getReplayIntervalSeconds() * 1000L;
List<Path> files =
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
- LOG.info("Number of {} files with oldestTimestampToProcess {} is {}",
+ LOG.info("Number of {} files with oldestTimestampToProcess {} is {} for
haGroup: {}",
replicationLogTracker.getInProgressLogSubDirectoryName(),
oldestTimestampToProcess,
- files.size());
+ files.size(), haGroupName);
while (!files.isEmpty()) {
processOneRandomFile(files);
files =
replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
}
long duration = EnvironmentEdgeManager.currentTime() - startTime;
- LOG.info("Finished in-progress files processing in {}ms", duration);
+ LOG.info("Finished in-progress files processing in {}ms for haGroup: {}",
duration,
+ haGroupName);
getMetrics().updateTimeToProcessInProgressFiles(duration);
}
@@ -339,21 +343,27 @@ public abstract class ReplicationLogDiscovery {
protected void initializeLastRoundProcessed() throws IOException {
Optional<Long> minTimestampFromInProgressFiles =
getMinTimestampFromInProgressFiles();
if (minTimestampFromInProgressFiles.isPresent()) {
- LOG.info("Initializing lastRoundProcessed from {} files with minimum " +
"timestamp as {}",
- replicationLogTracker.getInProgressLogSubDirectoryName(),
+ LOG.info(
+ "Initializing lastRoundProcessed for haGroup: {} from {} files with
minimum "
+ + "timestamp as {}",
+ haGroupName, replicationLogTracker.getInProgressLogSubDirectoryName(),
minTimestampFromInProgressFiles.get());
this.lastRoundProcessed =
replicationLogTracker.getReplicationShardDirectoryManager()
.getReplicationRoundFromEndTime(minTimestampFromInProgressFiles.get());
} else {
Optional<Long> minTimestampFromNewFiles = getMinTimestampFromNewFiles();
if (minTimestampFromNewFiles.isPresent()) {
- LOG.info("Initializing lastRoundProcessed from {} files with minimum
timestamp " + "as {}",
- replicationLogTracker.getInSubDirectoryName(),
minTimestampFromNewFiles.get());
+ LOG.info(
+ "Initializing lastRoundProcessed for haGroup: {} from {}"
+ + "files with minimum timestamp as {}",
+ haGroupName, replicationLogTracker.getInSubDirectoryName(),
+ minTimestampFromNewFiles.get());
this.lastRoundProcessed =
replicationLogTracker.getReplicationShardDirectoryManager()
.getReplicationRoundFromEndTime(minTimestampFromNewFiles.get());
} else {
long currentTime = EnvironmentEdgeManager.currentTime();
- LOG.info("Initializing lastRoundProcessed from current time {}",
currentTime);
+ LOG.info("Initializing lastRoundProcessed for haGroup: {} from current
time {}",
+ haGroupName, currentTime);
this.lastRoundProcessed =
replicationLogTracker.getReplicationShardDirectoryManager()
.getReplicationRoundFromEndTime(EnvironmentEdgeManager.currentTime());
}
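A note on the initialization path touched above: initializeLastRoundProcessed() derives its seed timestamp by falling back through three sources in order. A minimal sketch of that order, where minInProgressTs() and minNewFileTs() are hypothetical stand-ins for getMinTimestampFromInProgressFiles() and getMinTimestampFromNewFiles():

import java.util.Optional;

// Sketch only: the fallback order in initializeLastRoundProcessed().
// minInProgressTs() and minNewFileTs() are hypothetical stand-ins for the
// real tracker lookups; the printed timestamp is illustrative.
public class LastRoundSeedSketch {
  static Optional<Long> minInProgressTs() { return Optional.empty(); }         // no in-progress files
  static Optional<Long> minNewFileTs() { return Optional.of(1704067530000L); } // oldest new file

  static long seedTimestamp() {
    // 1) oldest in-progress file, 2) else oldest new file, 3) else current time
    return minInProgressTs()
      .orElseGet(() -> minNewFileTs().orElseGet(System::currentTimeMillis));
  }

  public static void main(String[] args) {
    System.out.println("seed lastRoundProcessed from ts=" + seedTimestamp());
  }
}

The seed is then converted to a round via getReplicationRoundFromEndTime(), as the hunk above shows.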
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index de12f81fee..f7cb1f2cab 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -116,7 +116,7 @@ public class ReplicationLogTracker {
* @return List of valid log file paths that belong to the specified replication round
* @throws IOException if there's an error accessing the file system
*/
- protected List<Path> getNewFilesForRound(ReplicationRound replicationRound)
throws IOException {
+ public List<Path> getNewFilesForRound(ReplicationRound replicationRound)
throws IOException {
Path roundDirectory =
replicationShardDirectoryManager.getShardDirectory(replicationRound);
LOG.info("Getting new files for round {} from shard {}", replicationRound,
roundDirectory);
if (!fileSystem.exists(roundDirectory)) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java
index fe5d38143b..64dc3f1db8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationRound.java
@@ -19,8 +19,6 @@ package org.apache.phoenix.replication;
import java.util.Objects;
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
-
/**
* Represents a replication round with defined start and end timestamp. Used for grouping
* replication log files into time-based processing windows and managing file distribution across
@@ -32,7 +30,6 @@ public class ReplicationRound {
private final long endTime;
public ReplicationRound(long startTime, long endTime) {
- Preconditions.checkArgument(startTime < endTime);
this.startTime = startTime;
this.endTime = endTime;
}
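Context for the removed precondition: the Math.max(0L, ...) clamp added below can yield rounds whose start equals the end (for example a (0, 0) round when the end time floors to 0), and the tests construct zero-start sentinel rounds; the old startTime < endTime check would reject the degenerate case. A minimal illustration under that assumption:

import org.apache.phoenix.replication.ReplicationRound;

// Illustration only, assuming a 60s round: with the precondition removed,
// degenerate/sentinel rounds can be constructed without throwing.
public class SentinelRoundSketch {
  public static void main(String[] args) {
    long roundTimeMills = 60_000L;
    ReplicationRound sentinel = new ReplicationRound(0L, roundTimeMills); // "uninitialized" marker
    ReplicationRound clamped = new ReplicationRound(0L, 0L); // what Math.max(0L, ...) can produce
    System.out.println(sentinel.getStartTime() == 0 && clamped.getStartTime() == 0);
  }
}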
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
index 76c2afcb19..cbbe5fc3a0 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
@@ -150,7 +150,8 @@ public class ReplicationShardDirectoryManager {
*/
public ReplicationRound getReplicationRoundFromEndTime(long roundEndTime) {
long validRoundEndTime = getNearestRoundStartTimestamp(roundEndTime);
- long validRoundStartTime = validRoundEndTime - replicationRoundDurationSeconds * 1000L;
+ long validRoundStartTime =
+ Math.max(0L, validRoundEndTime - replicationRoundDurationSeconds * 1000L);
return new ReplicationRound(validRoundStartTime, validRoundEndTime);
}
@@ -172,7 +173,7 @@ public class ReplicationShardDirectoryManager {
* @param timestamp The timestamp in milliseconds since epoch
* @return The nearest replication round start timestamp
*/
- protected long getNearestRoundStartTimestamp(long timestamp) {
+ public long getNearestRoundStartTimestamp(long timestamp) {
// Convert round time from seconds to milliseconds
long roundTimeMs = replicationRoundDurationSeconds * 1000L;
@@ -181,6 +182,14 @@ public class ReplicationShardDirectoryManager {
return (timestamp / roundTimeMs) * roundTimeMs;
}
+ public ReplicationRound getPreviousRound(final ReplicationRound replicationRound) {
+ return getReplicationRoundFromEndTime(replicationRound.getStartTime());
+ }
+
+ public ReplicationRound getNextRound(final ReplicationRound replicationRound) {
+ return getReplicationRoundFromStartTime(replicationRound.getEndTime());
+ }
+
public int getReplicationRoundDurationSeconds() {
return this.replicationRoundDurationSeconds;
}
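The new getPreviousRound/getNextRound helpers and the Math.max clamp above all rest on the flooring arithmetic in getNearestRoundStartTimestamp. A small worked example, assuming a 60-second round duration (the timestamp is illustrative):

// Worked example of the round-boundary math, assuming 60s rounds.
public class RoundMathSketch {
  public static void main(String[] args) {
    long roundMs = 60 * 1000L;
    long ts = 1704067530000L;                            // 2024-01-01 00:05:30 UTC
    long floorStart = (ts / roundMs) * roundMs;          // floors to 00:05:00
    long prevStart = Math.max(0L, floorStart - roundMs); // clamped start, 00:04:00
    // previous round = [floorStart - roundMs, floorStart]
    // next round     = [floorStart, floorStart + roundMs]
    System.out.println("floorStart=" + floorStart + ", prevStart=" + prevStart);
  }
}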
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index f1292519a5..36daca8b97 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -127,6 +127,9 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
@Override
public void init() throws IOException {
+
+ LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}",
haGroupName);
+
HAGroupStateListener degradedListener =
(groupName, fromState, toState, modifiedTime, clusterType,
lastSyncStateTimeInMs) -> {
if (
@@ -220,7 +223,10 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
*/
@Override
protected void initializeLastRoundProcessed() throws IOException {
+ LOG.info("Initializing last round processed for haGroup: {}", haGroupName);
HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord();
+ LOG.info("Found HA Group state during initialization as {} for haGroup:
{}",
+ haGroupStoreRecord.getHAGroupState(), haGroupName);
if (
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState())
) {
@@ -285,6 +291,7 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
LOG.info("Starting replay with lastRoundProcessed={}, lastRoundInSync={}", lastRoundProcessed,
lastRoundInSync);
Optional<ReplicationRound> optionalNextRound = getFirstRoundToProcess();
+ LOG.info("Found first round to process as {} for haGroup: {}",
optionalNextRound, haGroupName);
while (optionalNextRound.isPresent()) {
ReplicationRound replicationRound = optionalNextRound.get();
try {
@@ -301,8 +308,12 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
switch (currentState) {
case SYNCED_RECOVERY:
// Rewind to last in-sync round
- LOG.info("SYNCED_RECOVERY detected, rewinding to
lastRoundInSync={}", lastRoundInSync);
- setLastRoundProcessed(lastRoundInSync);
+ LOG.info("SYNCED_RECOVERY detected, rewinding with
lastRoundInSync={}", lastRoundInSync);
+ Optional<ReplicationRound> firstRoundToProcess =
getFirstRoundToProcess();
+ LOG.info("Calculated first round to process after SYNCED_RECOVERY
as" + "{}",
+ firstRoundToProcess);
+ firstRoundToProcess.ifPresent(round -> setLastRoundProcessed(
+
replicationLogTracker.getReplicationShardDirectoryManager().getPreviousRound(round)));
// Only reset to NORMAL if state hasn't been flipped to DEGRADED
replicationReplayState.compareAndSet(ReplicationReplayState.SYNCED_RECOVERY,
ReplicationReplayState.SYNC);
@@ -336,16 +347,9 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
LOG.info(
"No more rounds to process, lastRoundInSync={}, lastRoundProcessed={}.
"
+ "Failover is triggered & in progress directory is empty. "
- + "Marking cluster state as {}",
+ + "Attempting to mark cluster state as {}",
lastRoundInSync, lastRoundProcessed,
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
- try {
- triggerFailover();
- LOG.info("Successfully updated the cluster state");
- failoverPending.set(false);
- } catch (InvalidClusterRoleTransitionException exception) {
- LOG.warn("Failed to update the cluster state.", exception);
- failoverPending.set(false);
- }
+ triggerFailover();
}
}
@@ -356,8 +360,15 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
* scenarios where lastRoundProcessed may be ahead of lastRoundInSync.
* @return Optional containing the first round to process, or empty if not enough time has passed
*/
- private Optional<ReplicationRound> getFirstRoundToProcess() {
- long lastRoundEndTimestamp = getLastRoundInSync().getEndTime();
+ private Optional<ReplicationRound> getFirstRoundToProcess() throws
IOException {
+ ReplicationRound lastRoundInSync = getLastRoundInSync();
+ long lastRoundEndTimestamp = lastRoundInSync.getEndTime();
+ if (lastRoundInSync.getStartTime() == 0) {
+ Optional<Long> optionalMinimumNewFilesTimestamp =
getMinTimestampFromNewFiles();
+ lastRoundEndTimestamp =
+
replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp(
+
optionalMinimumNewFilesTimestamp.orElseGet(EnvironmentEdgeManager::currentTime));
+ }
long currentTime = EnvironmentEdgeManager.currentTime();
if (currentTime - lastRoundEndTimestamp < roundTimeMills + bufferMillis) {
// nothing more to process
@@ -440,17 +451,34 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
return optionalHAGroupStateRecord.get();
}
+ /**
+ * Determines whether failover should be triggered based on completion criteria. Failover is safe
+ * to trigger when all of the following conditions are met: 1. A failover has been requested
+ * (failoverPending is true) 2. No files are currently in the in-progress directory 3. No new
+ * files exist for the ongoing round. These conditions ensure all replication logs have been
+ * processed before transitioning the cluster from STANDBY to ACTIVE state.
+ * @return true if all conditions are met and failover should be triggered, false otherwise
+ * @throws IOException if there's an error checking file status
+ */
protected boolean shouldTriggerFailover() throws IOException {
- return failoverPending.get() && lastRoundInSync.equals(lastRoundProcessed)
- && replicationLogTracker.getInProgressFiles().isEmpty();
- // TODO: Check for in files of lastRoundProcessed, lastRoundProcessed + roundTime as well -
- // because there can be new files for upcoming round
+ return failoverPending.get() && replicationLogTracker.getInProgressFiles().isEmpty()
+ && replicationLogTracker.getNewFilesForRound(replicationLogTracker
+ .getReplicationShardDirectoryManager().getNextRound(getLastRoundProcessed())).isEmpty();
}
- protected void triggerFailover() throws IOException, InvalidClusterRoleTransitionException {
- // TODO: Update cluster state to ACTIVE_IN_SYNC within try block
- // (once API is supported in HA Store)
- // Throw any exception back to caller.
+ protected void triggerFailover() {
+ try {
+ HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName);
+ failoverPending.set(false);
+ } catch (InvalidClusterRoleTransitionException invalidClusterRoleTransitionException) {
+ LOG.warn(
+ "Failed to update the cluster state due to "
+ + "InvalidClusterRoleTransitionException. Setting failoverPending to false.",
+ invalidClusterRoleTransitionException);
+ failoverPending.set(false);
+ } catch (Exception exception) {
+ LOG.error("Failed to update the cluster state.", exception);
+ }
}
public enum ReplicationReplayState {
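Reading the two rewritten methods together: failover now fires only when a failover is pending, the in-progress directory is empty, and the next round after lastRoundProcessed has no new files; failoverPending is cleared on success or on an invalid role transition, while any other exception leaves it set so the next scheduled run retries. A condensed, non-authoritative sketch of that control flow (the predicates and setStatusToSync() are placeholders for the tracker and HAGroupStoreManager calls):

import java.util.concurrent.atomic.AtomicBoolean;

// Condensed sketch of the failover gate; not the real class. The two
// predicates stand in for the ReplicationLogTracker calls, and
// setStatusToSync() stands in for HAGroupStoreManager.setHAGroupStatusToSync().
public class FailoverGateSketch {
  static class InvalidTransition extends Exception {}

  private final AtomicBoolean failoverPending = new AtomicBoolean(true);

  boolean inProgressEmpty() { return true; }          // placeholder
  boolean nextRoundHasNoNewFiles() { return true; }   // placeholder
  void setStatusToSync() throws InvalidTransition {}  // placeholder

  boolean shouldTriggerFailover() {
    return failoverPending.get() && inProgressEmpty() && nextRoundHasNoNewFiles();
  }

  void triggerFailover() {
    try {
      setStatusToSync();
      failoverPending.set(false);   // success: request satisfied
    } catch (InvalidTransition e) {
      failoverPending.set(false);   // invalid transition: drop the request
    } catch (RuntimeException e) {
      // anything else: keep failoverPending set so the next run retries
    }
  }

  public static void main(String[] args) {
    FailoverGateSketch gate = new FailoverGateSketch();
    if (gate.shouldTriggerFailover()) gate.triggerFailover();
    System.out.println("pending=" + gate.failoverPending.get()); // false
  }
}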
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index be744a5f7e..84d4f1ba42 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -213,6 +213,8 @@ public class ReplicationLogProcessor implements Closeable {
public void processLogFile(FileSystem fs, Path filePath) throws IOException {
+ LOG.info("Starting to process file {}", filePath);
+
// Map from Table Name to List of Mutations
Map<TableName, List<Mutation>> tableToMutationsMap = new HashMap<>();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 89c05ce63c..e0e3811ed3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -105,6 +105,7 @@ public class ReplicationLogReplay {
* @throws IOException if there's an error during initialization
*/
protected void init() throws IOException {
+ LOG.info("Initializing ReplicationLogReplay for haGroup: {}", haGroupName);
initializeFileSystem();
Path newFilesDirectory =
new Path(new Path(rootURI.getPath(), haGroupName), ReplicationLogReplay.IN_DIRECTORY_NAME);
@@ -120,6 +121,7 @@ public class ReplicationLogReplay {
}
public void close() {
+ LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
replicationLogDiscoveryReplay.close();
// Remove the instance from cache
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index 1485675050..fe380dea85 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -191,6 +191,7 @@ public class ReplicationLogReplayService {
*/
protected void startReplicationReplay() throws IOException, SQLException {
List<String> replicationGroups = getReplicationGroups();
+ LOG.info("{} number of HA Groups found to start Replication Replay",
replicationGroups.size());
for (String replicationGroup : replicationGroups) {
ReplicationLogReplay.get(conf, replicationGroup).startReplay();
}
@@ -201,6 +202,7 @@ public class ReplicationLogReplayService {
*/
protected void stopReplicationReplay() throws IOException, SQLException {
List<String> replicationGroups = getReplicationGroups();
+ LOG.info("{} number of HA Groups found to stop Replication Replay",
replicationGroups.size());
for (String replicationGroup : replicationGroups) {
ReplicationLogReplay replicationLogReplay =
ReplicationLogReplay.get(conf, replicationGroup);
replicationLogReplay.stopReplay();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 6eb8cd8b3e..68045bf5e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -26,9 +26,11 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
+import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
@@ -97,8 +100,8 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
public void setUp() throws Exception {
zkUrl = getLocalZkUrl(config);
peerZkUrl = CLUSTERS.getZkUrl2();
- HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl,
- peerZkUrl, CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, peerZkUrl,
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(), ClusterRoleRecord.ClusterRole.ACTIVE,
ClusterRoleRecord.ClusterRole.STANDBY, null);
localFs = FileSystem.getLocal(config);
standbyUri = testFolder.getRoot().toURI();
@@ -1293,6 +1296,396 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
}
}
+ /**
+ * Tests replay method when lastRoundInSync is not set, i.e. has start time 0. Validates that
+ * getFirstRoundToProcess uses minimum timestamp from new files
+ */
+ @Test
+ public void testReplay_LastRoundInSync_NotInitialized() throws IOException {
+ TestableReplicationLogTracker fileTracker =
+ createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+ try {
+ long roundTimeMills =
+ fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
+ * 1000L;
+ long bufferMillis = (long) (roundTimeMills * 0.15);
+
+ // Create new files with specific timestamps
+ // The minimum timestamp should be used as the basis for the first round
+ long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) + (30 * 1000L); // 2024-01-01
+ // 00:05:30
+ long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) + (45 * 1000L); // 2024-01-01
+ // 00:08:45
+ long middleFileTimestamp = 1704067200000L + (7 * roundTimeMills) + (15 * 1000L); // 2024-01-01
+ // 00:07:15
+
+ // Create files in different shard directories
+ ReplicationShardDirectoryManager shardManager =
+ fileTracker.getReplicationShardDirectoryManager();
+
+ // Create file with minimum timestamp
+ Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+ localFs.mkdirs(shardPath1);
+ Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+ localFs.create(file1, true).close();
+
+ // Create file with middle timestamp
+ Path shardPath2 = shardManager.getShardDirectory(middleFileTimestamp);
+ localFs.mkdirs(shardPath2);
+ Path file2 = new Path(shardPath2, middleFileTimestamp + "_rs-2.plog");
+ localFs.create(file2, true).close();
+
+ // Create file with maximum timestamp
+ Path shardPath3 = shardManager.getShardDirectory(maxFileTimestamp);
+ localFs.mkdirs(shardPath3);
+ Path file3 = new Path(shardPath3, maxFileTimestamp + "_rs-3.plog");
+ localFs.create(file3, true).close();
+
+ // Create HAGroupStoreRecord for STANDBY state
+ HAGroupStoreRecord mockRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+ // Calculate expected first round start time (minimum timestamp rounded down to nearest round
+ // start)
+ long expectedFirstRoundStart = shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+ long expectedFirstRoundEnd = expectedFirstRoundStart + roundTimeMills;
+
+ // Set current time to allow processing exactly 5 rounds
+ // Round 1 ends at expectedFirstRoundEnd
+ // Round 2 ends at expectedFirstRoundEnd + roundTimeMills
+ // Round 3 ends at expectedFirstRoundEnd + 2*roundTimeMills
+ // Round 4 ends at expectedFirstRoundEnd + 3*roundTimeMills
+ // Round 5 ends at expectedFirstRoundEnd + 4*roundTimeMills
+ // Round 6 starts at expectedFirstRoundEnd + 4*roundTimeMills
+ // For round 6 to be processable: currentTime - (expectedFirstRoundEnd + 4*roundTimeMills) >=
+ // roundTimeMills + bufferMillis
+ // So: currentTime >= expectedFirstRoundEnd + 5*roundTimeMills + bufferMillis
+ // For round 6 NOT to be processable: currentTime - (expectedFirstRoundEnd + 4*roundTimeMills)
+ // < roundTimeMills + bufferMillis
+ // So exactly 5 rounds should be processed with currentTime = expectedFirstRoundEnd +
+ // 5*roundTimeMills + bufferMillis - 1
+ // To be safe, use: currentTime = expectedFirstRoundEnd + 5*roundTimeMills + bufferMillis -
+ // 1000
+ long currentTime = expectedFirstRoundEnd + (5 * roundTimeMills) + bufferMillis - 1000;
+ EnvironmentEdge edge = () -> currentTime;
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ // Calculate exact number of rounds that should be processed
+ // Round 1: from getFirstRoundToProcess (based on lastRoundInSync with start time 0)
+ // - Uses minFileTimestamp rounded down = expectedFirstRoundStart
+ // - Round 1: expectedFirstRoundStart to expectedFirstRoundEnd
+ // Round 2+: from getNextRoundToProcess (based on previous round's end time)
+ // - Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd + roundTimeMills
+ // - Round 3: expectedFirstRoundEnd + roundTimeMills to expectedFirstRoundEnd +
+ // 2*roundTimeMills
+ // - Round 4: expectedFirstRoundEnd + 2*roundTimeMills to expectedFirstRoundEnd +
+ // 3*roundTimeMills
+ // - Round 5: expectedFirstRoundEnd + 3*roundTimeMills to expectedFirstRoundEnd +
+ // 4*roundTimeMills
+ int expectedRoundCount = 5;
+ long[] expectedRoundStarts = new long[expectedRoundCount];
+ long[] expectedRoundEnds = new long[expectedRoundCount];
+ expectedRoundStarts[0] = expectedFirstRoundStart;
+ expectedRoundEnds[0] = expectedFirstRoundEnd;
+ for (int i = 1; i < expectedRoundCount; i++) {
+ expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 1) * roundTimeMills);
+ expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+ }
+
+ try {
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+ // Set lastRoundInSync with start time 0 (the new case being tested)
+ ReplicationRound lastRoundInSyncWithZeroStart = new ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+ // Set lastRoundProcessed to some initial value
+ ReplicationRound initialLastRoundProcessed = new ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundProcessed(initialLastRoundProcessed);
+ discovery
+ .setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+ // Verify initial state
+ ReplicationRound lastRoundInSyncBefore = discovery.getLastRoundInSync();
+ assertEquals("lastRoundInSync should have start time 0 before replay", 0L,
+ lastRoundInSyncBefore.getStartTime());
+
+ // Call replay - should start from minimum timestamp of new files
+ discovery.replay();
+
+ // Verify exact count of processRound calls
+ int actualProcessRoundCallCount = discovery.getProcessRoundCallCount();
+ assertEquals("processRound should be called exactly " +
expectedRoundCount + " times",
+ expectedRoundCount, actualProcessRoundCallCount);
+
+ // Get all processed rounds
+ List<ReplicationRound> processedRounds =
discovery.getProcessedRounds();
+ assertEquals("Should have processed exactly " + expectedRoundCount + "
rounds",
+ expectedRoundCount, processedRounds.size());
+
+ // Verify each round's exact parameters
+ for (int i = 0; i < expectedRoundCount; i++) {
+ ReplicationRound actualRound = processedRounds.get(i);
+ assertEquals("Round " + (i + 1) + " should have correct start time",
+ expectedRoundStarts[i], actualRound.getStartTime());
+ assertEquals("Round " + (i + 1) + " should have correct end time",
expectedRoundEnds[i],
+ actualRound.getEndTime());
+ }
+
+ // Verify lastRoundProcessed was updated to the last processed round
+ ReplicationRound lastRoundAfterReplay = discovery.getLastRoundProcessed();
+ assertNotNull("Last round processed should not be null", lastRoundAfterReplay);
+ assertEquals("Last round processed should match the last processed round start time",
+ expectedRoundStarts[expectedRoundCount - 1], lastRoundAfterReplay.getStartTime());
+ assertEquals("Last round processed should match the last processed round end time",
+ expectedRoundEnds[expectedRoundCount - 1], lastRoundAfterReplay.getEndTime());
+
+ // Verify lastRoundInSync was updated (in SYNC state, both should advance together)
+ ReplicationRound lastRoundInSyncAfter = discovery.getLastRoundInSync();
+ assertNotNull("Last round in sync should not be null", lastRoundInSyncAfter);
+ assertEquals("Last round in sync should match last round processed in SYNC state",
+ lastRoundAfterReplay, lastRoundInSyncAfter);
+ assertEquals("Last round in sync should have correct start time",
+ expectedRoundStarts[expectedRoundCount - 1], lastRoundInSyncAfter.getStartTime());
+ assertEquals("Last round in sync should have correct end time",
+ expectedRoundEnds[expectedRoundCount - 1], lastRoundInSyncAfter.getEndTime());
+
+ // Verify state remains SYNC
+ assertEquals("State should remain SYNC",
+ ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+ discovery.getReplicationReplayState());
+
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ } finally {
+ fileTracker.close();
+ }
+ }
+
+ /**
+ * Tests replay method when state changes from DEGRADED to SYNC between round processing and
+ * lastRoundInSync has start time 0. Expected behavior: - 1st round is processed in DEGRADED state
+ * - 2nd round is processed in DEGRADED state, but then state transitions to SYNCED_RECOVERY -
+ * When state becomes SYNCED_RECOVERY, lastRoundProcessed is rewound to the previous round of
+ * getFirstRoundToProcess() (which uses minimum file timestamp when lastRoundInSync.startTime ==
+ * 0) - Then 5 rounds are processed starting from the first round (based on minimum file
+ * timestamp) in SYNC state. Total: 7 rounds processed (2 in DEGRADED, then rewind, then 5 in SYNC)
+ */
+ @Test
+ public void testReplay_DegradedToSync_LastRoundInSyncStartTimeZero() throws IOException {
+ TestableReplicationLogTracker fileTracker =
+ createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+ try {
+ long roundTimeMills =
+ fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
+ * 1000L;
+ long bufferMillis = (long) (roundTimeMills * 0.15);
+
+ // Create new files with specific timestamps
+ // The minimum timestamp should be used as the basis for the first round
+ long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) + (30 * 1000L); // 2024-01-01
+ // 00:05:30
+ long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) + (45 * 1000L); // 2024-01-01
+ // 00:08:45
+
+ // Create files in different shard directories
+ ReplicationShardDirectoryManager shardManager =
+ fileTracker.getReplicationShardDirectoryManager();
+
+ // Create file with minimum timestamp
+ Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+ localFs.mkdirs(shardPath1);
+ Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+ localFs.create(file1, true).close();
+
+ // Create file with maximum timestamp
+ Path shardPath2 = shardManager.getShardDirectory(maxFileTimestamp);
+ localFs.mkdirs(shardPath2);
+ Path file2 = new Path(shardPath2, maxFileTimestamp + "_rs-2.plog");
+ localFs.create(file2, true).close();
+
+ // Create HAGroupStoreRecord for STANDBY state
+ HAGroupStoreRecord mockRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+ // Calculate expected first round start time (minimum timestamp rounded down to nearest round
+ // start)
+ long expectedFirstRoundStart = shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+ long expectedFirstRoundEnd = expectedFirstRoundStart + roundTimeMills;
+
+ // After rewind, lastRoundProcessed will be set to previous round of firstRoundToProcess
+ // firstRoundToProcess = expectedFirstRoundStart to expectedFirstRoundEnd
+ // previous round = expectedFirstRoundStart - roundTimeMills to expectedFirstRoundStart
+ // Then getNextRoundToProcess will return the first round starting from
+ // expectedFirstRoundStart
+ // So the first SYNC round after rewind will be expectedFirstRoundStart to
+ // expectedFirstRoundEnd
+
+ // Set current time to allow processing 7 rounds total (2 in DEGRADED, then rewind, then 5 in
+ // SYNC)
+ // After rewind, we need time for 5 more rounds starting from expectedFirstRoundStart
+ // Round 7 (the 5th round after rewind) will end at expectedFirstRoundEnd + 4*roundTimeMills
+ // = expectedFirstRoundStart + roundTimeMills + 4*roundTimeMills = expectedFirstRoundStart +
+ // 5*roundTimeMills
+ // For round 7 to be processable: currentTime - (expectedFirstRoundEnd + 3*roundTimeMills) >=
+ // roundTimeMills + bufferMillis
+ // So: currentTime >= expectedFirstRoundStart + 5*roundTimeMills + bufferMillis
+ // Set it slightly above to ensure round 7 is processable
+ long currentTime = expectedFirstRoundStart + (5 * roundTimeMills) + bufferMillis + 1000;
+ EnvironmentEdge edge = () -> currentTime;
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ // Calculate expected rounds
+ // Rounds 1-2: In DEGRADED state
+ // Round 1: expectedFirstRoundStart to expectedFirstRoundEnd (DEGRADED)
+ // Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd + roundTimeMills (DEGRADED,
+ // transitions to SYNCED_RECOVERY at end)
+ // After round 2, state becomes SYNCED_RECOVERY and causes rewind
+ // - getFirstRoundToProcess returns: expectedFirstRoundStart to expectedFirstRoundEnd
+ // - lastRoundProcessed is set to: expectedFirstRoundStart - roundTimeMills to
+ // expectedFirstRoundStart
+ // Rounds 3-7: In SYNC state (starting from first round based on minimum file timestamp)
+ // Round 3: expectedFirstRoundStart to expectedFirstRoundEnd (SYNC, after rewind - first
+ // round)
+ // Round 4: expectedFirstRoundEnd to expectedFirstRoundEnd + roundTimeMills (SYNC)
+ // Round 5: expectedFirstRoundEnd + roundTimeMills to expectedFirstRoundEnd + 2*roundTimeMills
+ // (SYNC)
+ // Round 6: expectedFirstRoundEnd + 2*roundTimeMills to expectedFirstRoundEnd +
+ // 3*roundTimeMills (SYNC)
+ // Round 7: expectedFirstRoundEnd + 3*roundTimeMills to expectedFirstRoundEnd +
+ // 4*roundTimeMills (SYNC)
+ int expectedRoundCount = 7;
+ int degradedRoundCount = 2; // First 2 rounds are in DEGRADED state
+ int syncRoundCount = 5; // Last 5 rounds are in SYNC state (after rewind)
+ long[] expectedRoundStarts = new long[expectedRoundCount];
+ long[] expectedRoundEnds = new long[expectedRoundCount];
+
+ // Rounds 1-2: DEGRADED
+ expectedRoundStarts[0] = expectedFirstRoundStart;
+ expectedRoundEnds[0] = expectedFirstRoundEnd;
+ expectedRoundStarts[1] = expectedFirstRoundEnd;
+ expectedRoundEnds[1] = expectedFirstRoundEnd + roundTimeMills;
+
+ // Rounds 3-7: SYNC (after rewind, starting from expectedFirstRoundStart - the first round
+ // based on minimum file timestamp)
+ expectedRoundStarts[2] = expectedFirstRoundStart; // Round 3: first round after rewind
+ expectedRoundEnds[2] = expectedFirstRoundEnd;
+ for (int i = 3; i < expectedRoundCount; i++) {
+ expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 3) * roundTimeMills);
+ expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+ }
+
+ try {
+ // Create discovery that transitions from DEGRADED to SYNCED_RECOVERY after 2 rounds
+ // SYNCED_RECOVERY triggers a rewind, then transitions to SYNC
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord) {
+ private int roundCount = 0;
+
+ @Override
+ protected void processRound(ReplicationRound replicationRound) throws IOException {
+ super.processRound(replicationRound);
+ roundCount++;
+
+ // Transition to SYNCED_RECOVERY after 2 rounds (after processing round 2)
+ // This triggers a rewind similar to when coming back from DEGRADED to SYNC
+ if (roundCount == degradedRoundCount) {
+ setReplicationReplayState(
+ ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY);
+ }
+ }
+ };
+
+ // Set lastRoundInSync with start time 0
+ ReplicationRound lastRoundInSyncWithZeroStart = new ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+ // Set lastRoundProcessed to some initial value
+ ReplicationRound initialLastRoundProcessed = new ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundProcessed(initialLastRoundProcessed);
+ discovery
+ .setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED);
+
+ // Verify initial state
+ ReplicationRound lastRoundInSyncBefore = discovery.getLastRoundInSync();
+ assertEquals("lastRoundInSync should have start time 0 before replay", 0L,
+ lastRoundInSyncBefore.getStartTime());
+ assertEquals("Initial state should be DEGRADED",
+ ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED,
+ discovery.getReplicationReplayState());
+
+ // Call replay - should start from minimum timestamp of new files
+ discovery.replay();
+
+ // Verify exact count of processRound calls
+ int actualProcessRoundCallCount = discovery.getProcessRoundCallCount();
+ assertEquals("processRound should be called exactly " +
expectedRoundCount + " times",
+ expectedRoundCount, actualProcessRoundCallCount);
+
+ // Get all processed rounds
+ List<ReplicationRound> processedRounds =
discovery.getProcessedRounds();
+ assertEquals("Should have processed exactly " + expectedRoundCount + "
rounds",
+ expectedRoundCount, processedRounds.size());
+
+ // Verify each round's exact parameters
+ for (int i = 0; i < expectedRoundCount; i++) {
+ ReplicationRound actualRound = processedRounds.get(i);
+ assertEquals("Round " + (i + 1) + " should have correct start time",
+ expectedRoundStarts[i], actualRound.getStartTime());
+ assertEquals("Round " + (i + 1) + " should have correct end time",
expectedRoundEnds[i],
+ actualRound.getEndTime());
+ }
+
+ // Verify lastRoundProcessed was updated to the last processed round
+ ReplicationRound lastRoundAfterReplay = discovery.getLastRoundProcessed();
+ assertNotNull("Last round processed should not be null", lastRoundAfterReplay);
+ assertEquals("Last round processed should match the last processed round start time",
+ expectedRoundStarts[expectedRoundCount - 1], lastRoundAfterReplay.getStartTime());
+ assertEquals("Last round processed should match the last processed round end time",
+ expectedRoundEnds[expectedRoundCount - 1], lastRoundAfterReplay.getEndTime());
+
+ // Verify lastRoundInSync behavior:
+ // - During first 2 rounds (DEGRADED), lastRoundInSync should NOT be updated (remains with
+ // start time 0)
+ // - After transition to SYNCED_RECOVERY and then SYNC (rounds 3-7), lastRoundInSync should
+ // be updated
+ // - Final lastRoundInSync should match lastRoundProcessed (because we're in SYNC state at
+ // the end)
+ ReplicationRound lastRoundInSyncAfter = discovery.getLastRoundInSync();
+ assertNotNull("Last round in sync should not be null", lastRoundInSyncAfter);
+
+ // After processing, we're in SYNC state, so lastRoundInSync should be updated
+ // It should match the last processed round because state is SYNC
+ assertEquals("Last round in sync should match last round processed in SYNC state",
+ lastRoundAfterReplay, lastRoundInSyncAfter);
+ assertEquals("Last round in sync should have correct start time",
+ expectedRoundStarts[expectedRoundCount - 1], lastRoundInSyncAfter.getStartTime());
+ assertEquals("Last round in sync should have correct end time",
+ expectedRoundEnds[expectedRoundCount - 1], lastRoundInSyncAfter.getEndTime());
+ assertTrue("Last round in sync start time should not be 0 after processing in SYNC state",
+ lastRoundInSyncAfter.getStartTime() > 0);
+
+ // Verify state transitioned to SYNC
+ assertEquals("State should be SYNC after transition",
+ ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+ discovery.getReplicationReplayState());
+
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ } finally {
+ fileTracker.close();
+ }
+ }
+
/**
* Tests replay method when failoverPending becomes true during processing and triggers failover
* after all rounds. Validates that triggerFailover is called exactly once when all conditions are
@@ -1423,7 +1816,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
/**
* Tests the shouldTriggerFailover method with various combinations of failoverPending,
- * lastRoundInSync, lastRoundProcessed and in-progress files state.
+ * in-progress files, and new files for next round.
*/
@Test
public void testShouldTriggerFailover() throws IOException {
@@ -1445,11 +1838,13 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
try {
// Create test rounds
ReplicationRound testRound = new ReplicationRound(1704153600000L, 1704153660000L);
- ReplicationRound differentRound = new ReplicationRound(1704153540000L, 1704153600000L);
+ ReplicationRound nextRound =
+ tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
// Test Case 1: All conditions true - should return true
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+ when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
@@ -1463,6 +1858,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
// Test Case 2: failoverPending is false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+ when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
@@ -1473,22 +1869,25 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
discovery.shouldTriggerFailover());
}
- // Test Case 3: lastRoundInSync not equals lastRoundProcessed - should return false
+ // Test Case 3: in-progress files not empty - should return false
{
- when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+ when(tracker.getInProgressFiles())
+ .thenReturn(Collections.singletonList(new Path("test.plog")));
+ when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when lastRoundInSync != lastRoundProcessed",
+ assertFalse("Should not trigger failover when in-progress files are not empty",
discovery.shouldTriggerFailover());
}
- // Test Case 4: in-progress files not empty - should return false
+ // Test Case 4: new files exist for next round - should return false
{
- when(tracker.getInProgressFiles())
+ when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+ when(tracker.getNewFilesForRound(nextRound))
.thenReturn(Collections.singletonList(new Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
@@ -1496,27 +1895,30 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when in-progress files are
not empty",
+ assertFalse("Should not trigger failover when new files exist for next
round",
discovery.shouldTriggerFailover());
}
- // Test Case 5: failoverPending false AND lastRoundInSync != lastRoundProcessed - should
- // return false
+ // Test Case 5: failoverPending false AND in-progress files not empty - should return false
{
- when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+ when(tracker.getInProgressFiles())
+ .thenReturn(Collections.singletonList(new Path("test.plog")));
+ when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(false);
- assertFalse("Should not trigger failover when failoverPending is false
and rounds differ",
+ assertFalse(
+ "Should not trigger failover when failoverPending is false and
in-progress files exist",
discovery.shouldTriggerFailover());
}
- // Test Case 6: failoverPending false AND in-progress files not empty - should return false
+ // Test Case 6: failoverPending false AND new files exist - should return false
{
- when(tracker.getInProgressFiles())
+ when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+ when(tracker.getNewFilesForRound(nextRound))
.thenReturn(Collections.singletonList(new Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
@@ -1524,22 +1926,23 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(false);
- assertFalse("Should not trigger failover when failoverPending is false
and files exist",
+ assertFalse("Should not trigger failover when failoverPending is false
and new files exist",
discovery.shouldTriggerFailover());
}
- // Test Case 7: lastRoundInSync != lastRoundProcessed AND in-progress files not empty - should
- // return false
+ // Test Case 7: in-progress files not empty AND new files exist - should return false
{
when(tracker.getInProgressFiles())
- .thenReturn(Collections.singletonList(new Path("test.plog")));
+ .thenReturn(Collections.singletonList(new Path("test1.plog")));
+ when(tracker.getNewFilesForRound(nextRound))
+ .thenReturn(Collections.singletonList(new Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when rounds differ and files
exist",
+ assertFalse("Should not trigger failover when both in-progress and new
files exist",
discovery.shouldTriggerFailover());
}
@@ -1547,10 +1950,12 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
{
when(tracker.getInProgressFiles())
.thenReturn(Collections.singletonList(new Path("test.plog")));
+ when(tracker.getNewFilesForRound(nextRound))
+ .thenReturn(Collections.singletonList(new Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(false);
assertFalse("Should not trigger failover when all conditions are
false",
@@ -1562,6 +1967,142 @@ public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
}
}
+ /**
+ * Tests the triggerFailover method to verify it properly updates cluster state and handles
+ * exceptions correctly.
+ */
+ @Test
+ public void testTriggerFailover() throws IOException, SQLException {
+ final String haGroupName = "testTriggerFailoverHAGroup";
+ // Set up HA group store record with STANDBY_TO_ACTIVE state for successful transition
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, peerZkUrl,
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ ClusterRoleRecord.ClusterRole.STANDBY_TO_ACTIVE,
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, null);
+
+ TestableReplicationLogTracker fileTracker = null;
+ try {
+ fileTracker = createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+ // Create a spy of ReplicationLogDiscoveryReplay to test actual implementation
+ ReplicationLogDiscoveryReplay discovery =
+ Mockito.spy(new ReplicationLogDiscoveryReplay(fileTracker));
+
+ // Initialize the discovery
+ discovery.init();
+
+ // Set failoverPending to true
+ discovery.setFailoverPending(true);
+ assertTrue("failoverPending should be true before triggerFailover",
+ discovery.getFailoverPending());
+
+ // Verify initial state is STANDBY_TO_ACTIVE
+ Optional<HAGroupStoreRecord> recordBefore =
+
HAGroupStoreManager.getInstance(config).getHAGroupStoreRecord(haGroupName);
+ assertTrue("HA group record should exist", recordBefore.isPresent());
+ assertEquals("Initial state should be STANDBY_TO_ACTIVE",
+ HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE,
recordBefore.get().getHAGroupState());
+
+ // Call triggerFailover - should succeed and update cluster state to ACTIVE_IN_SYNC
+ discovery.triggerFailover();
+
+ // Verify triggerFailover was called once
+ Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+ // Verify cluster state is updated to ACTIVE_IN_SYNC after failover
+ Optional<HAGroupStoreRecord> recordAfter =
+
HAGroupStoreManager.getInstance(config).getHAGroupStoreRecord(haGroupName);
+ assertTrue("HA group record should exist after failover",
recordAfter.isPresent());
+ assertEquals("Cluster state should be ACTIVE_IN_SYNC after successful
failover",
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
recordAfter.get().getHAGroupState());
+
+ // Verify failoverPending is set to false after successful failover
+ assertFalse("failoverPending should be set to false after successful
triggerFailover",
+ discovery.getFailoverPending());
+
+ } finally {
+ // Clean up
+ if (fileTracker != null) {
+ fileTracker.close();
+ }
+ try {
+ HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl);
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up HA group store record", e);
+ }
+ }
+ }
+
+ /**
+ * Tests triggerFailover when
+ * HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName) throws
+ * InvalidClusterRoleTransitionException. Verifies that failoverPending is set to false even when
+ * the exception occurs. This test sets up the HA group in STANDBY state instead of
+ * STANDBY_TO_ACTIVE. Calling setHAGroupStatusToSync from STANDBY state will throw
+ * InvalidClusterRoleTransitionException because the transition from STANDBY directly to
+ * ACTIVE_IN_SYNC is not allowed.
+ */
+ @Test
+ public void testTriggerFailover_InvalidClusterRoleTransitionExceptionFromHAGroupStoreManager()
+ throws IOException, SQLException {
+ final String haGroupName = "testTriggerFailoverInvalidTransitionHAGroup";
+
+ // Set up HA group store record in STANDBY state (not STANDBY_TO_ACTIVE)
+ // This will cause setHAGroupStatusToSync to throw InvalidClusterRoleTransitionException
+ // because transitioning from STANDBY directly to ACTIVE_IN_SYNC is invalid
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, peerZkUrl,
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ ClusterRoleRecord.ClusterRole.STANDBY, ClusterRoleRecord.ClusterRole.ACTIVE, null);
+
+ TestableReplicationLogTracker fileTracker = null;
+ try {
+ fileTracker = createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+ // Create a spy of ReplicationLogDiscoveryReplay to test actual implementation
+ ReplicationLogDiscoveryReplay discovery =
+ Mockito.spy(new ReplicationLogDiscoveryReplay(fileTracker));
+
+ // Initialize the discovery
+ discovery.init();
+
+ // Set failoverPending to true
+ discovery.setFailoverPending(true);
+ assertTrue("failoverPending should be true before triggerFailover",
+ discovery.getFailoverPending());
+
+ // Verify initial state is STANDBY (not STANDBY_TO_ACTIVE)
+ Optional<HAGroupStoreRecord> recordBefore =
+
HAGroupStoreManager.getInstance(config).getHAGroupStoreRecord(haGroupName);
+ assertTrue("HA group record should exist", recordBefore.isPresent());
+ assertEquals("Initial state should be DEGRADED_STANDBY",
+ HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY,
recordBefore.get().getHAGroupState());
+
+ // Call triggerFailover - should handle InvalidClusterRoleTransitionException
+ // because transitioning from DEGRADED_STANDBY directly to ACTIVE_IN_SYNC is invalid
+ discovery.triggerFailover();
+
+ // Verify triggerFailover was called (implicitly through spy)
+ Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+ // Verify failoverPending is set to false even when
InvalidClusterRoleTransitionException
+ // occurs
+ assertFalse(
+ "failoverPending should be set to false when
InvalidClusterRoleTransitionException occurs",
+ discovery.getFailoverPending());
+
+ } finally {
+ // Clean up
+ if (fileTracker != null) {
+ fileTracker.close();
+ }
+ try {
+ HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName,
zkUrl);
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up HA group store record", e);
+ }
+ }
+ }
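The rejected transition itself can be shown with a tiny self-contained example. The
enum values mirror HAGroupStoreRecord.HAGroupState, while the validity rule is an
assumption inferred from this test rather than Phoenix's actual transition table:

    // Standalone illustration: only STANDBY_TO_ACTIVE may advance to
    // ACTIVE_IN_SYNC; any other source state is an invalid transition.
    enum HAGroupState { STANDBY, DEGRADED_STANDBY, STANDBY_TO_ACTIVE, ACTIVE_IN_SYNC }

    static boolean canMarkInSync(HAGroupState current) {
        return current == HAGroupState.STANDBY_TO_ACTIVE;
    }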
+
/**
* Testable implementation of ReplicationLogDiscoveryReplay for unit
testing. Provides dependency
* injection for HAGroupStoreRecord, tracks processed rounds, and supports
simulating state
@@ -1632,9 +2173,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
protected void triggerFailover() {
// Track calls to triggerFailover for validation
triggerFailoverCallCount++;
-
- // Simulate the real behavior: set failoverPending to false after
triggering failover
- setFailoverPending(false);
+ super.triggerFailover();
}
public int getTriggerFailoverCallCount() {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 22075604ee..8a985f3ead 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -29,7 +29,15 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
index 9bb27e356f..189b7f0ba5 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
@@ -237,7 +237,7 @@ public class ReplicationShardDirectoryManagerTest {
// Test 7: Multiple rounds later
long multipleRoundsLater = dayStart + (300 * 1000L); // 00:05:00 (5
minutes)
- long expectedRoundStart = dayStart + (300 * 1000L); // Should be exact
+ long expectedRoundStart = dayStart + (300 * 1000L); // Should be exact
long result7 = manager.getNearestRoundStartTimestamp(multipleRoundsLater);
assertEquals("Multiple rounds later should return exact round start",
expectedRoundStart,
result7);
@@ -410,7 +410,7 @@ public class ReplicationShardDirectoryManagerTest {
@Test
public void testGetReplicationRoundFromStartTimeConsistency() {
- // Test that the same input always produces the same output
+ // Test that the same input always produces the same output
long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
ReplicationRound result1 =
manager.getReplicationRoundFromStartTime(timestamp);
@@ -593,7 +593,7 @@ public class ReplicationShardDirectoryManagerTest {
@Test
public void testGetReplicationRoundFromEndTimeConsistency() {
- // Test that the same input always produces the same output
+ // Test that the same input always produces the same output
long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
ReplicationRound result1 =
manager.getReplicationRoundFromEndTime(timestamp);
@@ -654,4 +654,165 @@ public class ReplicationShardDirectoryManagerTest {
shardPath.getName());
}
}
+
+ @Test
+ public void testGetPreviousRound() {
+ // Use a specific day for consistent testing
+ // 2024-01-01 00:00:00 UTC = 1704067200000L
+ long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+ // Default configuration: 60-second rounds
+ long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+ // Test 1: First round (00:00:00 to 00:01:00)
+ long firstRoundStart = dayStart; // 00:00:00
+ long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+ ReplicationRound firstRound = new ReplicationRound(firstRoundStart,
firstRoundEnd);
+ ReplicationRound previousRound = manager.getPreviousRound(firstRound);
+
+ // Previous round should end at firstRoundStart (00:00:00), which rounds
down to 00:00:00
+ // Start time should be end time - round duration = 00:00:00 - 60s = -60s
(edge case)
+ long expectedEnd = dayStart; // 00:00:00
+ long expectedStart = expectedEnd - roundDurationMs; // -60 seconds
+ assertEquals("Previous round end time should be start time of current
round rounded down",
+ expectedEnd, previousRound.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration", expectedStart,
+ previousRound.getStartTime());
+
+ // Test 2: Second round (00:01:00 to 00:02:00)
+ long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+ long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+ ReplicationRound secondRound = new ReplicationRound(secondRoundStart,
secondRoundEnd);
+ ReplicationRound previousRound2 = manager.getPreviousRound(secondRound);
+
+ // Previous round should end at secondRoundStart (00:01:00), which rounds
down to 00:01:00
+ // Start time should be 00:01:00 - 60s = 00:00:00
+ long expectedEnd2 = secondRoundStart; // 00:01:00
+ long expectedStart2 = expectedEnd2 - roundDurationMs; // 00:00:00
+ assertEquals("Previous round end time should be start time of current
round rounded down",
+ expectedEnd2, previousRound2.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration", expectedStart2,
+ previousRound2.getStartTime());
+
+ // Test 3: Round with mid-timestamp start (00:01:30 would round to
00:01:00)
+ long midRoundStart = dayStart + (90 * 1000L); // 00:01:30 (not aligned to
round boundary)
+ long midRoundEnd = midRoundStart + roundDurationMs; // 00:02:30
+ ReplicationRound midRound = new ReplicationRound(midRoundStart,
midRoundEnd);
+ ReplicationRound previousRound3 = manager.getPreviousRound(midRound);
+
+ // Previous round should end at midRoundStart rounded down = 00:01:00
+ // Start time should be 00:01:00 - 60s = 00:00:00
+ long expectedEnd3 = dayStart + roundDurationMs; // 00:01:00 (rounded down
from 00:01:30)
+ long expectedStart3 = expectedEnd3 - roundDurationMs; // 00:00:00
+ assertEquals("Previous round end time should round down start time of
current round",
+ expectedEnd3, previousRound3.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration", expectedStart3,
+ previousRound3.getStartTime());
+
+ // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+ long multipleRoundsStart = dayStart + (5 * roundDurationMs); // 00:05:00
+ long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; // 00:06:00
+ ReplicationRound multipleRounds = new
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+ ReplicationRound previousRound4 = manager.getPreviousRound(multipleRounds);
+
+ // Previous round should end at multipleRoundsStart = 00:05:00
+ // Start time should be 00:05:00 - 60s = 00:04:00
+ long expectedEnd4 = multipleRoundsStart; // 00:05:00
+ long expectedStart4 = expectedEnd4 - roundDurationMs; // 00:04:00
+ assertEquals("Previous round end time should be start time of current
round", expectedEnd4,
+ previousRound4.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration", expectedStart4,
+ previousRound4.getStartTime());
+
+ // Test 5: Verify round duration is consistent
+ long previousRoundDuration = previousRound4.getEndTime() -
previousRound4.getStartTime();
+ assertEquals("Previous round duration should be 60 seconds",
roundDurationMs,
+ previousRoundDuration);
+ }
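The arithmetic these assertions encode fits in a few lines. This is a minimal
standalone sketch of the assumed semantics, not ReplicationShardDirectoryManager's
actual code:

    // Previous round of [start, end): round the current start down to a round
    // boundary; the previous round ends there and spans one full round duration.
    static long[] previousRound(long startMs, long roundDurationMs) {
        long prevEnd = Math.floorDiv(startMs, roundDurationMs) * roundDurationMs;
        return new long[] { prevEnd - roundDurationMs, prevEnd };
    }

For Test 3 above, previousRound(dayStart + 90_000L, 60_000L) yields {dayStart,
dayStart + 60_000L}, i.e. 00:00:00 to 00:01:00.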
+
+ @Test
+ public void testGetNextRound() {
+ // Use a specific day for consistent testing
+ // 2024-01-01 00:00:00 UTC = 1704067200000L
+ long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+ // Default configuration: 60-second rounds
+ long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+ // Test 1: First round (00:00:00 to 00:01:00)
+ long firstRoundStart = dayStart; // 00:00:00
+ long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+ ReplicationRound firstRound = new ReplicationRound(firstRoundStart,
firstRoundEnd);
+ ReplicationRound nextRound = manager.getNextRound(firstRound);
+
+ // Next round should start at firstRoundEnd (00:01:00), which rounds down
to 00:01:00
+ // End time should be 00:01:00 + 60s = 00:02:00
+ long expectedStart = firstRoundEnd; // 00:01:00
+ long expectedEnd = expectedStart + roundDurationMs; // 00:02:00
+ assertEquals("Next round start time should be end time of current round
rounded down",
+ expectedStart, nextRound.getStartTime());
+ assertEquals("Next round end time should be start time + round duration",
expectedEnd,
+ nextRound.getEndTime());
+
+ // Test 2: Second round (00:01:00 to 00:02:00)
+ long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+ long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+ ReplicationRound secondRound = new ReplicationRound(secondRoundStart,
secondRoundEnd);
+ ReplicationRound nextRound2 = manager.getNextRound(secondRound);
+
+ // Next round should start at secondRoundEnd (00:02:00), which rounds down
to 00:02:00
+ // End time should be 00:02:00 + 60s = 00:03:00
+ long expectedStart2 = secondRoundEnd; // 00:02:00
+ long expectedEnd2 = expectedStart2 + roundDurationMs; // 00:03:00
+ assertEquals("Next round start time should be end time of current round
rounded down",
+ expectedStart2, nextRound2.getStartTime());
+ assertEquals("Next round end time should be start time + round duration",
expectedEnd2,
+ nextRound2.getEndTime());
+
+ // Test 3: Round with mid-timestamp end (00:02:30 would round to 00:02:00)
+ long midRoundStart = dayStart + (120 * 1000L); // 00:02:00
+ long midRoundEnd = dayStart + (150 * 1000L); // 00:02:30 (not aligned to
round boundary)
+ ReplicationRound midRound = new ReplicationRound(midRoundStart,
midRoundEnd);
+ ReplicationRound nextRound3 = manager.getNextRound(midRound);
+
+ // Next round should start at midRoundEnd rounded down = 00:02:00
+ // End time should be 00:02:00 + 60s = 00:03:00
+ long expectedStart3 = dayStart + (120 * 1000L); // 00:02:00 (rounded down
from 00:02:30)
+ long expectedEnd3 = expectedStart3 + roundDurationMs; // 00:03:00
+ assertEquals("Next round start time should round down end time of current
round",
+ expectedStart3, nextRound3.getStartTime());
+ assertEquals("Next round end time should be start time + round duration",
expectedEnd3,
+ nextRound3.getEndTime());
+
+ // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+ long multipleRoundsStart = dayStart + (5 * roundDurationMs); // 00:05:00
+ long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; // 00:06:00
+ ReplicationRound multipleRounds = new
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+ ReplicationRound nextRound4 = manager.getNextRound(multipleRounds);
+
+ // Next round should start at multipleRoundsEnd = 00:06:00
+ // End time should be 00:06:00 + 60s = 00:07:00
+ long expectedStart4 = multipleRoundsEnd; // 00:06:00
+ long expectedEnd4 = expectedStart4 + roundDurationMs; // 00:07:00
+ assertEquals("Next round start time should be end time of current round",
expectedStart4,
+ nextRound4.getStartTime());
+ assertEquals("Next round end time should be start time + round duration",
expectedEnd4,
+ nextRound4.getEndTime());
+
+ // Test 5: Verify round duration is consistent
+ long nextRoundDuration = nextRound4.getEndTime() -
nextRound4.getStartTime();
+ assertEquals("Next round duration should be 60 seconds", roundDurationMs,
nextRoundDuration);
+
+ // Test 6: Verify continuity - next round of previous round should equal
original round
+ ReplicationRound originalRound = new ReplicationRound(dayStart + (3 * roundDurationMs),
+ dayStart + (4 * roundDurationMs)); // 00:03:00 to 00:04:00
+ ReplicationRound prevRound = manager.getPreviousRound(originalRound);
+ ReplicationRound nextOfPrev = manager.getNextRound(prevRound);
+ assertEquals("Next round of previous round should equal original round
start time",
+ originalRound.getStartTime(), nextOfPrev.getStartTime());
+ assertEquals("Next round of previous round should equal original round end
time",
+ originalRound.getEndTime(), nextOfPrev.getEndTime());
+ }
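Symmetrically, a standalone sketch of the assumed next-round arithmetic, again not
the manager's actual code:

    // Next round of [start, end): round the current end down to a boundary;
    // the next round starts there and spans one full round duration. On
    // boundary-aligned rounds this makes getNextRound(getPreviousRound(r))
    // return r, as Test 6 verifies.
    static long[] nextRound(long endMs, long roundDurationMs) {
        long nextStart = Math.floorDiv(endMs, roundDurationMs) * roundDurationMs;
        return new long[] { nextStart, nextStart + roundDurationMs };
    }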
}