This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this
push:
new 44cfa7e674 Phoenix-7672 Adding API to get new files within range of
rounds (Addendum) (#2323)
44cfa7e674 is described below
commit 44cfa7e67480c2425b41044c828870c786f2cfb1
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Mon Dec 1 18:46:17 2025 +0530
Phoenix-7672 Adding API to get new files within range of rounds (Addendum)
(#2323)
---
.../phoenix/replication/ReplicationLogTracker.java | 34 ++-
.../reader/ReplicationLogDiscoveryReplay.java | 68 +++--
.../ReplicationLogDiscoveryReplayTestIT.java | 26 +-
.../replication/ReplicationLogTrackerTest.java | 277 +++++++++++++++++++++
4 files changed, 377 insertions(+), 28 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index cdb078dce3..9e9f20adf0 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -98,8 +98,9 @@ public class ReplicationLogTracker {
* exist.
*/
public void init() throws IOException {
- this.inProgressDirPath = new
Path(getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(),
- getInProgressLogSubDirectoryName());
+ // The in-progress directory is a sibling of the shard root directory, not a child of it
+ Path shardRootParent =
+ getReplicationShardDirectoryManager().getRootDirectoryPath().getParent();
+ this.inProgressDirPath = new Path(shardRootParent, getInProgressLogSubDirectoryName());
createDirectoryIfNotExists(inProgressDirPath);
}
@@ -151,6 +152,35 @@ public class ReplicationLogTracker {
return filesInRound;
}
+ /**
+ * Retrieves new replication log files that belong to replication rounds from startRound to
+ * endRound (inclusive). Iterates through all rounds in the range and collects valid log files
+ * from each round's shard directory.
+ * <p>
+ * Rounds are visited by repeatedly calling
+ * {@link ReplicationShardDirectoryManager#getNextRound(ReplicationRound)} for as long as the
+ * current round starts strictly before endRound; endRound itself is always listed last. The
+ * loop compares start times instead of testing {@code equals(endRound)}, so it terminates even
+ * when endRound is not exactly reachable from startRound (for example, if endRound was built
+ * with an end time or boundary that differs from what getNextRound produces).
+ *
+ * @param startRound - The starting replication round (inclusive)
+ * @param endRound - The ending replication round (inclusive)
+ * @return List of valid log file paths from startRound to endRound, empty list if
+ * startRound > endRound
+ * @throws IOException if there's an error accessing the file system
+ * @throws IllegalStateException if getNextRound does not advance the round start time
+ */
+ public List<Path> getNewFiles(ReplicationRound startRound, ReplicationRound endRound)
+ throws IOException {
+ List<Path> files = new ArrayList<>();
+ // Early return if startRound is after endRound (invalid range)
+ if (startRound.getStartTime() > endRound.getStartTime()) {
+ return files;
+ }
+ // Collect files for every round that starts before endRound (endRound is handled below)
+ ReplicationRound round = startRound;
+ while (round.getStartTime() < endRound.getStartTime()) {
+ files.addAll(getNewFilesForRound(round));
+ ReplicationRound nextRound = replicationShardDirectoryManager.getNextRound(round);
+ // Guard against a non-advancing next round, which would otherwise loop forever
+ if (nextRound.getStartTime() <= round.getStartTime()) {
+ throw new IllegalStateException("Next round " + nextRound
+ + " does not advance past round " + round);
+ }
+ round = nextRound;
+ }
+ // Add the files for the endRound (inclusive)
+ files.addAll(getNewFilesForRound(endRound));
+ return files;
+ }
+
/**
* Retrieves all valid log files currently in the in-progress directory.
* @return List of valid log file paths in the in-progress directory,
empty list if directory
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index faf078fa7f..3911785a4d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.replication.ReplicationLogDiscovery;
import org.apache.phoenix.replication.ReplicationLogTracker;
import org.apache.phoenix.replication.ReplicationRound;
+import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
import
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
import org.slf4j.Logger;
@@ -137,23 +138,28 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}",
haGroupName);
- HAGroupStateListener degradedListener = (groupName, fromState,
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
- if (clusterType == ClusterType.LOCAL &&
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
+ HAGroupStateListener degradedListener = (groupName, fromState, toState,
+ modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+ if (clusterType == ClusterType.LOCAL
+ &&
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
replicationReplayState.set(ReplicationReplayState.DEGRADED);
LOG.info("Cluster degraded detected for {}.
replicationReplayState={}",
haGroupName, ReplicationReplayState.DEGRADED);
}
};
- HAGroupStateListener recoveryListener = (groupName, fromState,
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
- if (clusterType == ClusterType.LOCAL &&
HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) {
+ HAGroupStateListener recoveryListener = (groupName, fromState, toState,
+ modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+ if (clusterType == ClusterType.LOCAL
+ &&
HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) {
replicationReplayState.set(ReplicationReplayState.SYNCED_RECOVERY);
LOG.info("Cluster recovered detected for {}.
replicationReplayState={}",
haGroupName, getReplicationReplayState());
}
};
- HAGroupStateListener triggerFailoverListner = (groupName, fromState,
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+ HAGroupStateListener triggerFailoverListner = (groupName, fromState,
toState,
+ modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL
&&
HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE.equals(toState)) {
failoverPending.set(true);
@@ -163,7 +169,8 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
}
};
- HAGroupStateListener abortFailoverListner = (groupName, fromState,
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+ HAGroupStateListener abortFailoverListner = (groupName, fromState,
toState,
+ modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
if (clusterType == ClusterType.LOCAL
&&
HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY.equals(toState)) {
failoverPending.set(false);
@@ -228,7 +235,8 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord();
LOG.info("Found HA Group state during initialization as {} for
haGroup: {}",
haGroupStoreRecord.getHAGroupState(), haGroupName);
- if
(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState()))
{
+ if (HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY
+ .equals(haGroupStoreRecord.getHAGroupState())) {
replicationReplayState.compareAndSet(ReplicationReplayState.NOT_INITIALIZED,
ReplicationReplayState.DEGRADED);
long minimumTimestampFromFiles =
EnvironmentEdgeManager.currentTime();
@@ -474,19 +482,51 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
* Failover is safe to trigger when all of the following conditions are
met:
* 1. A failover has been requested (failoverPending is true)
* 2. No files are currently in the in-progress directory
- * 3. No new files exist for ongoing round
+ * 3. No new files exist in any round from the next round to process up to the current
+ * timestamp round (both inclusive)
*
- * These conditions ensure all replication logs have been processed before
transitioning
- * the cluster from STANDBY to ACTIVE state.
+ * The third condition checks for new files in the range from nextRoundToProcess (the round
+ * after getLastRoundProcessed()) to currentTimestampRound (derived from
+ * EnvironmentEdgeManager.currentTime()). This ensures all replication logs up to the current
+ * time have been processed before transitioning the cluster from STANDBY to ACTIVE state.
*
* @return true if all conditions are met and failover should be
triggered, false otherwise
* @throws IOException if there's an error checking file status
*/
protected boolean shouldTriggerFailover() throws IOException {
- return failoverPending.get() &&
replicationLogTracker.getInProgressFiles().isEmpty()
- &&
replicationLogTracker.getNewFilesForRound(replicationLogTracker
- .getReplicationShardDirectoryManager()
- .getNextRound(getLastRoundProcessed())).isEmpty();
+ // Read the flag once so that the logged value and the decision always agree
+ boolean isFailoverPending = failoverPending.get();
+ LOG.debug("Checking if failover should be triggered. failoverPending={}", isFailoverPending);
+ // Check if failover has been requested
+ if (!isFailoverPending) {
+ LOG.debug("Failover not triggered. failoverPending is false.");
+ return false;
+ }
+ // Check if in-progress directory is empty
+ if (!replicationLogTracker.getInProgressFiles().isEmpty()) {
+ LOG.debug("Failover not triggered. In progress directory is not empty.");
+ return false;
+ }
+ // Check if there are any new files from next round to current timestamp round (inclusive)
+ ReplicationShardDirectoryManager replicationShardDirectoryManager =
+ replicationLogTracker.getReplicationShardDirectoryManager();
+ ReplicationRound nextRoundToProcess =
+ replicationShardDirectoryManager.getNextRound(getLastRoundProcessed());
+ ReplicationRound currentTimestampRound =
+ replicationShardDirectoryManager.getReplicationRoundFromStartTime(
+ EnvironmentEdgeManager.currentTime());
+ LOG.debug("Checking the new files from next round {} to current timestamp round {}.",
+ nextRoundToProcess, currentTimestampRound);
+ boolean hasNewFilesInRange =
+ !replicationLogTracker.getNewFiles(nextRoundToProcess, currentTimestampRound).isEmpty();
+ if (hasNewFilesInRange) {
+ LOG.debug("Failover not triggered. New files exist from next round to current "
+ + "timestamp round.");
+ return false;
+ }
+
+ LOG.info("Failover can be triggered.");
+ return true;
}
protected void triggerFailover() {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index f253bcbb07..90b3a0fdc9 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -1650,7 +1650,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
/**
* Tests the shouldTriggerFailover method with various combinations of
failoverPending,
- * in-progress files, and new files for next round.
+ * in-progress files, and new files from next round to current timestamp
round.
*/
@Test
public void testShouldTriggerFailover() throws IOException {
@@ -1668,12 +1668,14 @@ public class ReplicationLogDiscoveryReplayTestIT
extends BaseTest {
try {
// Create test rounds
ReplicationRound testRound = new ReplicationRound(1704153600000L,
1704153660000L);
- ReplicationRound nextRound =
tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
+ ReplicationShardDirectoryManager shardManager =
tracker.getReplicationShardDirectoryManager();
+ ReplicationRound nextRoundToProcess =
shardManager.getNextRound(testRound);
+ ReplicationRound currentTimestampRound =
shardManager.getReplicationRoundFromStartTime(currentTime);
// Test Case 1: All conditions true - should return true
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1686,7 +1688,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Test Case 2: failoverPending is false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1699,7 +1701,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Test Case 3: in-progress files not empty - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1709,23 +1711,23 @@ public class ReplicationLogDiscoveryReplayTestIT
extends BaseTest {
discovery.shouldTriggerFailover());
}
- // Test Case 4: new files exist for next round - should return
false
+ // Test Case 4: new files exist from next round to current
timestamp round - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test.plog")));
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.singletonList(new
Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
discovery.setFailoverPending(true);
- assertFalse("Should not trigger failover when new files exist
for next round",
+ assertFalse("Should not trigger failover when new files exist
from next round to current timestamp round",
discovery.shouldTriggerFailover());
}
// Test Case 5: failoverPending false AND in-progress files not
empty - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.emptyList());
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1738,7 +1740,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Test Case 6: failoverPending false AND new files exist - should
return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test.plog")));
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.singletonList(new
Path("test.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1751,7 +1753,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Test Case 7: in-progress files not empty AND new files exist -
should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test1.plog")));
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test2.plog")));
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.singletonList(new
Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
@@ -1764,7 +1766,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Test Case 8: All conditions false - should return false
{
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new
Path("test.plog")));
-
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
Path("test2.plog")));
+ when(tracker.getNewFiles(nextRoundToProcess,
currentTimestampRound)).thenReturn(Collections.singletonList(new
Path("test2.plog")));
TestableReplicationLogDiscoveryReplay discovery = new
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
discovery.setLastRoundInSync(testRound);
discovery.setLastRoundProcessed(testRound);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
index e8bc60e017..ae7e9cb3f0 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
@@ -1283,6 +1283,283 @@ public class ReplicationLogTrackerTest {
assertFalse("Should not contain invalidFile3",
resultFilenames.contains(invalidFile3.getName()));
}
+ @Test
+ public void testGetNewFilesWithStartAndEndRound() throws IOException {
+ // Initialize tracker
+ tracker.init();
+
+ ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager();
+ long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+ // Create start round (duration taken from the shard manager's configuration)
+ long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 UTC
+ long startRoundEndTime = startRoundStartTime + roundDurationMs;
+ ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime);
+
+ // Create middle round
+ long middleRoundStartTime = startRoundEndTime;
+ long middleRoundEndTime = middleRoundStartTime + roundDurationMs;
+ ReplicationRound middleRound = new ReplicationRound(middleRoundStartTime, middleRoundEndTime);
+
+ // Create end round
+ long endRoundStartTime = middleRoundEndTime;
+ long endRoundEndTime = endRoundStartTime + roundDurationMs;
+ ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime);
+
+ // Get shard directories for each round
+ Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+ Path middleRoundShardDir = shardManager.getShardDirectory(middleRound);
+ Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+ // Create shard directories
+ localFs.mkdirs(startRoundShardDir);
+ localFs.mkdirs(middleRoundShardDir);
+ localFs.mkdirs(endRoundShardDir);
+
+ // Create files in start round
+ Path startRoundFile1 = new Path(startRoundShardDir, startRoundStartTime + "_rs1.plog");
+ Path startRoundFile2 = new Path(startRoundShardDir, startRoundStartTime + 30000 + "_rs2.plog");
+
+ // Create files in middle round
+ Path middleRoundFile1 = new Path(middleRoundShardDir, middleRoundStartTime + "_rs3.plog");
+ Path middleRoundFile2 = new Path(middleRoundShardDir, middleRoundStartTime + 30000 + "_rs4.plog");
+
+ // Create files in end round
+ Path endRoundFile1 = new Path(endRoundShardDir, endRoundStartTime + "_rs5.plog");
+ Path endRoundFile2 = new Path(endRoundShardDir, endRoundStartTime + 30000 + "_rs6.plog");
+
+ // Create files outside the range (before start round)
+ ReplicationRound beforeStartRound = shardManager.getPreviousRound(startRound);
+ Path beforeStartRoundShardDir = shardManager.getShardDirectory(beforeStartRound);
+ localFs.mkdirs(beforeStartRoundShardDir);
+ Path beforeStartRoundFile =
+ new Path(beforeStartRoundShardDir, beforeStartRound.getStartTime() + "_rs0.plog");
+
+ // Create files outside the range (after end round)
+ ReplicationRound afterEndRound = shardManager.getNextRound(endRound);
+ Path afterEndRoundShardDir = shardManager.getShardDirectory(afterEndRound);
+ localFs.mkdirs(afterEndRoundShardDir);
+ Path afterEndRoundFile =
+ new Path(afterEndRoundShardDir, afterEndRound.getStartTime() + "_rs7.plog");
+
+ // Create all files
+ localFs.create(startRoundFile1, true).close();
+ localFs.create(startRoundFile2, true).close();
+ localFs.create(middleRoundFile1, true).close();
+ localFs.create(middleRoundFile2, true).close();
+ localFs.create(endRoundFile1, true).close();
+ localFs.create(endRoundFile2, true).close();
+ localFs.create(beforeStartRoundFile, true).close();
+ localFs.create(afterEndRoundFile, true).close();
+
+ // Call getNewFiles with startRound and endRound
+ List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+ // Verify file system operations
+ // Should call getNewFilesForRound for each round (start, middle, end)
+ // Each call to getNewFilesForRound calls exists() and listStatus() on shard directories
+ // Note: init() already called exists() on in-progress directory
+ Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir));
+ Mockito.verify(mockFs, times(1)).exists(Mockito.eq(middleRoundShardDir));
+ Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+ Mockito.verify(mockFs, times(3)).listStatus(Mockito.any(Path.class));
+
+ // Prepare expected set of file paths (start, middle and end rounds), normalized exactly
+ // like the actual paths below so the comparison does not depend on Path.toString()
+ Set<String> expectedPaths = new HashSet<>();
+ expectedPaths.add(startRoundFile1.toUri().getPath());
+ expectedPaths.add(startRoundFile2.toUri().getPath());
+ expectedPaths.add(middleRoundFile1.toUri().getPath());
+ expectedPaths.add(middleRoundFile2.toUri().getPath());
+ expectedPaths.add(endRoundFile1.toUri().getPath());
+ expectedPaths.add(endRoundFile2.toUri().getPath());
+
+ // Create actual set of paths
+ Set<String> actualPaths =
+ result.stream().map(path -> path.toUri().getPath()).collect(Collectors.toSet());
+
+ // The set would hide a round listed twice, so also check the raw result size
+ assertEquals("Should not return duplicate files", expectedPaths.size(), result.size());
+ // Verify all files from start to end rounds are returned
+ assertEquals("Should return exactly 6 files from start to end rounds",
+ expectedPaths.size(), actualPaths.size());
+ assertEquals("File paths do not match", expectedPaths, actualPaths);
+
+ // Verify files outside the range are not included (same normalization as actualPaths,
+ // otherwise these checks could pass vacuously)
+ assertFalse("Should not contain file from before start round",
+ actualPaths.contains(beforeStartRoundFile.toUri().getPath()));
+ assertFalse("Should not contain file from after end round",
+ actualPaths.contains(afterEndRoundFile.toUri().getPath()));
+ }
+
+ @Test
+ public void testGetNewFilesWithSameStartAndEndRound() throws IOException {
+ // Initialize tracker
+ tracker.init();
+
+ ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager();
+ long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+ // Create a single round
+ long roundStartTime = 1704153600000L; // 2024-01-02 00:00:00 UTC
+ long roundEndTime = roundStartTime + roundDurationMs;
+ ReplicationRound round = new ReplicationRound(roundStartTime, roundEndTime);
+
+ // Get shard directory for this round
+ Path roundShardDir = shardManager.getShardDirectory(round);
+ localFs.mkdirs(roundShardDir);
+
+ // Create files in the round
+ Path file1 = new Path(roundShardDir, roundStartTime + "_rs1.plog");
+ Path file2 = new Path(roundShardDir, roundStartTime + 30000 + "_rs2.plog");
+ Path file3 = new Path(roundShardDir, roundStartTime + 50000 + "_rs3.plog");
+
+ // Create files
+ localFs.create(file1, true).close();
+ localFs.create(file2, true).close();
+ localFs.create(file3, true).close();
+
+ // Call getNewFiles with same start and end round
+ List<Path> result = tracker.getNewFiles(round, round);
+
+ // Verify file system operations
+ // Should call getNewFilesForRound once for the round
+ // Note: init() already called exists() on in-progress directory
+ Mockito.verify(mockFs, times(1)).exists(Mockito.eq(roundShardDir));
+ Mockito.verify(mockFs, times(1)).listStatus(Mockito.any(Path.class));
+
+ // Prepare expected set of file paths, normalized exactly like the actual paths below
+ Set<String> expectedPaths = new HashSet<>();
+ expectedPaths.add(file1.toUri().getPath());
+ expectedPaths.add(file2.toUri().getPath());
+ expectedPaths.add(file3.toUri().getPath());
+
+ // Create actual set of paths
+ Set<String> actualPaths =
+ result.stream().map(path -> path.toUri().getPath()).collect(Collectors.toSet());
+
+ // The set would hide a round listed twice, so also check the raw result size
+ assertEquals("Should not return duplicate files", expectedPaths.size(), result.size());
+ // Verify all files from the round are returned
+ assertEquals("Should return exactly 3 files from the round",
+ expectedPaths.size(), actualPaths.size());
+ assertEquals("File paths do not match", expectedPaths, actualPaths);
+ }
+
+ @Test
+ public void testGetNewFilesWithInvalidRange() throws IOException {
+ tracker.init();
+ ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager();
+ long durationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+ // The "end" round is the earlier one and the "start" round immediately follows it, so the
+ // requested range is reversed (startRound begins after endRound).
+ long earlierStart = 1704153600000L; // 2024-01-02 00:00:00 UTC
+ long laterStart = earlierStart + durationMs;
+ ReplicationRound endRound = new ReplicationRound(earlierStart, laterStart);
+ ReplicationRound startRound = new ReplicationRound(laterStart, laterStart + durationMs);
+
+ Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+ Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+ localFs.mkdirs(startRoundShardDir);
+ localFs.mkdirs(endRoundShardDir);
+
+ // Populate both rounds so that an accidental scan would have something to find
+ localFs.create(new Path(startRoundShardDir, laterStart + "_rs1.plog"), true).close();
+ localFs.create(new Path(endRoundShardDir, earlierStart + "_rs2.plog"), true).close();
+
+ List<Path> result = tracker.getNewFiles(startRound, endRound);
+ assertTrue("Should return empty list for invalid range", result.isEmpty());
+
+ // The early return must skip every shard-directory lookup; init() only touched the
+ // in-progress directory (exists() and mkdirs())
+ Mockito.verify(mockFs, Mockito.never()).exists(Mockito.eq(startRoundShardDir));
+ Mockito.verify(mockFs, Mockito.never()).exists(Mockito.eq(endRoundShardDir));
+ Mockito.verify(mockFs, Mockito.never()).listStatus(Mockito.any(Path.class));
+ }
+
+ @Test
+ public void testGetNewFilesWithEmptyRounds() throws IOException {
+ tracker.init();
+ ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager();
+ long durationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+ // Two back-to-back rounds starting at 2024-01-02 00:00:00 UTC
+ long firstStart = 1704153600000L;
+ long secondStart = firstStart + durationMs;
+ ReplicationRound startRound = new ReplicationRound(firstStart, secondStart);
+ ReplicationRound endRound = new ReplicationRound(secondStart, secondStart + durationMs);
+
+ Path[] shardDirs = {
+ shardManager.getShardDirectory(startRound), shardManager.getShardDirectory(endRound) };
+ // Shard directories exist but contain no files
+ for (Path shardDir : shardDirs) {
+ localFs.mkdirs(shardDir);
+ }
+
+ List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+ // Each round's shard directory is probed once and listed once; the exists() call that
+ // init() made on the in-progress directory is not matched here
+ for (Path shardDir : shardDirs) {
+ Mockito.verify(mockFs, times(1)).exists(Mockito.eq(shardDir));
+ }
+ Mockito.verify(mockFs, times(shardDirs.length)).listStatus(Mockito.any(Path.class));
+
+ assertTrue("Should return empty list for empty rounds", result.isEmpty());
+ }
+
+ @Test
+ public void testGetNewFilesWithNonExistentRounds() throws IOException {
+ tracker.init();
+ ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager();
+ long durationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+ // Two consecutive rounds whose shard directories are never created
+ long firstStart = 1704153600000L; // 2024-01-02 00:00:00 UTC
+ long secondStart = firstStart + durationMs;
+ ReplicationRound startRound = new ReplicationRound(firstStart, secondStart);
+ ReplicationRound endRound = new ReplicationRound(secondStart, secondStart + durationMs);
+ Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+ Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+ // Precondition: neither shard directory exists yet
+ assertFalse("Start round shard directory should not exist", localFs.exists(startRoundShardDir));
+ assertFalse("End round shard directory should not exist", localFs.exists(endRoundShardDir));
+
+ List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+ // exists() is probed once per round, but listStatus() must be skipped for missing dirs
+ Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir));
+ Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+ Mockito.verify(mockFs, Mockito.never()).listStatus(Mockito.any(Path.class));
+
+ assertTrue("Should return empty list for non-existent rounds", result.isEmpty());
+ }
+
private int countDirectories(FileSystem fs, Path path) throws IOException {
if (!fs.exists(path)) {
return 0;