This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push:
new 11a1f5a680 Phoenix-7672 Adding failover implementation via replay (Addendum) (#2312)
11a1f5a680 is described below
commit 11a1f5a68005cc0247c9ba4b466b82b31e3700c5
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Wed Nov 12 03:58:28 2025 +0530
Phoenix-7672 Adding failover implementation via replay (Addendum) (#2312)
---
.../replication/ReplicationLogDiscovery.java | 48 +-
.../phoenix/replication/ReplicationLogTracker.java | 2 +-
.../ReplicationShardDirectoryManager.java | 10 +-
.../reader/ReplicationLogDiscoveryReplay.java | 79 ++-
.../reader/ReplicationLogProcessor.java | 2 +
.../replication/reader/ReplicationLogReplay.java | 2 +
.../reader/ReplicationLogReplayService.java | 4 +
.../reader/RecoverLeaseFSUtilsTestIT.java} | 5 +-
.../ReplicationLogDiscoveryReplayTestIT.java} | 546 ++++++++++++++++++++-
.../reader/ReplicationLogProcessorTestIT.java} | 5 +-
.../reader/ReplicationLogReplayTestIT.java} | 5 +-
.../replication/ReplicationLogDiscoveryTest.java | 2 +-
.../ReplicationShardDirectoryManagerTest.java | 166 ++++++-
13 files changed, 796 insertions(+), 80 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 76051edef4..01e9e13651 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -115,6 +115,7 @@ public abstract class ReplicationLogDiscovery {
}
public void init() throws IOException {
+ LOG.info("Initializing ReplicationLogDiscovery for haGroup: {}", haGroupName);
initializeLastRoundProcessed();
this.metrics = createMetricsSource();
}
@@ -134,7 +135,7 @@ public abstract class ReplicationLogDiscovery {
public void start() throws IOException {
synchronized (this) {
if (isRunning) {
- LOG.warn("ReplicationLogDiscovery is already running for group: {}", haGroupName);
+ LOG.warn("ReplicationLogDiscovery is already running for haGroup: {}", haGroupName);
return;
}
// Initialize and schedule the executors
@@ -150,7 +151,7 @@ public abstract class ReplicationLogDiscovery {
}, 0, getReplayIntervalSeconds(), TimeUnit.SECONDS);
isRunning = true;
- LOG.info("ReplicationLogDiscovery started for group: {}", haGroupName);
+ LOG.info("ReplicationLogDiscovery started for haGroup: {}", haGroupName);
}
}
@@ -164,7 +165,7 @@ public abstract class ReplicationLogDiscovery {
synchronized (this) {
if (!isRunning) {
- LOG.warn("ReplicationLogDiscovery is not running for group: {}", haGroupName);
+ LOG.warn("ReplicationLogDiscovery is not running for haGroup: {}", haGroupName);
return;
}
@@ -185,7 +186,7 @@ public abstract class ReplicationLogDiscovery {
}
}
- LOG.info("ReplicationLogDiscovery stopped for group: {}", haGroupName);
+ LOG.info("ReplicationLogDiscovery stopped for haGroup: {}", haGroupName);
}
/**
@@ -209,8 +210,8 @@ public abstract class ReplicationLogDiscovery {
try {
processRound(replicationRound);
} catch (IOException e) {
- LOG.error("Failed processing replication round {}. Will retry in next "
- + "scheduled run.", replicationRound, e);
+ LOG.error("Failed processing replication round {} for haGroup {}. Will retry "
+ + "in next scheduled run.", replicationRound, haGroupName, e);
break; // stop this run, retry later
}
setLastRoundProcessed(replicationRound);
@@ -244,7 +245,7 @@ public abstract class ReplicationLogDiscovery {
* @throws IOException if there's an error during round processing
*/
protected void processRound(ReplicationRound replicationRound) throws IOException {
- LOG.info("Starting to process round: {}", replicationRound);
+ LOG.info("Starting to process round: {} for haGroup: {}", replicationRound, haGroupName);
// Increment the number of rounds processed
getMetrics().incrementNumRoundsProcessed();
@@ -254,7 +255,7 @@ public abstract class ReplicationLogDiscovery {
// Conditionally process the in progress files for the round
processInProgressDirectory();
}
- LOG.info("Finished processing round: {}", replicationRound);
+ LOG.info("Finished processing round: {} for haGroup: {}", replicationRound, haGroupName);
}
/**
@@ -275,7 +276,8 @@ public abstract class ReplicationLogDiscovery {
* @throws IOException if there's an error during file processing
*/
protected void processNewFilesForRound(ReplicationRound replicationRound) throws IOException {
- LOG.info("Starting new files processing for round: {}", replicationRound);
+ LOG.info("Starting new files processing for round: {} for haGroup: {}",
+ replicationRound, haGroupName);
long startTime = EnvironmentEdgeManager.currentTime();
List<Path> files = replicationLogTracker.getNewFilesForRound(replicationRound);
LOG.info("Number of new files for round {} is {}", replicationRound, files.size());
@@ -284,7 +286,8 @@ public abstract class ReplicationLogDiscovery {
files = replicationLogTracker.getNewFilesForRound(replicationRound);
}
long duration = EnvironmentEdgeManager.currentTime() - startTime;
- LOG.info("Finished new files processing for round: {} in {}ms", replicationRound, duration);
+ LOG.info("Finished new files processing for round: {} in {}ms for haGroup: {}",
+ replicationRound, duration, haGroupName);
getMetrics().updateTimeToProcessNewFiles(duration);
}
@@ -294,23 +297,25 @@ public abstract class ReplicationLogDiscovery {
* @throws IOException if there's an error during file processing
*/
protected void processInProgressDirectory() throws IOException {
+ LOG.info("Starting {} directory processing for haGroup: {}",
+ replicationLogTracker.getInProgressLogSubDirectoryName(), haGroupName);
// Increase the count for number of times in progress directory is processed
getMetrics().incrementNumInProgressDirectoryProcessed();
- LOG.info("Starting {} directory processing", replicationLogTracker.getInProgressLogSubDirectoryName());
long startTime = EnvironmentEdgeManager.currentTime();
long oldestTimestampToProcess = replicationLogTracker.getReplicationShardDirectoryManager()
.getNearestRoundStartTimestamp(EnvironmentEdgeManager.currentTime())
- getReplayIntervalSeconds() * 1000L;
List<Path> files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
- LOG.info("Number of {} files with oldestTimestampToProcess {} is {}",
+ LOG.info("Number of {} files with oldestTimestampToProcess {} is {} for haGroup: {}",
replicationLogTracker.getInProgressLogSubDirectoryName(), oldestTimestampToProcess,
- files.size());
+ files.size(), haGroupName);
while (!files.isEmpty()) {
processOneRandomFile(files);
files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
}
long duration = EnvironmentEdgeManager.currentTime() - startTime;
- LOG.info("Finished in-progress files processing in {}ms", duration);
+ LOG.info("Finished in-progress files processing in {}ms for haGroup: {}",
+ duration, haGroupName);
getMetrics().updateTimeToProcessInProgressFiles(duration);
}
@@ -364,21 +369,26 @@ public abstract class ReplicationLogDiscovery {
Optional<Long> minTimestampFromInProgressFiles = getMinTimestampFromInProgressFiles();
if (minTimestampFromInProgressFiles.isPresent()) {
- LOG.info("Initializing lastRoundProcessed from {} files with minimum "
- + "timestamp as {}", replicationLogTracker.getInProgressLogSubDirectoryName(), minTimestampFromInProgressFiles.get());
+ LOG.info("Initializing lastRoundProcessed for haGroup: {} from {} files with minimum "
+ + "timestamp as {}", haGroupName,
+ replicationLogTracker.getInProgressLogSubDirectoryName(),
+ minTimestampFromInProgressFiles.get());
this.lastRoundProcessed = replicationLogTracker.getReplicationShardDirectoryManager()
.getReplicationRoundFromEndTime(minTimestampFromInProgressFiles.get());
} else {
Optional<Long> minTimestampFromNewFiles = getMinTimestampFromNewFiles();
if (minTimestampFromNewFiles.isPresent()) {
- LOG.info("Initializing lastRoundProcessed from {} files with minimum timestamp "
- + "as {}", replicationLogTracker.getInSubDirectoryName(), minTimestampFromNewFiles.get());
+ LOG.info("Initializing lastRoundProcessed for haGroup: {} from {} "
+ + "files with minimum timestamp as {}", haGroupName,
+ replicationLogTracker.getInSubDirectoryName(),
+ minTimestampFromNewFiles.get());
this.lastRoundProcessed = replicationLogTracker
.getReplicationShardDirectoryManager()
.getReplicationRoundFromEndTime(minTimestampFromNewFiles.get());
} else {
long currentTime = EnvironmentEdgeManager.currentTime();
- LOG.info("Initializing lastRoundProcessed from current time {}", currentTime);
+ LOG.info("Initializing lastRoundProcessed for haGroup: {} from current time {}",
+ haGroupName, currentTime);
this.lastRoundProcessed = replicationLogTracker
.getReplicationShardDirectoryManager()
.getReplicationRoundFromEndTime(EnvironmentEdgeManager.currentTime());
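
The @@ -209 hunk above preserves the existing retry contract: a round that fails with an IOException ends the current run, and the same round is retried on the next scheduled pass because lastRoundProcessed only advances after a successful round. A minimal standalone sketch of that loop shape, with hypothetical names rather than the actual Phoenix class:

    import java.io.IOException;
    import java.util.Iterator;

    final class RoundLoopSketch {
        interface RoundProcessor<R> {
            void process(R round) throws IOException;
        }

        // Process rounds in order; on the first IOException, stop this run so the
        // failed round is picked up again by the next scheduled run.
        static <R> R processPendingRounds(Iterator<R> pendingRounds,
                                          RoundProcessor<R> processor,
                                          R lastProcessed) {
            while (pendingRounds.hasNext()) {
                R round = pendingRounds.next();
                try {
                    processor.process(round);
                } catch (IOException e) {
                    break; // stop this run, retry later
                }
                lastProcessed = round; // only advanced after a successful round
            }
            return lastProcessed;
        }
    }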
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index 6d16d9ed8a..5838a9de4b 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -116,7 +116,7 @@ public class ReplicationLogTracker {
* @return List of valid log file paths that belong to the specified replication round
* @throws IOException if there's an error accessing the file system
*/
- protected List<Path> getNewFilesForRound(ReplicationRound replicationRound) throws IOException {
+ public List<Path> getNewFilesForRound(ReplicationRound replicationRound) throws IOException {
Path roundDirectory = replicationShardDirectoryManager.getShardDirectory(replicationRound);
LOG.info("Getting new files for round {} from shard {}", replicationRound, roundDirectory);
if (!fileSystem.exists(roundDirectory)) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
index e5fd495534..7010131b36 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationShardDirectoryManager.java
@@ -175,7 +175,7 @@ public class ReplicationShardDirectoryManager {
* @param timestamp The timestamp in milliseconds since epoch
* @return The nearest replication round start timestamp
*/
- protected long getNearestRoundStartTimestamp(long timestamp) {
+ public long getNearestRoundStartTimestamp(long timestamp) {
// Convert round time from seconds to milliseconds
long roundTimeMs = replicationRoundDurationSeconds * 1000L;
@@ -184,6 +184,14 @@ public class ReplicationShardDirectoryManager {
return (timestamp / roundTimeMs) * roundTimeMs;
}
+ public ReplicationRound getPreviousRound(final ReplicationRound replicationRound) {
+ return getReplicationRoundFromEndTime(replicationRound.getStartTime());
+ }
+
+ public ReplicationRound getNextRound(final ReplicationRound replicationRound) {
+ return getReplicationRoundFromStartTime(replicationRound.getEndTime());
+ }
+
public int getReplicationRoundDurationSeconds() {
return this.replicationRoundDurationSeconds;
}
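
The two helpers added above derive adjacent rounds purely from a round's boundary timestamps: the previous round is the one that ends at the current round's start, and the next round is the one that starts at the current round's end. A small standalone sketch of that idea, using a hypothetical fixed-width round type rather than Phoenix's ReplicationRound:

    // Sketch only: a fixed-width round window, where previous/next rounds follow
    // directly from the current round's start and end timestamps.
    final class RoundWindowSketch {
        final long startMillis;
        final long endMillis;

        RoundWindowSketch(long startMillis, long durationMillis) {
            this.startMillis = startMillis;
            this.endMillis = startMillis + durationMillis;
        }

        // Mirrors getPreviousRound(): the round that ends at this round's start.
        RoundWindowSketch previous(long durationMillis) {
            return new RoundWindowSketch(startMillis - durationMillis, durationMillis);
        }

        // Mirrors getNextRound(): the round that starts at this round's end.
        RoundWindowSketch next(long durationMillis) {
            return new RoundWindowSketch(endMillis, durationMillis);
        }
    }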
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index 93fd18c947..faf078fa7f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -18,8 +18,6 @@
package org.apache.phoenix.replication.reader;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -136,6 +134,9 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
@Override
public void init() throws IOException {
+
+ LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}", haGroupName);
+
HAGroupStateListener degradedListener = (groupName, fromState, toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
replicationReplayState.set(ReplicationReplayState.DEGRADED);
@@ -223,7 +224,10 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
*/
@Override
protected void initializeLastRoundProcessed() throws IOException {
+ LOG.info("Initializing last round processed for haGroup: {}", haGroupName);
HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord();
+ LOG.info("Found HA Group state during initialization as {} for haGroup: {}",
+ haGroupStoreRecord.getHAGroupState(), haGroupName);
if (HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState())) {
replicationReplayState.compareAndSet(ReplicationReplayState.NOT_INITIALIZED, ReplicationReplayState.DEGRADED);
@@ -298,6 +302,8 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
LOG.info("Starting replay with lastRoundProcessed={}, lastRoundInSync={}",
lastRoundProcessed, lastRoundInSync);
Optional<ReplicationRound> optionalNextRound = getFirstRoundToProcess();
+ LOG.info("Found first round to process as {} for haGroup: {}",
+ optionalNextRound, haGroupName);
while (optionalNextRound.isPresent()) {
ReplicationRound replicationRound = optionalNextRound.get();
try {
@@ -314,9 +320,14 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
switch (currentState) {
case SYNCED_RECOVERY:
// Rewind to last in-sync round
- LOG.info("SYNCED_RECOVERY detected, rewinding to lastRoundInSync={}",
+ LOG.info("SYNCED_RECOVERY detected, rewinding with lastRoundInSync={}",
lastRoundInSync);
- setLastRoundProcessed(lastRoundInSync);
+ Optional<ReplicationRound> firstRoundToProcess = getFirstRoundToProcess();
+ LOG.info("Calculated first round to process after SYNCED_RECOVERY as "
+ + "{}", firstRoundToProcess);
+ firstRoundToProcess.ifPresent(round -> setLastRoundProcessed(
+ replicationLogTracker.getReplicationShardDirectoryManager()
+ .getPreviousRound(round)));
// Only reset to NORMAL if state hasn't been flipped to DEGRADED
replicationReplayState.compareAndSet(ReplicationReplayState.SYNCED_RECOVERY, ReplicationReplayState.SYNC);
@@ -348,17 +359,10 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
if (!optionalNextRound.isPresent() && shouldTriggerFailover()) {
LOG.info("No more rounds to process, lastRoundInSync={}, lastRoundProcessed={}. "
+ "Failover is triggered & in progress directory is empty. "
- + "Marking cluster state as {}",
+ + "Attempting to mark cluster state as {}",
lastRoundInSync, lastRoundProcessed, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
- try {
- triggerFailover();
- LOG.info("Successfully updated the cluster state");
- failoverPending.set(false);
- } catch (InvalidClusterRoleTransitionException exception) {
- LOG.warn("Failed to update the cluster state.", exception);
- failoverPending.set(false);
- }
+ triggerFailover();
}
}
@@ -372,8 +376,15 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
* @return Optional containing the first round to process, or empty if not enough time
* has passed
*/
- private Optional<ReplicationRound> getFirstRoundToProcess() {
- long lastRoundEndTimestamp = getLastRoundInSync().getEndTime();
+ private Optional<ReplicationRound> getFirstRoundToProcess() throws IOException {
+ ReplicationRound lastRoundInSync = getLastRoundInSync();
+ long lastRoundEndTimestamp = lastRoundInSync.getEndTime();
+ if (lastRoundInSync.getStartTime() == 0) {
+ Optional<Long> optionalMinimumNewFilesTimestamp = getMinTimestampFromNewFiles();
+ lastRoundEndTimestamp = replicationLogTracker.getReplicationShardDirectoryManager()
+ .getNearestRoundStartTimestamp(optionalMinimumNewFilesTimestamp
+ .orElseGet(EnvironmentEdgeManager::currentTime));
+ }
long currentTime = EnvironmentEdgeManager.currentTime();
if (currentTime - lastRoundEndTimestamp < roundTimeMills + bufferMillis) {
// nothing more to process
@@ -457,17 +468,39 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
return optionalHAGroupStateRecord.get();
}
+ /**
+ * Determines whether failover should be triggered based on completion criteria.
+ *
+ * Failover is safe to trigger when all of the following conditions are met:
+ * 1. A failover has been requested (failoverPending is true)
+ * 2. No files are currently in the in-progress directory
+ * 3. No new files exist for the next (ongoing) round
+ *
+ * These conditions ensure all replication logs have been processed before transitioning
+ * the cluster from STANDBY to ACTIVE state.
+ *
+ * @return true if all conditions are met and failover should be triggered, false otherwise
+ * @throws IOException if there's an error checking file status
+ */
protected boolean shouldTriggerFailover() throws IOException {
- return failoverPending.get() && lastRoundInSync.equals(lastRoundProcessed)
- && replicationLogTracker.getInProgressFiles().isEmpty()
- ;
- // TODO: Check for in files of lastRoundProcessed, lastRoundProcessed + roundTime as well - because there can be new files for upcoming round
+ return failoverPending.get() && replicationLogTracker.getInProgressFiles().isEmpty()
+ && replicationLogTracker.getNewFilesForRound(replicationLogTracker
+ .getReplicationShardDirectoryManager()
+ .getNextRound(getLastRoundProcessed())).isEmpty();
}
- protected void triggerFailover() throws IOException, InvalidClusterRoleTransitionException {
- // TODO: Update cluster state to ACTIVE_IN_SYNC within try block
- // (once API is supported in HA Store)
- // Throw any exception back to caller.
+ protected void triggerFailover() {
+ try {
+ HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName);
+ failoverPending.set(false);
+ } catch (InvalidClusterRoleTransitionException invalidClusterRoleTransitionException) {
+ LOG.warn("Failed to update the cluster state due to "
+ + "InvalidClusterRoleTransitionException. Setting failoverPending "
+ + "to false.", invalidClusterRoleTransitionException);
+ failoverPending.set(false);
+ } catch (Exception exception) {
+ LOG.error("Failed to update the cluster state.", exception);
+ }
}
public enum ReplicationReplayState {
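
Taken together, the revised shouldTriggerFailover/triggerFailover pair above gates failover on replay completeness (failover requested, in-progress directory drained, no new files for the round after the last processed one) and then pushes the state change through the HA group store, clearing the pending flag when the store rejects the role transition. A compact sketch of that control flow using assumed stand-in interfaces, not the Phoenix API:

    // Sketch only: stand-ins for the failover gating described in the hunk above.
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.BooleanSupplier;

    final class FailoverGateSketch {
        private final AtomicBoolean failoverPending = new AtomicBoolean(false);
        private final BooleanSupplier inProgressDirectoryEmpty;   // assumed check
        private final BooleanSupplier nextRoundHasNoNewFiles;     // assumed check
        private final Runnable markClusterActiveInSync;           // assumed store call

        FailoverGateSketch(BooleanSupplier inProgressDirectoryEmpty,
                           BooleanSupplier nextRoundHasNoNewFiles,
                           Runnable markClusterActiveInSync) {
            this.inProgressDirectoryEmpty = inProgressDirectoryEmpty;
            this.nextRoundHasNoNewFiles = nextRoundHasNoNewFiles;
            this.markClusterActiveInSync = markClusterActiveInSync;
        }

        void requestFailover() {
            failoverPending.set(true);
        }

        boolean shouldTriggerFailover() {
            // Trigger only after a request, once all pending replication logs are drained.
            return failoverPending.get()
                    && inProgressDirectoryEmpty.getAsBoolean()
                    && nextRoundHasNoNewFiles.getAsBoolean();
        }

        void triggerFailover() {
            try {
                markClusterActiveInSync.run();
                failoverPending.set(false);
            } catch (IllegalStateException invalidTransition) {
                // Treat an invalid role transition as terminal for this request.
                failoverPending.set(false);
            } catch (RuntimeException other) {
                // Leave failoverPending set so a later run can retry.
            }
        }
    }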
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 4a47779d70..e44bcc5ba9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -217,6 +217,8 @@ public class ReplicationLogProcessor implements Closeable {
public void processLogFile(FileSystem fs, Path filePath) throws IOException {
+ LOG.info("Starting to process file {}", filePath);
+
// Map from Table Name to List of Mutations
Map<TableName, List<Mutation>> tableToMutationsMap = new HashMap<>();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 5f12573403..587f9bdb02 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -107,6 +107,7 @@ public class ReplicationLogReplay {
* @throws IOException if there's an error during initialization
*/
protected void init() throws IOException {
+ LOG.info("Initializing ReplicationLogReplay for haGroup: {}",
haGroupName);
initializeFileSystem();
Path newFilesDirectory = new Path(new Path(rootURI.getPath(),
haGroupName), ReplicationLogReplay.IN_DIRECTORY_NAME);
ReplicationShardDirectoryManager replicationShardDirectoryManager =
@@ -121,6 +122,7 @@ public class ReplicationLogReplay {
}
public void close() {
+ LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
replicationLogDiscoveryReplay.close();
// Remove the instance from cache
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index 7df307f762..fef11daec6 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -195,6 +195,8 @@ public class ReplicationLogReplayService {
*/
protected void startReplicationReplay() throws IOException, SQLException {
List<String> replicationGroups = getReplicationGroups();
+ LOG.info("{} number of HA Groups found to start Replication Replay",
+ replicationGroups.size());
for (String replicationGroup : replicationGroups) {
ReplicationLogReplay.get(conf, replicationGroup).startReplay();
}
@@ -205,6 +207,8 @@ public class ReplicationLogReplayService {
*/
protected void stopReplicationReplay() throws IOException, SQLException {
List<String> replicationGroups = getReplicationGroups();
+ LOG.info("{} number of HA Groups found to stop Replication Replay",
+ replicationGroups.size());
for (String replicationGroup : replicationGroups) {
ReplicationLogReplay replicationLogReplay = ReplicationLogReplay.get(conf, replicationGroup);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTestIT.java
similarity index 96%
rename from phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
rename to phoenix-core/src/it/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTestIT.java
index 6df8577095..d688d55747 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTestIT.java
@@ -30,18 +30,21 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
/**
* Test our recoverLease loop against mocked up filesystem.
*/
-public class RecoverLeaseFSUtilsTest extends ParallelStatsDisabledIT {
+@Category(NeedsOwnMiniClusterTest.class)
+public class RecoverLeaseFSUtilsTestIT extends ParallelStatsDisabledIT {
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTest.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
similarity index 72%
rename from phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTest.java
rename to phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index b0bb1d7e98..ca1171d76f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -19,9 +19,11 @@ package org.apache.phoenix.replication.reader;
import java.io.IOException;
import java.net.URI;
+import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
@@ -65,9 +68,9 @@ import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;
@Category(NeedsOwnMiniClusterTest.class)
-public class ReplicationLogDiscoveryReplayTest extends BaseTest {
+public class ReplicationLogDiscoveryReplayTestIT extends BaseTest {
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogDiscoveryReplayTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogDiscoveryReplayTestIT.class);
private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
private String zkUrl;
@@ -94,7 +97,7 @@ public class ReplicationLogDiscoveryReplayTest extends BaseTest {
public void setUp() throws Exception {
zkUrl = getLocalZkUrl(config);
peerZkUrl = CLUSTERS.getZkUrl2();
- HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl, peerZkUrl,
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, peerZkUrl,
CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
null);
@@ -1191,6 +1194,354 @@ public class ReplicationLogDiscoveryReplayTest extends BaseTest {
}
}
+ /**
+ * Tests replay method when lastRoundInSync is not set, i.e. has start
time 0.
+ * Validates that getFirstRoundToProcess uses minimum timestamp from new
files
+ */
+ @Test
+ public void testReplay_LastRoundInSync_NotInitialized() throws IOException
{
+ TestableReplicationLogTracker fileTracker =
createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+ try {
+ long roundTimeMills =
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
* 1000L;
+ long bufferMillis = (long) (roundTimeMills * 0.15);
+
+ // Create new files with specific timestamps
+ // The minimum timestamp should be used as the basis for the first
round
+ long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) +
(30 * 1000L); // 2024-01-01 00:05:30
+ long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) +
(45 * 1000L); // 2024-01-01 00:08:45
+ long middleFileTimestamp = 1704067200000L + (7 * roundTimeMills) +
(15 * 1000L); // 2024-01-01 00:07:15
+
+ // Create files in different shard directories
+ ReplicationShardDirectoryManager shardManager =
fileTracker.getReplicationShardDirectoryManager();
+
+ // Create file with minimum timestamp
+ Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+ localFs.mkdirs(shardPath1);
+ Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+ localFs.create(file1, true).close();
+
+ // Create file with middle timestamp
+ Path shardPath2 =
shardManager.getShardDirectory(middleFileTimestamp);
+ localFs.mkdirs(shardPath2);
+ Path file2 = new Path(shardPath2, middleFileTimestamp +
"_rs-2.plog");
+ localFs.create(file2, true).close();
+
+ // Create file with maximum timestamp
+ Path shardPath3 = shardManager.getShardDirectory(maxFileTimestamp);
+ localFs.mkdirs(shardPath3);
+ Path file3 = new Path(shardPath3, maxFileTimestamp + "_rs-3.plog");
+ localFs.create(file3, true).close();
+
+ // Create HAGroupStoreRecord for STANDBY state
+ HAGroupStoreRecord mockRecord = new
HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+ // Calculate expected first round start time (minimum timestamp
rounded down to nearest round start)
+ long expectedFirstRoundStart =
shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+ long expectedFirstRoundEnd = expectedFirstRoundStart +
roundTimeMills;
+
+ // Set current time to allow processing exactly 5 rounds
+ // Round 1 ends at expectedFirstRoundEnd
+ // Round 2 ends at expectedFirstRoundEnd + roundTimeMills
+ // Round 3 ends at expectedFirstRoundEnd + 2*roundTimeMills
+ // Round 4 ends at expectedFirstRoundEnd + 3*roundTimeMills
+ // Round 5 ends at expectedFirstRoundEnd + 4*roundTimeMills
+ // Round 6 starts at expectedFirstRoundEnd + 4*roundTimeMills
+ // For round 6 to be processable: currentTime -
(expectedFirstRoundEnd + 4*roundTimeMills) >= roundTimeMills + bufferMillis
+ // So: currentTime >= expectedFirstRoundEnd + 5*roundTimeMills +
bufferMillis
+ // For round 6 NOT to be processable: currentTime -
(expectedFirstRoundEnd + 5*roundTimeMills) < roundTimeMills + bufferMillis
+ // So exactly 5 rounds should be processed with currentTime =
expectedFirstRoundEnd + 5*roundTimeMills + bufferMillis - 1
+ // To be safe, use: currentTime = expectedFirstRoundEnd +
5*roundTimeMills + bufferMillis - 1000
+ long currentTime = expectedFirstRoundEnd + (5 * roundTimeMills) +
bufferMillis - 1000;
+ EnvironmentEdge edge = () -> currentTime;
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ // Calculate exact number of rounds that should be processed
+ // Round 1: from getFirstRoundToProcess (based on lastRoundInSync
with start time 0)
+ // - Uses minFileTimestamp rounded down = expectedFirstRoundStart
+ // - Round 1: expectedFirstRoundStart to expectedFirstRoundEnd
+ // Round 2+: from getNextRoundToProcess (based on previous round's
end time)
+ // - Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd +
roundTimeMills
+ // - Round 3: expectedFirstRoundEnd + roundTimeMills to
expectedFirstRoundEnd + 2*roundTimeMills
+ // - Round 4: expectedFirstRoundEnd + 2*roundTimeMills to
expectedFirstRoundEnd + 3*roundTimeMills
+ // - Round 5: expectedFirstRoundEnd + 3*roundTimeMills to
expectedFirstRoundEnd + 4*roundTimeMills
+ int expectedRoundCount = 5;
+ long[] expectedRoundStarts = new long[expectedRoundCount];
+ long[] expectedRoundEnds = new long[expectedRoundCount];
+ expectedRoundStarts[0] = expectedFirstRoundStart;
+ expectedRoundEnds[0] = expectedFirstRoundEnd;
+ for (int i = 1; i < expectedRoundCount; i++) {
+ expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 1) *
roundTimeMills);
+ expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+ }
+
+ try {
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(fileTracker,
mockRecord);
+
+ // Set lastRoundInSync with start time 0 (the new case being
tested)
+ ReplicationRound lastRoundInSyncWithZeroStart = new
ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+ // Set lastRoundProcessed to some initial value
+ ReplicationRound initialLastRoundProcessed = new
ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundProcessed(initialLastRoundProcessed);
+
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+ // Verify initial state
+ ReplicationRound lastRoundInSyncBefore =
discovery.getLastRoundInSync();
+ assertEquals("lastRoundInSync should have start time 0 before
replay", 0L, lastRoundInSyncBefore.getStartTime());
+
+ // Call replay - should start from minimum timestamp of new
files
+ discovery.replay();
+
+ // Verify exact count of processRound calls
+ int actualProcessRoundCallCount =
discovery.getProcessRoundCallCount();
+ assertEquals("processRound should be called exactly " +
expectedRoundCount + " times",
+ expectedRoundCount, actualProcessRoundCallCount);
+
+ // Get all processed rounds
+ List<ReplicationRound> processedRounds =
discovery.getProcessedRounds();
+ assertEquals("Should have processed exactly " +
expectedRoundCount + " rounds",
+ expectedRoundCount, processedRounds.size());
+
+ // Verify each round's exact parameters
+ for (int i = 0; i < expectedRoundCount; i++) {
+ ReplicationRound actualRound = processedRounds.get(i);
+ assertEquals("Round " + (i + 1) + " should have correct
start time",
+ expectedRoundStarts[i],
actualRound.getStartTime());
+ assertEquals("Round " + (i + 1) + " should have correct
end time",
+ expectedRoundEnds[i], actualRound.getEndTime());
+ }
+
+ // Verify lastRoundProcessed was updated to the last processed
round
+ ReplicationRound lastRoundAfterReplay =
discovery.getLastRoundProcessed();
+ assertNotNull("Last round processed should not be null",
lastRoundAfterReplay);
+ assertEquals("Last round processed should match the last
processed round start time",
+ expectedRoundStarts[expectedRoundCount - 1],
lastRoundAfterReplay.getStartTime());
+ assertEquals("Last round processed should match the last
processed round end time",
+ expectedRoundEnds[expectedRoundCount - 1],
lastRoundAfterReplay.getEndTime());
+
+ // Verify lastRoundInSync was updated (in SYNC state, both
should advance together)
+ ReplicationRound lastRoundInSyncAfter =
discovery.getLastRoundInSync();
+ assertNotNull("Last round in sync should not be null",
lastRoundInSyncAfter);
+ assertEquals("Last round in sync should match last round
processed in SYNC state",
+ lastRoundAfterReplay, lastRoundInSyncAfter);
+ assertEquals("Last round in sync should have correct start
time",
+ expectedRoundStarts[expectedRoundCount - 1],
lastRoundInSyncAfter.getStartTime());
+ assertEquals("Last round in sync should have correct end time",
+ expectedRoundEnds[expectedRoundCount - 1],
lastRoundInSyncAfter.getEndTime());
+
+ // Verify state remains SYNC
+ assertEquals("State should remain SYNC",
+
ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+ discovery.getReplicationReplayState());
+
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ } finally {
+ fileTracker.close();
+ }
+ }
+
+ /**
+ * Tests replay method when state changes from DEGRADED to SYNC between
round processing
+ * and lastRoundInSync has start time 0.
+ *
+ * Expected behavior:
+ * - 1st round is processed in DEGRADED state
+ * - 2nd round is processed in DEGRADED state, but then state transitions
to SYNCED_RECOVERY
+ * - When state becomes SYNCED_RECOVERY, lastRoundProcessed is rewound to
the previous round of getFirstRoundToProcess()
+ * (which uses minimum file timestamp when lastRoundInSync.startTime ==
0)
+ * - Then 5 rounds are processed starting from the first round (based on
minimum file timestamp) in SYNC state
+ *
+ * Total: 7 rounds processed (2 in DEGRADED, then rewind, then 5 in SYNC)
+ */
+ @Test
+ public void testReplay_DegradedToSync_LastRoundInSyncStartTimeZero()
throws IOException {
+ TestableReplicationLogTracker fileTracker =
createReplicationLogTracker(config, haGroupName, localFs, standbyUri);
+
+ try {
+ long roundTimeMills =
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
* 1000L;
+ long bufferMillis = (long) (roundTimeMills * 0.15);
+
+ // Create new files with specific timestamps
+ // The minimum timestamp should be used as the basis for the first
round
+ long minFileTimestamp = 1704067200000L + (5 * roundTimeMills) +
(30 * 1000L); // 2024-01-01 00:05:30
+ long maxFileTimestamp = 1704067200000L + (8 * roundTimeMills) +
(45 * 1000L); // 2024-01-01 00:08:45
+
+ // Create files in different shard directories
+ ReplicationShardDirectoryManager shardManager =
fileTracker.getReplicationShardDirectoryManager();
+
+ // Create file with minimum timestamp
+ Path shardPath1 = shardManager.getShardDirectory(minFileTimestamp);
+ localFs.mkdirs(shardPath1);
+ Path file1 = new Path(shardPath1, minFileTimestamp + "_rs-1.plog");
+ localFs.create(file1, true).close();
+
+ // Create file with maximum timestamp
+ Path shardPath2 = shardManager.getShardDirectory(maxFileTimestamp);
+ localFs.mkdirs(shardPath2);
+ Path file2 = new Path(shardPath2, maxFileTimestamp + "_rs-2.plog");
+ localFs.create(file2, true).close();
+
+ // Create HAGroupStoreRecord for STANDBY state
+ HAGroupStoreRecord mockRecord = new
HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, zkUrl, peerZkUrl, 0L);
+
+ // Calculate expected first round start time (minimum timestamp
rounded down to nearest round start)
+ long expectedFirstRoundStart =
shardManager.getNearestRoundStartTimestamp(minFileTimestamp);
+ long expectedFirstRoundEnd = expectedFirstRoundStart +
roundTimeMills;
+
+ // After rewind, lastRoundProcessed will be set to previous round
of firstRoundToProcess
+ // firstRoundToProcess = expectedFirstRoundStart to
expectedFirstRoundEnd
+ // previous round = expectedFirstRoundStart - roundTimeMills to
expectedFirstRoundStart
+ // Then getNextRoundToProcess will return the first round starting
from expectedFirstRoundStart
+ // So the first SYNC round after rewind will be
expectedFirstRoundStart to expectedFirstRoundEnd
+
+ // Set current time to allow processing 7 rounds total (2 in
DEGRADED, then rewind, then 5 in SYNC)
+ // After rewind, we need time for 5 more rounds starting from
expectedFirstRoundStart
+ // Round 7 (the 5th round after rewind) will end at
expectedFirstRoundEnd + 4*roundTimeMills
+ // = expectedFirstRoundStart + roundTimeMills + 4*roundTimeMills
= expectedFirstRoundStart + 5*roundTimeMills
+ // For round 7 to be processable: currentTime -
(expectedFirstRoundEnd + 4*roundTimeMills) >= roundTimeMills + bufferMillis
+ // So: currentTime >= expectedFirstRoundStart + 5*roundTimeMills +
bufferMillis
+ // Set it slightly above to ensure round 7 is processable
+ long currentTime = expectedFirstRoundStart + (5 * roundTimeMills)
+ bufferMillis + 1000;
+ EnvironmentEdge edge = () -> currentTime;
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ // Calculate expected rounds
+ // Rounds 1-2: In DEGRADED state
+ // Round 1: expectedFirstRoundStart to expectedFirstRoundEnd
(DEGRADED)
+ // Round 2: expectedFirstRoundEnd to expectedFirstRoundEnd +
roundTimeMills (DEGRADED, transitions to SYNCED_RECOVERY at end)
+ // After round 2, state becomes SYNCED_RECOVERY and causes rewind
+ // - getFirstRoundToProcess returns: expectedFirstRoundStart to
expectedFirstRoundEnd
+ // - lastRoundProcessed is set to: expectedFirstRoundStart -
roundTimeMills to expectedFirstRoundStart
+ // Rounds 3-7: In SYNC state (starting from first round based on
minimum file timestamp)
+ // Round 3: expectedFirstRoundStart to expectedFirstRoundEnd
(SYNC, after rewind - first round)
+ // Round 4: expectedFirstRoundEnd to expectedFirstRoundEnd +
roundTimeMills (SYNC)
+ // Round 5: expectedFirstRoundEnd + roundTimeMills to
expectedFirstRoundEnd + 2*roundTimeMills (SYNC)
+ // Round 6: expectedFirstRoundEnd + 2*roundTimeMills to
expectedFirstRoundEnd + 3*roundTimeMills (SYNC)
+ // Round 7: expectedFirstRoundEnd + 3*roundTimeMills to
expectedFirstRoundEnd + 4*roundTimeMills (SYNC)
+ int expectedRoundCount = 7;
+ int degradedRoundCount = 2; // First 2 rounds are in DEGRADED state
+ int syncRoundCount = 5; // Last 5 rounds are in SYNC state (after
rewind)
+ long[] expectedRoundStarts = new long[expectedRoundCount];
+ long[] expectedRoundEnds = new long[expectedRoundCount];
+
+ // Rounds 1-2: DEGRADED
+ expectedRoundStarts[0] = expectedFirstRoundStart;
+ expectedRoundEnds[0] = expectedFirstRoundEnd;
+ expectedRoundStarts[1] = expectedFirstRoundEnd;
+ expectedRoundEnds[1] = expectedFirstRoundEnd + roundTimeMills;
+
+ // Rounds 3-7: SYNC (after rewind, starting from
expectedFirstRoundStart - the first round based on minimum file timestamp)
+ expectedRoundStarts[2] = expectedFirstRoundStart; // Round 3:
first round after rewind
+ expectedRoundEnds[2] = expectedFirstRoundEnd;
+ for (int i = 3; i < expectedRoundCount; i++) {
+ expectedRoundStarts[i] = expectedFirstRoundEnd + ((i - 3) *
roundTimeMills);
+ expectedRoundEnds[i] = expectedRoundStarts[i] + roundTimeMills;
+ }
+
+ try {
+ // Create discovery that transitions from DEGRADED to
SYNCED_RECOVERY after 2 rounds
+ // SYNCED_RECOVERY triggers a rewind, then transitions to SYNC
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(fileTracker,
mockRecord) {
+ private int roundCount = 0;
+
+ @Override
+ protected void processRound(ReplicationRound
replicationRound) throws IOException {
+ super.processRound(replicationRound);
+ roundCount++;
+
+ // Transition to SYNCED_RECOVERY after 2 rounds (after
processing round 2)
+ // This triggers a rewind similar to when coming back
from DEGRADED to SYNC
+ if (roundCount == degradedRoundCount) {
+
setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY);
+ }
+ }
+ };
+
+ // Set lastRoundInSync with start time 0
+ ReplicationRound lastRoundInSyncWithZeroStart = new
ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
+
+ // Set lastRoundProcessed to some initial value
+ ReplicationRound initialLastRoundProcessed = new
ReplicationRound(0L, roundTimeMills);
+ discovery.setLastRoundProcessed(initialLastRoundProcessed);
+
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED);
+
+ // Verify initial state
+ ReplicationRound lastRoundInSyncBefore =
discovery.getLastRoundInSync();
+ assertEquals("lastRoundInSync should have start time 0 before
replay", 0L, lastRoundInSyncBefore.getStartTime());
+ assertEquals("Initial state should be DEGRADED",
+
ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED,
+ discovery.getReplicationReplayState());
+
+ // Call replay - should start from minimum timestamp of new
files
+ discovery.replay();
+
+ // Verify exact count of processRound calls
+ int actualProcessRoundCallCount =
discovery.getProcessRoundCallCount();
+ assertEquals("processRound should be called exactly " +
expectedRoundCount + " times",
+ expectedRoundCount, actualProcessRoundCallCount);
+
+ // Get all processed rounds
+ List<ReplicationRound> processedRounds =
discovery.getProcessedRounds();
+ assertEquals("Should have processed exactly " +
expectedRoundCount + " rounds",
+ expectedRoundCount, processedRounds.size());
+
+ // Verify each round's exact parameters
+ for (int i = 0; i < expectedRoundCount; i++) {
+ ReplicationRound actualRound = processedRounds.get(i);
+ assertEquals("Round " + (i + 1) + " should have correct
start time",
+ expectedRoundStarts[i],
actualRound.getStartTime());
+ assertEquals("Round " + (i + 1) + " should have correct
end time",
+ expectedRoundEnds[i], actualRound.getEndTime());
+ }
+
+ // Verify lastRoundProcessed was updated to the last processed
round
+ ReplicationRound lastRoundAfterReplay =
discovery.getLastRoundProcessed();
+ assertNotNull("Last round processed should not be null",
lastRoundAfterReplay);
+ assertEquals("Last round processed should match the last
processed round start time",
+ expectedRoundStarts[expectedRoundCount - 1],
lastRoundAfterReplay.getStartTime());
+ assertEquals("Last round processed should match the last
processed round end time",
+ expectedRoundEnds[expectedRoundCount - 1],
lastRoundAfterReplay.getEndTime());
+
+ // Verify lastRoundInSync behavior:
+ // - During first 2 rounds (DEGRADED), lastRoundInSync should
NOT be updated (remains with start time 0)
+ // - After transition to SYNCED_RECOVERY and then SYNC (rounds
3-7), lastRoundInSync should be updated
+ // - Final lastRoundInSync should match lastRoundProcessed
(because we're in SYNC state at the end)
+ ReplicationRound lastRoundInSyncAfter =
discovery.getLastRoundInSync();
+ assertNotNull("Last round in sync should not be null",
lastRoundInSyncAfter);
+
+ // After processing, we're in SYNC state, so lastRoundInSync
should be updated
+ // It should match the last processed round because state is
SYNC
+ assertEquals("Last round in sync should match last round
processed in SYNC state",
+ lastRoundAfterReplay, lastRoundInSyncAfter);
+ assertEquals("Last round in sync should have correct start
time",
+ expectedRoundStarts[expectedRoundCount - 1],
lastRoundInSyncAfter.getStartTime());
+ assertEquals("Last round in sync should have correct end time",
+ expectedRoundEnds[expectedRoundCount - 1],
lastRoundInSyncAfter.getEndTime());
+ assertTrue("Last round in sync start time should not be 0
after processing in SYNC state",
+ lastRoundInSyncAfter.getStartTime() > 0);
+
+ // Verify state transitioned to SYNC
+ assertEquals("State should be SYNC after transition",
+
ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC,
+ discovery.getReplicationReplayState());
+
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ } finally {
+ fileTracker.close();
+ }
+ }
+
/**
* Tests replay method when failoverPending becomes true during processing
and triggers failover after all rounds.
* Validates that triggerFailover is called exactly once when all
conditions are met.
@@ -1299,7 +1650,7 @@ public class ReplicationLogDiscoveryReplayTest extends
BaseTest {
/**
* Tests the shouldTriggerFailover method with various combinations of
failoverPending,
- * lastRoundInSync, lastRoundProcessed and in-progress files state.
+ * in-progress files, and new files for next round.
*/
@Test
public void testShouldTriggerFailover() throws IOException {
@@ -1317,11 +1668,12 @@ public class ReplicationLogDiscoveryReplayTest extends
BaseTest {
try {
// Create test rounds
ReplicationRound testRound = new ReplicationRound(1704153600000L,
1704153660000L);
- ReplicationRound differentRound = new
ReplicationRound(1704153540000L, 1704153600000L);
+ ReplicationRound nextRound =
tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
// Test Case 1: All conditions true - should return true
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1334,6 +1686,7 @@ public class ReplicationLogDiscoveryReplayTest extends
BaseTest {
// Test Case 2: failoverPending is false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1343,72 +1696,78 @@ public class ReplicationLogDiscoveryReplayTest extends
BaseTest {
discovery.shouldTriggerFailover());
}
- // Test Case 3: lastRoundInSync not equals lastRoundProcessed -
should return false
+ // Test Case 3: in-progress files not empty - should return false
{
-
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when lastRoundInSync
!= lastRoundProcessed",
+ assertFalse("Should not trigger failover when in-progress
files are not empty",
discovery.shouldTriggerFailover());
}
- // Test Case 4: in-progress files not empty - should return false
+ // Test Case 4: new files exist for next round - should return
false
{
-
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
+
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when in-progress
files are not empty",
+ assertFalse("Should not trigger failover when new files exist
for next round",
discovery.shouldTriggerFailover());
}
- // Test Case 5: failoverPending false AND lastRoundInSync !=
lastRoundProcessed - should return false
+ // Test Case 5: failoverPending false AND in-progress files not
empty - should return false
{
-
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(false);
- assertFalse("Should not trigger failover when failoverPending
is false and rounds differ",
+ assertFalse("Should not trigger failover when failoverPending
is false and in-progress files exist",
discovery.shouldTriggerFailover());
}
- // Test Case 6: failoverPending false AND in-progress files not
empty - should return false
+ // Test Case 6: failoverPending false AND new files exist - should
return false
{
-
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
+
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(false);
- assertFalse("Should not trigger failover when failoverPending
is false and files exist",
+ assertFalse("Should not trigger failover when failoverPending
is false and new files exist",
discovery.shouldTriggerFailover());
}
- // Test Case 7: lastRoundInSync != lastRoundProcessed AND
in-progress files not empty - should return false
+ // Test Case 7: in-progress files not empty AND new files exist -
should return false
{
-
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
+
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test1.plog")));
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when rounds differ
and files exist",
+ assertFalse("Should not trigger failover when both in-progress
and new files exist",
discovery.shouldTriggerFailover());
}
// Test Case 8: All conditions false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
+
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
- discovery.setLastRoundProcessed(differentRound);
+ discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(false);
assertFalse("Should not trigger failover when all conditions
are false",
@@ -1420,6 +1779,141 @@ public class ReplicationLogDiscoveryReplayTest extends
BaseTest {
}
}
+ /**
+ * Tests the triggerFailover method to verify it properly updates cluster
state
+ * and handles exceptions correctly.
+ */
+ @Test
+ public void testTriggerFailover() throws IOException, SQLException {
+ final String haGroupName = "testTriggerFailoverHAGroup";
+ // Set up HA group store record with STANDBY_TO_ACTIVE state for
successful transition
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName,
zkUrl, peerZkUrl,
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ ClusterRoleRecord.ClusterRole.STANDBY_TO_ACTIVE,
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ null);
+
+ TestableReplicationLogTracker fileTracker = null;
+ try {
+ fileTracker = createReplicationLogTracker(config, haGroupName,
localFs, standbyUri);
+
+ // Create a spy of ReplicationLogDiscoveryReplay to test actual
implementation
+ ReplicationLogDiscoveryReplay discovery = Mockito.spy(
+ new ReplicationLogDiscoveryReplay(fileTracker));
+
+ // Initialize the discovery
+ discovery.init();
+
+ // Set failoverPending to true
+ discovery.setFailoverPending(true);
+ assertTrue("failoverPending should be true before
triggerFailover", discovery.getFailoverPending());
+
+ // Verify initial state is STANDBY_TO_ACTIVE
+ Optional<HAGroupStoreRecord> recordBefore =
HAGroupStoreManager.getInstance(config)
+ .getHAGroupStoreRecord(haGroupName);
+ assertTrue("HA group record should exist",
recordBefore.isPresent());
+ assertEquals("Initial state should be STANDBY_TO_ACTIVE",
+ HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE,
+ recordBefore.get().getHAGroupState());
+
+ // Call triggerFailover - should succeed and update cluster state
to ACTIVE_IN_SYNC
+ discovery.triggerFailover();
+
+ // Verify triggerFailover was called once
+ Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+ // Verify cluster state is updated to ACTIVE_IN_SYNC after failover
+ Optional<HAGroupStoreRecord> recordAfter =
HAGroupStoreManager.getInstance(config)
+ .getHAGroupStoreRecord(haGroupName);
+ assertTrue("HA group record should exist after failover",
recordAfter.isPresent());
+ assertEquals("Cluster state should be ACTIVE_IN_SYNC after
successful failover",
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
+ recordAfter.get().getHAGroupState());
+
+ // Verify failoverPending is set to false after successful failover
+ assertFalse("failoverPending should be set to false after
successful triggerFailover",
+ discovery.getFailoverPending());
+
+ } finally {
+ // Clean up
+ if (fileTracker != null) {
+ fileTracker.close();
+ }
+ try {
+
HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl);
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up HA group store record", e);
+ }
+ }
+ }
+
+ /**
+ * Tests triggerFailover when
HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName)
+ * throws InvalidClusterRoleTransitionException.
+ * Verifies that failoverPending is set to false even when the exception
occurs.
+ *
+ * This test sets up the HA group in STANDBY state instead of
STANDBY_TO_ACTIVE.
+ * Calling setHAGroupStatusToSync from STANDBY state will throw
InvalidClusterRoleTransitionException
+ * because the transition from STANDBY directly to ACTIVE_IN_SYNC is not
allowed.
+ */
+ @Test
+ public void
testTriggerFailover_InvalidClusterRoleTransitionExceptionFromHAGroupStoreManager()
throws IOException, SQLException {
+ final String haGroupName =
"testTriggerFailoverInvalidTransitionHAGroup";
+
+ // Set up HA group store record in STANDBY state (not
STANDBY_TO_ACTIVE)
+ // This will cause setHAGroupStatusToSync to throw
InvalidClusterRoleTransitionException
+ // because transitioning from STANDBY directly to ACTIVE_IN_SYNC is
invalid
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName,
zkUrl, peerZkUrl,
+ CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+ ClusterRoleRecord.ClusterRole.STANDBY,
ClusterRoleRecord.ClusterRole.ACTIVE,
+ null);
+
+ TestableReplicationLogTracker fileTracker = null;
+ try {
+ fileTracker = createReplicationLogTracker(config, haGroupName,
localFs, standbyUri);
+
+ // Create a spy of ReplicationLogDiscoveryReplay to test actual
implementation
+ ReplicationLogDiscoveryReplay discovery = Mockito.spy(
+ new ReplicationLogDiscoveryReplay(fileTracker));
+
+ // Initialize the discovery
+ discovery.init();
+
+ // Set failoverPending to true
+ discovery.setFailoverPending(true);
+ assertTrue("failoverPending should be true before
triggerFailover", discovery.getFailoverPending());
+
+ // Verify initial HAGroupState is DEGRADED_STANDBY (not STANDBY_TO_ACTIVE)
+ Optional<HAGroupStoreRecord> recordBefore =
HAGroupStoreManager.getInstance(config)
+ .getHAGroupStoreRecord(haGroupName);
+ assertTrue("HA group record should exist",
recordBefore.isPresent());
+ assertEquals("Initial state should be DEGRADED_STANDBY",
+ HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY,
+ recordBefore.get().getHAGroupState());
+
+ // Call triggerFailover - should handle
InvalidClusterRoleTransitionException
+ // because transitioning from DEGRADED_STANDBY directly to
ACTIVE_IN_SYNC is invalid
+ discovery.triggerFailover();
+
+ // Verify triggerFailover was called (implicitly through spy)
+ Mockito.verify(discovery, Mockito.times(1)).triggerFailover();
+
+ // Verify failoverPending is set to false even when
InvalidClusterRoleTransitionException occurs
+ assertFalse("failoverPending should be set to false when
InvalidClusterRoleTransitionException occurs",
+ discovery.getFailoverPending());
+
+ } finally {
+ // Clean up
+ if (fileTracker != null) {
+ fileTracker.close();
+ }
+ try {
+
HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl);
+ } catch (Exception e) {
+ LOG.warn("Failed to clean up HA group store record", e);
+ }
+ }
+ }
+
/**
* Testable implementation of ReplicationLogDiscoveryReplay for unit
testing.
* Provides dependency injection for HAGroupStoreRecord, tracks processed
rounds,
@@ -1482,9 +1976,7 @@ public class ReplicationLogDiscoveryReplayTest extends
BaseTest {
protected void triggerFailover() {
// Track calls to triggerFailover for validation
triggerFailoverCallCount++;
-
- // Simulate the real behavior: set failoverPending to false after
triggering failover
- setFailoverPending(false);
+ super.triggerFailover();
}
public int getTriggerFailoverCallCount() {
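Taken together, the two new triggerFailover tests above pin down the intended
contract: the promotion is attempted through
HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName), and
failoverPending is cleared whether or not that transition is accepted. A
minimal sketch of that contract, assuming conf and haGroupName are fields of
ReplicationLogDiscoveryReplay (the committed implementation may differ in
structure and in how the exceptions are handled):

    protected void triggerFailover() {
        try {
            // Only legal when the group is in STANDBY_TO_ACTIVE; other states
            // cause HAGroupStoreManager to reject the transition with
            // InvalidClusterRoleTransitionException.
            HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName);
        } catch (Exception e) {
            LOG.warn("Failover transition not applied for haGroup: {}", haGroupName, e);
        }
        // Both tests assert the pending flag is cleared regardless of outcome.
        setFailoverPending(false);
    }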
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
similarity index 99%
rename from
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
rename to
phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 9540a8d20f..1ad72806f5 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
@@ -83,11 +84,13 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
private static final String CREATE_TABLE_SQL_STATEMENT = "CREATE TABLE %s
(ID VARCHAR PRIMARY KEY, " +
"COL_1 VARCHAR, COL_2 VARCHAR, COL_3 BIGINT)";
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTest.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTestIT.java
similarity index 97%
rename from
phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTest.java
rename to
phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTestIT.java
index 3412966abd..37c79b8852 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTest.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayTestIT.java
@@ -34,6 +34,7 @@ import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.ClusterRoleRecord;
import org.apache.phoenix.jdbc.HAGroupStoreClient;
import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
@@ -43,10 +44,12 @@ import
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.HAGroupStoreTestUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.*;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
-public class ReplicationLogReplayTest extends BaseTest {
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogReplayTestIT extends BaseTest {
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index f609a4aed2..0649b551b7 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -436,7 +436,7 @@ public class ReplicationLogDiscoveryTest {
@Test
public void testShouldProcessInProgressDirectory() {
// Test multiple times to verify probability-based behavior
- int totalTests = 1000;
+ int totalTests = 100000;
int trueCount = 0;
for (int i = 0; i < totalTests; i++) {
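The bump of totalTests from 1000 to 100000 is presumably a flakiness fix:
assuming each call to shouldProcessInProgressDirectory is an independent
Bernoulli draw, the observed true fraction \hat{p} deviates from the configured
probability with a worst-case (p = 0.5) standard error of

    \sigma_{\hat{p}} \le \sqrt{0.25 / n}

i.e. roughly 1.6% at n = 1000 versus about 0.16% at n = 100000, so a fixed
assertion tolerance on the true/false ratio is met far more reliably.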
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
index 0b413104fc..f26e8436e6 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationShardDirectoryManagerTest.java
@@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -234,7 +234,7 @@ public class ReplicationShardDirectoryManagerTest {
// Test 7: Multiple rounds later
long multipleRoundsLater = dayStart + (300 * 1000L); // 00:05:00 (5
minutes)
- long expectedRoundStart = dayStart + (300 * 1000L); // Should be exact
+ long expectedRoundStart = dayStart + (300 * 1000L); // Should be exact
long result7 =
manager.getNearestRoundStartTimestamp(multipleRoundsLater);
assertEquals("Multiple rounds later should return exact round start",
expectedRoundStart, result7);
@@ -389,7 +389,7 @@ public class ReplicationShardDirectoryManagerTest {
@Test
public void testGetReplicationRoundFromStartTimeConsistency() {
- // Test that the same input always produces the same output
+ // Test that the same input always produces the same output
long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
ReplicationRound result1 =
manager.getReplicationRoundFromStartTime(timestamp);
@@ -543,7 +543,7 @@ public class ReplicationShardDirectoryManagerTest {
@Test
public void testGetReplicationRoundFromEndTimeConsistency() {
- // Test that the same input always produces the same output
+ // Test that the same input always produces the same output
long timestamp = 1704110400000L; // 2024-01-01 12:00:00 UTC
ReplicationRound result1 =
manager.getReplicationRoundFromEndTime(timestamp);
@@ -595,5 +595,161 @@ public class ReplicationShardDirectoryManagerTest {
assertEquals("Shard " + i + " should have correct name",
expectedShardName, shardPath.getName());
}
}
-}
+ @Test
+ public void testGetPreviousRound() {
+ // Use a specific day for consistent testing
+ // 2024-01-01 00:00:00 UTC = 1704067200000L
+ long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+ // Default configuration: 60-second rounds
+ long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+ // Test 1: First round (00:00:00 to 00:01:00)
+ long firstRoundStart = dayStart; // 00:00:00
+ long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+ ReplicationRound firstRound = new ReplicationRound(firstRoundStart,
firstRoundEnd);
+ ReplicationRound previousRound = manager.getPreviousRound(firstRound);
+
+ // Previous round should end at firstRoundStart (00:00:00), which
rounds down to 00:00:00
+ // Start time should be end time - round duration, i.e. 60 seconds
before dayStart (an edge case that lands in the previous day)
+ long expectedEnd = dayStart; // 00:00:00
+ long expectedStart = expectedEnd - roundDurationMs; // 60s before dayStart
+ assertEquals("Previous round end time should be start time of current
round rounded down",
+ expectedEnd, previousRound.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration",
+ expectedStart, previousRound.getStartTime());
+
+ // Test 2: Second round (00:01:00 to 00:02:00)
+ long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+ long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+ ReplicationRound secondRound = new ReplicationRound(secondRoundStart,
secondRoundEnd);
+ ReplicationRound previousRound2 =
manager.getPreviousRound(secondRound);
+
+ // Previous round should end at secondRoundStart (00:01:00), which
rounds down to 00:01:00
+ // Start time should be 00:01:00 - 60s = 00:00:00
+ long expectedEnd2 = secondRoundStart; // 00:01:00
+ long expectedStart2 = expectedEnd2 - roundDurationMs; // 00:00:00
+ assertEquals("Previous round end time should be start time of current
round rounded down",
+ expectedEnd2, previousRound2.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration",
+ expectedStart2, previousRound2.getStartTime());
+
+ // Test 3: Round with mid-timestamp start (00:01:30 would round to
00:01:00)
+ long midRoundStart = dayStart + (90 * 1000L); // 00:01:30 (not aligned
to round boundary)
+ long midRoundEnd = midRoundStart + roundDurationMs; // 00:02:30
+ ReplicationRound midRound = new ReplicationRound(midRoundStart,
midRoundEnd);
+ ReplicationRound previousRound3 = manager.getPreviousRound(midRound);
+
+ // Previous round should end at midRoundStart rounded down = 00:01:00
+ // Start time should be 00:01:00 - 60s = 00:00:00
+ long expectedEnd3 = dayStart + roundDurationMs; // 00:01:00 (rounded
down from 00:01:30)
+ long expectedStart3 = expectedEnd3 - roundDurationMs; // 00:00:00
+ assertEquals("Previous round end time should round down start time of
current round",
+ expectedEnd3, previousRound3.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration",
+ expectedStart3, previousRound3.getStartTime());
+
+ // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+ long multipleRoundsStart = dayStart + (5 * roundDurationMs); //
00:05:00
+ long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; //
00:06:00
+ ReplicationRound multipleRounds = new
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+ ReplicationRound previousRound4 =
manager.getPreviousRound(multipleRounds);
+
+ // Previous round should end at multipleRoundsStart = 00:05:00
+ // Start time should be 00:05:00 - 60s = 00:04:00
+ long expectedEnd4 = multipleRoundsStart; // 00:05:00
+ long expectedStart4 = expectedEnd4 - roundDurationMs; // 00:04:00
+ assertEquals("Previous round end time should be start time of current
round",
+ expectedEnd4, previousRound4.getEndTime());
+ assertEquals("Previous round start time should be end time - round
duration",
+ expectedStart4, previousRound4.getStartTime());
+
+ // Test 5: Verify round duration is consistent
+ long previousRoundDuration = previousRound4.getEndTime() -
previousRound4.getStartTime();
+ assertEquals("Previous round duration should be 60 seconds",
roundDurationMs, previousRoundDuration);
+ }
+
+ @Test
+ public void testGetNextRound() {
+ // Use a specific day for consistent testing
+ // 2024-01-01 00:00:00 UTC = 1704067200000L
+ long dayStart = 1704067200000L; // 2024-01-01 00:00:00 UTC
+
+ // Default configuration: 60-second rounds
+ long roundDurationMs = 60 * 1000L; // 60 seconds in milliseconds
+
+ // Test 1: First round (00:00:00 to 00:01:00)
+ long firstRoundStart = dayStart; // 00:00:00
+ long firstRoundEnd = firstRoundStart + roundDurationMs; // 00:01:00
+ ReplicationRound firstRound = new ReplicationRound(firstRoundStart,
firstRoundEnd);
+ ReplicationRound nextRound = manager.getNextRound(firstRound);
+
+ // Next round should start at firstRoundEnd (00:01:00), which rounds
down to 00:01:00
+ // End time should be 00:01:00 + 60s = 00:02:00
+ long expectedStart = firstRoundEnd; // 00:01:00
+ long expectedEnd = expectedStart + roundDurationMs; // 00:02:00
+ assertEquals("Next round start time should be end time of current
round rounded down",
+ expectedStart, nextRound.getStartTime());
+ assertEquals("Next round end time should be start time + round
duration",
+ expectedEnd, nextRound.getEndTime());
+
+ // Test 2: Second round (00:01:00 to 00:02:00)
+ long secondRoundStart = dayStart + roundDurationMs; // 00:01:00
+ long secondRoundEnd = secondRoundStart + roundDurationMs; // 00:02:00
+ ReplicationRound secondRound = new ReplicationRound(secondRoundStart,
secondRoundEnd);
+ ReplicationRound nextRound2 = manager.getNextRound(secondRound);
+
+ // Next round should start at secondRoundEnd (00:02:00), which rounds
down to 00:02:00
+ // End time should be 00:02:00 + 60s = 00:03:00
+ long expectedStart2 = secondRoundEnd; // 00:02:00
+ long expectedEnd2 = expectedStart2 + roundDurationMs; // 00:03:00
+ assertEquals("Next round start time should be end time of current
round rounded down",
+ expectedStart2, nextRound2.getStartTime());
+ assertEquals("Next round end time should be start time + round
duration",
+ expectedEnd2, nextRound2.getEndTime());
+
+ // Test 3: Round with mid-timestamp end (00:02:30 would round to
00:02:00)
+ long midRoundStart = dayStart + (120 * 1000L); // 00:02:00
+ long midRoundEnd = dayStart + (150 * 1000L); // 00:02:30 (not aligned
to round boundary)
+ ReplicationRound midRound = new ReplicationRound(midRoundStart,
midRoundEnd);
+ ReplicationRound nextRound3 = manager.getNextRound(midRound);
+
+ // Next round should start at midRoundEnd rounded down = 00:02:00
+ // End time should be 00:02:00 + 60s = 00:03:00
+ long expectedStart3 = dayStart + (120 * 1000L); // 00:02:00 (rounded
down from 00:02:30)
+ long expectedEnd3 = expectedStart3 + roundDurationMs; // 00:03:00
+ assertEquals("Next round start time should round down end time of
current round",
+ expectedStart3, nextRound3.getStartTime());
+ assertEquals("Next round end time should be start time + round
duration",
+ expectedEnd3, nextRound3.getEndTime());
+
+ // Test 4: Multiple rounds later (00:05:00 to 00:06:00)
+ long multipleRoundsStart = dayStart + (5 * roundDurationMs); //
00:05:00
+ long multipleRoundsEnd = multipleRoundsStart + roundDurationMs; //
00:06:00
+ ReplicationRound multipleRounds = new
ReplicationRound(multipleRoundsStart, multipleRoundsEnd);
+ ReplicationRound nextRound4 = manager.getNextRound(multipleRounds);
+
+ // Next round should start at multipleRoundsEnd = 00:06:00
+ // End time should be 00:06:00 + 60s = 00:07:00
+ long expectedStart4 = multipleRoundsEnd; // 00:06:00
+ long expectedEnd4 = expectedStart4 + roundDurationMs; // 00:07:00
+ assertEquals("Next round start time should be end time of current
round",
+ expectedStart4, nextRound4.getStartTime());
+ assertEquals("Next round end time should be start time + round
duration",
+ expectedEnd4, nextRound4.getEndTime());
+
+ // Test 5: Verify round duration is consistent
+ long nextRoundDuration = nextRound4.getEndTime() -
nextRound4.getStartTime();
+ assertEquals("Next round duration should be 60 seconds",
roundDurationMs, nextRoundDuration);
+
+ // Test 6: Verify continuity - next round of previous round should
equal original round
+ ReplicationRound originalRound = new ReplicationRound(dayStart + (3 *
roundDurationMs), dayStart + (4 * roundDurationMs)); // 00:03:00 to 00:04:00
+ ReplicationRound prevRound = manager.getPreviousRound(originalRound);
+ ReplicationRound nextOfPrev = manager.getNextRound(prevRound);
+ assertEquals("Next round of previous round should equal original round
start time",
+ originalRound.getStartTime(), nextOfPrev.getStartTime());
+ assertEquals("Next round of previous round should equal original round
end time",
+ originalRound.getEndTime(), nextOfPrev.getEndTime());
+ }
+}
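The new getPreviousRound and getNextRound tests encode a simple adjacency rule:
the neighbouring round is anchored by rounding the shared boundary down with
getNearestRoundStartTimestamp and then spanning one round duration. A sketch of
that rule, assuming a getRoundDurationMs() accessor (hypothetical name; the
committed ReplicationShardDirectoryManager may expose the duration differently):

    public ReplicationRound getPreviousRound(ReplicationRound round) {
        // Round the current start down to a round boundary; that is the
        // previous round's end, and it spans one duration backwards.
        long end = getNearestRoundStartTimestamp(round.getStartTime());
        return new ReplicationRound(end - getRoundDurationMs(), end);
    }

    public ReplicationRound getNextRound(ReplicationRound round) {
        // Round the current end down to a round boundary; that is the next
        // round's start, and it spans one duration forwards.
        long start = getNearestRoundStartTimestamp(round.getEndTime());
        return new ReplicationRound(start, start + getRoundDurationMs());
    }

Under this rule the continuity check in Test 6 holds by construction: rounding
an already aligned boundary down is a no-op, so getNextRound(getPreviousRound(r))
reproduces r for any boundary-aligned round.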