This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this 
push:
     new 44cfa7e674 Phoenix-7672 Adding API to get new files within range of 
rounds (Addendum) (#2323)
44cfa7e674 is described below

commit 44cfa7e67480c2425b41044c828870c786f2cfb1
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Mon Dec 1 18:46:17 2025 +0530

    Phoenix-7672 Adding API to get new files within range of rounds (Addendum) 
(#2323)
---
 .../phoenix/replication/ReplicationLogTracker.java |  34 ++-
 .../reader/ReplicationLogDiscoveryReplay.java      |  68 +++--
 .../ReplicationLogDiscoveryReplayTestIT.java       |  26 +-
 .../replication/ReplicationLogTrackerTest.java     | 277 +++++++++++++++++++++
 4 files changed, 377 insertions(+), 28 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
index cdb078dce3..9e9f20adf0 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java
@@ -98,8 +98,9 @@ public class ReplicationLogTracker {
      * exist.
      */
     public void init() throws IOException {
-        this.inProgressDirPath = new 
Path(getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(),
-            getInProgressLogSubDirectoryName());
+        this.inProgressDirPath = new Path(
+                
getReplicationShardDirectoryManager().getRootDirectoryPath().getParent(),
+                getInProgressLogSubDirectoryName());
         createDirectoryIfNotExists(inProgressDirPath);
     }
 
@@ -151,6 +152,35 @@ public class ReplicationLogTracker {
         return filesInRound;
     }
 
+    /**
+     * Retrieves new replication log files that belong to replication rounds 
from startRound to
+     * endRound (inclusive). Iterates through all rounds in the range and 
collects valid log files
+     * from each round's shard directory.
+     *
+     * @param startRound - The starting replication round (inclusive)
+     * @param endRound - The ending replication round (inclusive)
+     * @return List of valid log file paths from startRound to endRound, empty 
list if
+     *         startRound > endRound
+     * @throws IOException if there's an error accessing the file system
+     */
+    public List<Path> getNewFiles(ReplicationRound startRound, 
ReplicationRound endRound)
+            throws IOException {
+        List<Path> files = new ArrayList<>();
+        // Early return if startRound is after endRound (invalid range)
+        if (startRound.getStartTime() > endRound.getStartTime()) {
+            return files;
+        }
+        // Iterate through all rounds from startRound up to, but not including, 
endRound
+        ReplicationRound firstRound = startRound;
+        while (!firstRound.equals(endRound)) {
+            files.addAll(getNewFilesForRound(firstRound));
+            firstRound = 
replicationShardDirectoryManager.getNextRound(firstRound);
+        }
+        // Add the files for the endRound (inclusive)
+        files.addAll(getNewFilesForRound(endRound));
+        return files;
+    }
+
     /**
      * Retrieves all valid log files currently in the in-progress directory.
      * @return List of valid log file paths in the in-progress directory, 
empty list if directory
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index faf078fa7f..3911785a4d 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreRecord;
 import org.apache.phoenix.replication.ReplicationLogDiscovery;
 import org.apache.phoenix.replication.ReplicationLogTracker;
 import org.apache.phoenix.replication.ReplicationRound;
+import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
 import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
 import 
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
 import org.slf4j.Logger;
@@ -137,23 +138,28 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
 
         LOG.info("Initializing ReplicationLogDiscoveryReplay for haGroup: {}", 
haGroupName);
 
-        HAGroupStateListener degradedListener = (groupName, fromState, 
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
-            if (clusterType == ClusterType.LOCAL && 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
+        HAGroupStateListener degradedListener = (groupName, fromState, toState,
+                modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+            if (clusterType == ClusterType.LOCAL
+                    && 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(toState)) {
                 replicationReplayState.set(ReplicationReplayState.DEGRADED);
                 LOG.info("Cluster degraded detected for {}. 
replicationReplayState={}",
                         haGroupName, ReplicationReplayState.DEGRADED);
             }
         };
 
-        HAGroupStateListener recoveryListener = (groupName, fromState, 
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
-            if (clusterType == ClusterType.LOCAL && 
HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) {
+        HAGroupStateListener recoveryListener = (groupName, fromState, toState,
+                modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+            if (clusterType == ClusterType.LOCAL
+                    && 
HAGroupStoreRecord.HAGroupState.STANDBY.equals(toState)) {
                 
replicationReplayState.set(ReplicationReplayState.SYNCED_RECOVERY);
                 LOG.info("Cluster recovered detected for {}. 
replicationReplayState={}",
                         haGroupName, getReplicationReplayState());
             }
         };
 
-        HAGroupStateListener triggerFailoverListner = (groupName, fromState, 
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+        HAGroupStateListener triggerFailoverListner = (groupName, fromState, 
toState,
+                modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
             if (clusterType == ClusterType.LOCAL
                     && 
HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE.equals(toState)) {
                 failoverPending.set(true);
@@ -163,7 +169,8 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
             }
         };
 
-        HAGroupStateListener abortFailoverListner = (groupName, fromState, 
toState, modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
+        HAGroupStateListener abortFailoverListner = (groupName, fromState, 
toState,
+                modifiedTime, clusterType, lastSyncStateTimeInMs) -> {
             if (clusterType == ClusterType.LOCAL
                     && 
HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY.equals(toState)) {
                 failoverPending.set(false);
@@ -228,7 +235,8 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
         HAGroupStoreRecord haGroupStoreRecord = getHAGroupRecord();
         LOG.info("Found HA Group state during initialization as {} for 
haGroup: {}",
                 haGroupStoreRecord.getHAGroupState(), haGroupName);
-        if 
(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY.equals(haGroupStoreRecord.getHAGroupState()))
 {
+        if (HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY
+                .equals(haGroupStoreRecord.getHAGroupState())) {
             
replicationReplayState.compareAndSet(ReplicationReplayState.NOT_INITIALIZED,
                     ReplicationReplayState.DEGRADED);
             long minimumTimestampFromFiles = 
EnvironmentEdgeManager.currentTime();
@@ -474,19 +482,51 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
      * Failover is safe to trigger when all of the following conditions are 
met:
      * 1. A failover has been requested (failoverPending is true)
      * 2. No files are currently in the in-progress directory
-     * 3. No new files exist for ongoing round
+     * 3. No new files exist from the next round to process up to the current 
timestamp round
      *
-     * These conditions ensure all replication logs have been processed before 
transitioning
-     * the cluster from STANDBY to ACTIVE state.
+     * The third condition checks for new files in the range from 
nextRoundToProcess (derived from
+     * getLastRoundProcessed()) to currentTimestampRound (derived from current 
time). This ensures
+     * all replication logs up to the current time have been processed before 
transitioning
+     * the cluster
+     * from STANDBY to ACTIVE state.
      *
      * @return true if all conditions are met and failover should be 
triggered, false otherwise
      * @throws IOException if there's an error checking file status
      */
     protected boolean shouldTriggerFailover() throws IOException {
-        return failoverPending.get() && 
replicationLogTracker.getInProgressFiles().isEmpty()
-                && 
replicationLogTracker.getNewFilesForRound(replicationLogTracker
-                .getReplicationShardDirectoryManager()
-                .getNextRound(getLastRoundProcessed())).isEmpty();
+        LOG.debug("Checking if failover should be triggered. 
failoverPending={}", failoverPending);
+        // Check if failover has been requested
+        if (!failoverPending.get()) {
+            LOG.debug("Failover not triggered. failoverPending is false.");
+            return false;
+        }
+        // Check if in-progress directory is empty
+        boolean isInProgressDirectoryEmpty = 
replicationLogTracker.getInProgressFiles().isEmpty();
+        if (!isInProgressDirectoryEmpty) {
+            LOG.debug("Failover not triggered. In progress directory is not 
empty.");
+            return false;
+        }
+        // Check if there are any new files from next round to current 
timestamp round
+        ReplicationShardDirectoryManager replicationShardDirectoryManager =
+                replicationLogTracker.getReplicationShardDirectoryManager();
+        ReplicationRound nextRoundToProcess =
+                
replicationShardDirectoryManager.getNextRound(getLastRoundProcessed());
+        ReplicationRound currentTimestampRound =
+                
replicationShardDirectoryManager.getReplicationRoundFromStartTime(
+                        EnvironmentEdgeManager.currentTime());
+        LOG.debug("Checking the new files from next round {} to current 
timestamp round {}.",
+                nextRoundToProcess, currentTimestampRound);
+        boolean isInDirectoryEmpty = 
replicationLogTracker.getNewFiles(nextRoundToProcess,
+                currentTimestampRound).isEmpty();
+
+        if (!isInDirectoryEmpty) {
+            LOG.debug("Failover not triggered. New files exist from next round 
to current "
+                    + "timestamp round.");
+            return false;
+        }
+
+        LOG.info("Failover can be triggered.");
+        return true;
     }
 
     protected void triggerFailover() {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index f253bcbb07..90b3a0fdc9 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -1650,7 +1650,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
 
     /**
      * Tests the shouldTriggerFailover method with various combinations of 
failoverPending,
-     * in-progress files, and new files for next round.
+     * in-progress files, and new files from next round to current timestamp 
round.
      */
     @Test
     public void testShouldTriggerFailover() throws IOException {
@@ -1668,12 +1668,14 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
         try {
             // Create test rounds
             ReplicationRound testRound = new ReplicationRound(1704153600000L, 
1704153660000L);
-            ReplicationRound nextRound = 
tracker.getReplicationShardDirectoryManager().getNextRound(testRound);
+            ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+            ReplicationRound nextRoundToProcess = 
shardManager.getNextRound(testRound);
+            ReplicationRound currentTimestampRound = 
shardManager.getReplicationRoundFromStartTime(currentTime);
 
             // Test Case 1: All conditions true - should return true
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1686,7 +1688,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
             // Test Case 2: failoverPending is false - should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1699,7 +1701,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
             // Test Case 3: in-progress files not empty - should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1709,23 +1711,23 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
                         discovery.shouldTriggerFailover());
             }
 
-            // Test Case 4: new files exist for next round - should return 
false
+            // Test Case 4: new files exist from next round to current 
timestamp round - should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test.plog")));
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.singletonList(new 
Path("test.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
                 discovery.setFailoverPending(true);
 
-                assertFalse("Should not trigger failover when new files exist 
for next round",
+                assertFalse("Should not trigger failover when new files exist 
from next round to current timestamp round",
                         discovery.shouldTriggerFailover());
             }
 
             // Test Case 5: failoverPending false AND in-progress files not 
empty - should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList());
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.emptyList());
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1738,7 +1740,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
             // Test Case 6: failoverPending false AND new files exist - should 
return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList());
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test.plog")));
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.singletonList(new 
Path("test.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1751,7 +1753,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
             // Test Case 7: in-progress files not empty AND new files exist - 
should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test1.plog")));
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test2.plog")));
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.singletonList(new 
Path("test2.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
@@ -1764,7 +1766,7 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
             // Test Case 8: All conditions false - should return false
             {
                 
when(tracker.getInProgressFiles()).thenReturn(Collections.singletonList(new 
Path("test.plog")));
-                
when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.singletonList(new
 Path("test2.plog")));
+                when(tracker.getNewFiles(nextRoundToProcess, 
currentTimestampRound)).thenReturn(Collections.singletonList(new 
Path("test2.plog")));
                 TestableReplicationLogDiscoveryReplay discovery = new 
TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
                 discovery.setLastRoundInSync(testRound);
                 discovery.setLastRoundProcessed(testRound);
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
index e8bc60e017..ae7e9cb3f0 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java
@@ -1283,6 +1283,283 @@ public class ReplicationLogTrackerTest {
         assertFalse("Should not contain invalidFile3", 
resultFilenames.contains(invalidFile3.getName()));
     }
 
+    @Test
+    public void testGetNewFilesWithStartAndEndRound() throws IOException {
+        // Initialize tracker
+        tracker.init();
+
+        ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+        long roundDurationMs = 
shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+        // Create start round (one replication round long, per roundDurationMs)
+        long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+        long startRoundEndTime = startRoundStartTime + roundDurationMs;
+        ReplicationRound startRound = new 
ReplicationRound(startRoundStartTime, startRoundEndTime);
+
+        // Create middle round
+        long middleRoundStartTime = startRoundEndTime;
+        long middleRoundEndTime = middleRoundStartTime + roundDurationMs;
+        ReplicationRound middleRound = new 
ReplicationRound(middleRoundStartTime, middleRoundEndTime);
+
+        // Create end round
+        long endRoundStartTime = middleRoundEndTime;
+        long endRoundEndTime = endRoundStartTime + roundDurationMs;
+        ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+        // Get shard directories for each round
+        Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+        Path middleRoundShardDir = shardManager.getShardDirectory(middleRound);
+        Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+        // Create shard directories
+        localFs.mkdirs(startRoundShardDir);
+        localFs.mkdirs(middleRoundShardDir);
+        localFs.mkdirs(endRoundShardDir);
+
+        // Create files in start round
+        Path startRoundFile1 = new Path(startRoundShardDir, 
startRoundStartTime + "_rs1.plog");
+        Path startRoundFile2 = new Path(startRoundShardDir, 
startRoundStartTime + 30000 + "_rs2.plog");
+
+        // Create files in middle round
+        Path middleRoundFile1 = new Path(middleRoundShardDir, 
middleRoundStartTime + "_rs3.plog");
+        Path middleRoundFile2 = new Path(middleRoundShardDir, 
middleRoundStartTime + 30000 + "_rs4.plog");
+
+        // Create files in end round
+        Path endRoundFile1 = new Path(endRoundShardDir, endRoundStartTime + 
"_rs5.plog");
+        Path endRoundFile2 = new Path(endRoundShardDir, endRoundStartTime + 
30000 + "_rs6.plog");
+
+        // Create files outside the range (before start round)
+        ReplicationRound beforeStartRound = 
shardManager.getPreviousRound(startRound);
+        Path beforeStartRoundShardDir = 
shardManager.getShardDirectory(beforeStartRound);
+        localFs.mkdirs(beforeStartRoundShardDir);
+        Path beforeStartRoundFile = new Path(beforeStartRoundShardDir, 
beforeStartRound.getStartTime() + "_rs0.plog");
+
+        // Create files outside the range (after end round)
+        ReplicationRound afterEndRound = shardManager.getNextRound(endRound);
+        Path afterEndRoundShardDir = 
shardManager.getShardDirectory(afterEndRound);
+        localFs.mkdirs(afterEndRoundShardDir);
+        Path afterEndRoundFile = new Path(afterEndRoundShardDir, 
afterEndRound.getStartTime() + "_rs7.plog");
+
+        // Create all files
+        localFs.create(startRoundFile1, true).close();
+        localFs.create(startRoundFile2, true).close();
+        localFs.create(middleRoundFile1, true).close();
+        localFs.create(middleRoundFile2, true).close();
+        localFs.create(endRoundFile1, true).close();
+        localFs.create(endRoundFile2, true).close();
+        localFs.create(beforeStartRoundFile, true).close();
+        localFs.create(afterEndRoundFile, true).close();
+
+        // Call getNewFiles with startRound and endRound
+        List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+        // Verify file system operations
+        // Should call getNewFilesForRound for each round (start, middle, end)
+        // Each call to getNewFilesForRound calls exists() and listStatus() on 
shard directories
+        // Note: init() already called exists() on in-progress directory
+        Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(startRoundShardDir));
+        Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(middleRoundShardDir));
+        Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+        Mockito.verify(mockFs, times(3)).listStatus(Mockito.any(Path.class));
+
+        // Prepare expected set of file paths (should include files from 
start, middle, and end rounds)
+        Set<String> expectedPaths = new HashSet<>();
+        expectedPaths.add(startRoundFile1.toString());
+        expectedPaths.add(startRoundFile2.toString());
+        expectedPaths.add(middleRoundFile1.toString());
+        expectedPaths.add(middleRoundFile2.toString());
+        expectedPaths.add(endRoundFile1.toString());
+        expectedPaths.add(endRoundFile2.toString());
+
+        // Create actual set of paths
+        Set<String> actualPaths = result.stream().map(path -> 
path.toUri().getPath()).collect(Collectors.toSet());
+
+        // Verify all files from start to end rounds are returned
+        assertEquals("Should return exactly 6 files from start to end rounds", 
expectedPaths.size(), actualPaths.size());
+        assertEquals("File paths do not match", expectedPaths, actualPaths);
+
+        // Verify files outside the range are not included
+        assertFalse("Should not contain file from before start round", 
actualPaths.contains(beforeStartRoundFile.toString()));
+        assertFalse("Should not contain file from after end round", 
actualPaths.contains(afterEndRoundFile.toString()));
+    }
+
+    @Test
+    public void testGetNewFilesWithSameStartAndEndRound() throws IOException {
+        // Initialize tracker
+        tracker.init();
+
+        ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+        long roundDurationMs = 
shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+        // Create a single round
+        long roundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+        long roundEndTime = roundStartTime + roundDurationMs;
+        ReplicationRound round = new ReplicationRound(roundStartTime, 
roundEndTime);
+
+        // Get shard directory for this round
+        Path roundShardDir = shardManager.getShardDirectory(round);
+        localFs.mkdirs(roundShardDir);
+
+        // Create files in the round
+        Path file1 = new Path(roundShardDir, roundStartTime + "_rs1.plog");
+        Path file2 = new Path(roundShardDir, roundStartTime + 30000 + 
"_rs2.plog");
+        Path file3 = new Path(roundShardDir, roundStartTime + 50000 + 
"_rs3.plog");
+
+        // Create files
+        localFs.create(file1, true).close();
+        localFs.create(file2, true).close();
+        localFs.create(file3, true).close();
+
+        // Call getNewFiles with same start and end round
+        List<Path> result = tracker.getNewFiles(round, round);
+
+        // Verify file system operations
+        // Should call getNewFilesForRound once for the round
+        // Note: init() already called exists() on in-progress directory
+        Mockito.verify(mockFs, times(1)).exists(Mockito.eq(roundShardDir));
+        Mockito.verify(mockFs, times(1)).listStatus(Mockito.any(Path.class));
+
+        // Prepare expected set of file paths
+        Set<String> expectedPaths = new HashSet<>();
+        expectedPaths.add(file1.toString());
+        expectedPaths.add(file2.toString());
+        expectedPaths.add(file3.toString());
+
+        // Create actual set of paths
+        Set<String> actualPaths = result.stream().map(path -> 
path.toUri().getPath()).collect(Collectors.toSet());
+
+        // Verify all files from the round are returned
+        assertEquals("Should return exactly 3 files from the round", 
expectedPaths.size(), actualPaths.size());
+        assertEquals("File paths do not match", expectedPaths, actualPaths);
+    }
+
+    @Test
+    public void testGetNewFilesWithInvalidRange() throws IOException {
+        // Initialize tracker
+        tracker.init();
+
+        ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+        long roundDurationMs = 
shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+        // Create end round (earlier time)
+        long endRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+        long endRoundEndTime = endRoundStartTime + roundDurationMs;
+        ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+        // Create start round (later time) - invalid: start > end
+        long startRoundStartTime = endRoundEndTime;
+        long startRoundEndTime = startRoundStartTime + roundDurationMs;
+        ReplicationRound startRound = new 
ReplicationRound(startRoundStartTime, startRoundEndTime);
+
+        // Get shard directories
+        Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+        Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+        // Create shard directories
+        localFs.mkdirs(startRoundShardDir);
+        localFs.mkdirs(endRoundShardDir);
+
+        // Create files in both rounds
+        Path startRoundFile = new Path(startRoundShardDir, startRoundStartTime 
+ "_rs1.plog");
+        Path endRoundFile = new Path(endRoundShardDir, endRoundStartTime + 
"_rs2.plog");
+
+        localFs.create(startRoundFile, true).close();
+        localFs.create(endRoundFile, true).close();
+
+        // Call getNewFiles with invalid range (startRound.getStartTime() > 
endRound.getStartTime())
+        List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+        // Verify empty list is returned when startRound.getStartTime() > 
endRound.getStartTime()
+        assertTrue("Should return empty list for invalid range", 
result.isEmpty());
+
+        // Verify no file system operations were performed on shard 
directories (early return)
+        // Note: init() already called exists() and mkdirs() on in-progress 
directory
+        Mockito.verify(mockFs, 
times(0)).exists(Mockito.eq(startRoundShardDir));
+        Mockito.verify(mockFs, times(0)).exists(Mockito.eq(endRoundShardDir));
+        Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+    }
+
+    @Test
+    public void testGetNewFilesWithEmptyRounds() throws IOException {
+        // Initialize tracker
+        tracker.init();
+
+        ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+        long roundDurationMs = 
shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+        // Create start round
+        long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+        long startRoundEndTime = startRoundStartTime + roundDurationMs;
+        ReplicationRound startRound = new 
ReplicationRound(startRoundStartTime, startRoundEndTime);
+
+        // Create end round
+        long endRoundStartTime = startRoundEndTime;
+        long endRoundEndTime = endRoundStartTime + roundDurationMs;
+        ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+        // Get shard directories
+        Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+        Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+        // Create shard directories but leave them empty
+        localFs.mkdirs(startRoundShardDir);
+        localFs.mkdirs(endRoundShardDir);
+
+        // Call getNewFiles with empty rounds
+        List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+        // Verify file system operations
+        // Should call getNewFilesForRound for each round (start and end)
+        // Note: init() already called exists() on in-progress directory
+        Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(startRoundShardDir));
+        Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+        Mockito.verify(mockFs, times(2)).listStatus(Mockito.any(Path.class));
+
+        // Verify empty list is returned
+        assertTrue("Should return empty list for empty rounds", 
result.isEmpty());
+    }
+
+    @Test
+    public void testGetNewFilesWithNonExistentRounds() throws IOException {
+        // Initialize tracker
+        tracker.init();
+
+        ReplicationShardDirectoryManager shardManager = 
tracker.getReplicationShardDirectoryManager();
+        long roundDurationMs = 
shardManager.getReplicationRoundDurationSeconds() * 1000L;
+
+        // Create start round
+        long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00
+        long startRoundEndTime = startRoundStartTime + roundDurationMs;
+        ReplicationRound startRound = new 
ReplicationRound(startRoundStartTime, startRoundEndTime);
+
+        // Create end round
+        long endRoundStartTime = startRoundEndTime;
+        long endRoundEndTime = endRoundStartTime + roundDurationMs;
+        ReplicationRound endRound = new ReplicationRound(endRoundStartTime, 
endRoundEndTime);
+
+        // Get shard directories
+        Path startRoundShardDir = shardManager.getShardDirectory(startRound);
+        Path endRoundShardDir = shardManager.getShardDirectory(endRound);
+
+        // Assert that shard directories do not exist
+        assertFalse("Start round shard directory should not exist", 
localFs.exists(startRoundShardDir));
+        assertFalse("End round shard directory should not exist", 
localFs.exists(endRoundShardDir));
+
+        // Call getNewFiles with non-existent rounds
+        List<Path> result = tracker.getNewFiles(startRound, endRound);
+
+        // Verify file system operations
+        // Should call exists() for each round (start and end)
+        // Note: init() already called exists() on in-progress directory
+        Mockito.verify(mockFs, 
times(1)).exists(Mockito.eq(startRoundShardDir));
+        Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir));
+        // listStatus() should not be called when directories don't exist
+        Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class));
+
+        // Verify empty list is returned
+        assertTrue("Should return empty list for non-existent rounds", 
result.isEmpty());
+    }
+
     private int countDirectories(FileSystem fs, Path path) throws IOException {
         if (!fs.exists(path)) {
             return 0;


Reply via email to