This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 28d2763c41 PHOENIX-7813 Align ReplicationLogDiscovery scheduler to
round boundaries to fix file distribution imbalance across RS (#2432)
28d2763c41 is described below
commit 28d2763c4177097801a6be90bc06ca2820f476f8
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Tue May 5 18:34:28 2026 +0530
PHOENIX-7813 Align ReplicationLogDiscovery scheduler to round boundaries to
fix file distribution imbalance across RS (#2432)
---
.../replication/ReplicationLogDiscovery.java | 39 ++++--
.../reader/ReplicationLogDiscoveryReplay.java | 17 ---
.../ReplicationLogDiscoveryReplayTestIT.java | 32 +++--
.../replication/ReplicationLogDiscoveryTest.java | 153 ++++++++++++++++++++-
4 files changed, 197 insertions(+), 44 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 175de94f87..3c64e9ec61 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -65,11 +65,6 @@ public abstract class ReplicationLogDiscovery {
*/
private static final String DEFAULT_EXECUTOR_THREAD_NAME_FORMAT =
"ReplicationLogDiscovery-%d";
- /**
- * Default interval in seconds between replay operations
- */
- private static final long DEFAULT_REPLAY_INTERVAL_SECONDS = 10;
-
/**
* Default timeout in seconds for graceful shutdown of the executor service
*/
@@ -150,13 +145,17 @@ public abstract class ReplicationLogDiscovery {
// Initialize and schedule the executors
scheduler = Executors.newScheduledThreadPool(getExecutorThreadCount(),
new
ThreadFactoryBuilder().setNameFormat(getExecutorThreadNameFormat()).build());
+ long initialDelayMs = computeAlignedInitialDelay();
+ long replayIntervalMs = getReplayIntervalMillis();
+ LOG.info("Scheduling replay for haGroup: {} with initialDelay={}ms,
interval={}ms",
+ haGroupName, initialDelayMs, replayIntervalMs);
scheduler.scheduleAtFixedRate(() -> {
try {
replay();
} catch (Exception e) {
LOG.error("Error during replay", e);
}
- }, 0, getReplayIntervalSeconds(), TimeUnit.SECONDS);
+ }, initialDelayMs, replayIntervalMs, TimeUnit.MILLISECONDS);
isRunning = true;
LOG.info("ReplicationLogDiscovery started for haGroup: {}", haGroupName);
@@ -293,7 +292,7 @@ public abstract class ReplicationLogDiscovery {
long startTime = EnvironmentEdgeManager.currentTime();
List<Path> files =
replicationLogTracker.getNewFilesForRound(replicationRound);
LOG.info("Number of new files for round {} is {}", replicationRound,
files.size());
- while (!files.isEmpty()) {
+ while (!files.isEmpty() && isRunning()) {
processOneRandomFile(files);
files = replicationLogTracker.getNewFilesForRound(replicationRound);
}
@@ -323,7 +322,7 @@ public abstract class ReplicationLogDiscovery {
LOG.info("Number of {} files with renameTimestampThreshold {} is {} for
haGroup: {}",
replicationLogTracker.getInProgressLogSubDirectoryName(),
renameTimestampThreshold,
files.size(), haGroupName);
- while (!files.isEmpty()) {
+ while (!files.isEmpty() && isRunning()) {
Optional<Path> failedFile = processOneRandomFile(files);
if (failedFile.isPresent()) {
String prefix = replicationLogTracker.getFilePrefix(failedFile.get());
@@ -472,12 +471,12 @@ public abstract class ReplicationLogDiscovery {
}
/**
- * Returns the replay interval in seconds. Subclasses can override this
method to provide custom
- * intervals.
- * @return The replay interval in seconds (default: 10 seconds).
+ * Returns the replay interval in milliseconds. Subclasses can override this
method to provide
+ * custom intervals. Defaults to the round duration.
+ * @return The replay interval in milliseconds.
*/
- public long getReplayIntervalSeconds() {
- return DEFAULT_REPLAY_INTERVAL_SECONDS;
+ public long getReplayIntervalMillis() {
+ return roundTimeMills;
}
/**
@@ -507,6 +506,20 @@ public abstract class ReplicationLogDiscovery {
return DEFAULT_WAITING_BUFFER_PERCENTAGE;
}
+ /**
+ * Computes the initial delay that aligns the scheduler to round-eligible boundaries so all RS
+ * wake up at the same wall-clock moment. A round becomes eligible when currentTime >=
+ * roundEndTime + bufferMillis, and rounds repeat every roundTimeMills. This gives a universal
+ * grid of eligible ticks at bufferMillis, bufferMillis + roundTimeMills, bufferMillis +
+ * 2*roundTimeMills, etc. from epoch. All RS compute the same grid regardless of when start() is
+ * called.
+ * <p>
+ * The offset into the current grid cell is taken with {@link Math#floorMod(long, long)} rather
+ * than {@code %}: when the clock reads less than bufferMillis (for example a small injected test
+ * clock) {@code %} yields a negative remainder and the resulting delay would exceed one round.
+ * @return the initial delay in milliseconds until the next round-eligible tick; 0 when the
+ * current time is exactly on a tick, otherwise (for a positive roundTimeMills) a value
+ * strictly between 0 and roundTimeMills
+ */
+ protected long computeAlignedInitialDelay() {
+ long now = EnvironmentEdgeManager.currentTime();
+ // floorMod keeps the offset in [0, roundTimeMills) even if (now - bufferMillis) is negative.
+ long elapsed = Math.floorMod(now - bufferMillis, roundTimeMills);
+ return (elapsed == 0) ? 0 : roundTimeMills - elapsed;
+ }
+
public int getInProgressFileMaxRetries() {
return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index f20ff7ae21..4d5f886d51 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -57,12 +57,6 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
public static final String EXECUTOR_THREAD_NAME_FORMAT =
"Phoenix-ReplicationLogDiscoveryReplay-%d";
- /**
- * Configuration key for replay interval in seconds
- */
- public static final String REPLICATION_REPLAY_INTERVAL_SECONDS_KEY =
- "phoenix.replication.replay.interval.seconds";
-
/**
* Configuration key for shutdown timeout in seconds
*/
@@ -87,11 +81,6 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
public static final String REPLICATION_REPLAY_WAITING_BUFFER_PERCENTAGE_KEY =
"phoenix.replication.replay.waiting.buffer.percentage";
- /**
- * Default replay interval in seconds. Controls how frequently the replay
process runs.
- */
- public static final long DEFAULT_REPLAY_INTERVAL_SECONDS = 60;
-
/**
* Default shutdown timeout in seconds. Maximum time to wait for executor
service to shutdown
* gracefully.
@@ -422,12 +411,6 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
return EXECUTOR_THREAD_NAME_FORMAT;
}
- @Override
- public long getReplayIntervalSeconds() {
- return getConf().getLong(REPLICATION_REPLAY_INTERVAL_SECONDS_KEY,
- DEFAULT_REPLAY_INTERVAL_SECONDS);
- }
-
@Override
public long getShutdownTimeoutSeconds() {
return getConf().getLong(REPLICATION_REPLAY_SHUTDOWN_TIMEOUT_SECONDS_KEY,
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 8d7568f422..628652d001 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -113,24 +113,32 @@ public class ReplicationLogDiscoveryReplayTestIT extends
HABaseIT {
}
/**
- * Tests the replay interval configuration with default and custom values.
+ * Tests that replay interval always matches the configured round duration.
*/
@Test
- public void testGetReplayIntervalSeconds() throws IOException {
- // Create ReplicationLogDiscoveryReplay instance
+ public void testGetReplayIntervalMillis() throws IOException {
+ // Test with default round duration
TestableReplicationLogTracker fileTracker =
createReplicationLogTracker(conf1, haGroupName, rootFs, rootUri);
ReplicationLogDiscoveryReplay discovery = new
ReplicationLogDiscoveryReplay(fileTracker);
+ long expectedRoundMillis =
+
fileTracker.getReplicationShardDirectoryManager().getReplicationRoundDurationSeconds()
+ * 1000L;
+ assertEquals("Replay interval should match round duration",
expectedRoundMillis,
+ discovery.getReplayIntervalMillis());
- // Test default value when no custom config is set
- long defaultResult = discovery.getReplayIntervalSeconds();
- assertEquals("Should return default value when no custom config is set",
- ReplicationLogDiscoveryReplay.DEFAULT_REPLAY_INTERVAL_SECONDS,
defaultResult);
-
- // Test custom value when config is set
-
conf1.setLong(ReplicationLogDiscoveryReplay.REPLICATION_REPLAY_INTERVAL_SECONDS_KEY,
120L);
- long customResult = discovery.getReplayIntervalSeconds();
- assertEquals("Should return custom value when config is set", 120L,
customResult);
+ // Test with custom round duration
+
conf1.setInt(ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
+ 120);
+ try {
+ TestableReplicationLogTracker fileTracker2 =
+ createReplicationLogTracker(conf1, haGroupName, rootFs, rootUri);
+ ReplicationLogDiscoveryReplay discovery2 = new
ReplicationLogDiscoveryReplay(fileTracker2);
+ assertEquals("Replay interval should match custom round duration",
120_000L,
+ discovery2.getReplayIntervalMillis());
+ } finally {
+
conf1.unset(ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY);
+ }
}
/**
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index caacf175ce..6ed7d8723f 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -95,6 +95,7 @@ public class ReplicationLogDiscoveryTest {
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
0);
discovery = Mockito.spy(new TestableReplicationLogDiscovery(fileTracker));
Mockito.doReturn(metricsLogDiscovery).when(discovery).getMetrics();
+ Mockito.doReturn(true).when(discovery).isRunning();
}
@After
@@ -115,6 +116,7 @@ public class ReplicationLogDiscoveryTest {
*/
@Test
public void testStartAndStop() throws IOException {
+ Mockito.doCallRealMethod().when(discovery).isRunning();
// 1. Validate that it's not running initially
assertFalse("Discovery should not be running initially",
discovery.isRunning());
@@ -138,8 +140,8 @@ public class ReplicationLogDiscoveryTest {
threadName.contains("ReplicationLogDiscovery"));
// Verify replay interval
- long replayInterval = discovery.getReplayIntervalSeconds();
- assertEquals("Replay interval should be 10 seconds", 10L, replayInterval);
+ long replayInterval = discovery.getReplayIntervalMillis();
+ assertEquals("Replay interval should be 60000 milliseconds", 60_000L,
replayInterval);
// 6. Ensure starting again does not create a new scheduler (and also
should not throw any
// exception)
@@ -159,6 +161,153 @@ public class ReplicationLogDiscoveryTest {
assertFalse("Discovery should not be running after stop",
discovery.isRunning());
}
+ /**
+ * Verifies that several RS starting at different instants inside the same grid cell all compute
+ * an initial delay that lands on the same round-eligible tick.
+ */
+ @Test
+ public void testComputeAlignedInitialDelay() {
+ long roundTimeMs = discovery.roundTimeMills;
+ long bufferMs = discovery.bufferMillis;
+
+ // RS initialize at different times within the same round window.
+ // All should align to the same next tick.
+ // With roundTimeMs=60000 and bufferMs=9000, ticks are at 9000, 69000, 129000, ...
+ // Place all 3 RS between tick 69000 and tick 129000 so they all target 129000.
+ AtomicLong mockTime = new AtomicLong();
+ EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+ @Override
+ public long currentTime() {
+ return mockTime.get();
+ }
+ });
+
+ try {
+ // RS-1 starts early in the window (10s after previous tick at 69000)
+ mockTime.set(79_000L);
+ long delay1 = discovery.computeAlignedInitialDelay();
+ long tick1 = mockTime.get() + delay1;
+
+ // RS-2 starts 30s later
+ mockTime.set(109_000L);
+ long delay2 = discovery.computeAlignedInitialDelay();
+ long tick2 = mockTime.get() + delay2;
+
+ // RS-3 starts 49s after RS-1 (1s before the tick)
+ mockTime.set(128_000L);
+ long delay3 = discovery.computeAlignedInitialDelay();
+ long tick3 = mockTime.get() + delay3;
+
+ // All should align to the same tick (129000)
+ assertEquals("RS-1 and RS-2 should align to the same tick", tick1, tick2);
+ assertEquals("RS-2 and RS-3 should align to the same tick", tick2, tick3);
+ assertEquals("All should target tick at 129000", 129_000L, tick1);
+
+ // Delay should always be > 0 and <= roundTimeMs, for every RS
+ assertTrue("Delay should be positive", delay1 > 0);
+ assertTrue("Delay should not exceed round time", delay1 <= roundTimeMs);
+ assertTrue("Delay should be positive", delay2 > 0);
+ assertTrue("Delay should not exceed round time", delay2 <= roundTimeMs);
+ assertTrue("Delay should be positive", delay3 > 0);
+ assertTrue("Delay should not exceed round time", delay3 <= roundTimeMs);
+
+ // The aligned tick should be at a multiple of roundTimeMs offset by bufferMs
+ assertEquals("Tick should be aligned to round-eligible boundary", 0,
+ (tick1 - bufferMs) % roundTimeMs);
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ /**
+ * When the clock reads exactly a round-eligible tick, no initial delay is needed.
+ */
+ @Test
+ public void testComputeAlignedInitialDelayExactlyOnTick() {
+ // Freeze the clock on a grid point: five full rounds plus the buffer.
+ final long frozenNow = discovery.roundTimeMills * 5 + discovery.bufferMillis;
+ EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+ @Override
+ public long currentTime() {
+ return frozenNow;
+ }
+ });
+
+ try {
+ assertEquals("Delay should be 0 when exactly on a tick", 0,
+ discovery.computeAlignedInitialDelay());
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ /**
+ * When the clock is 3s past a round-eligible tick (12s into the round if the buffer is 9s), that
+ * tick has been missed and the delay must reach the following tick.
+ */
+ @Test
+ public void testComputeAlignedInitialDelaySlightlyBeyondBuffer() {
+ final long roundLength = discovery.roundTimeMills;
+ final long buffer = discovery.bufferMillis;
+ // The grid point that has just gone by, and a clock frozen 3s after it.
+ final long missedTick = roundLength * 5 + buffer;
+ final long frozenNow = missedTick + 3_000L;
+
+ EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+ @Override
+ public long currentTime() {
+ return frozenNow;
+ }
+ });
+
+ try {
+ long delay = discovery.computeAlignedInitialDelay();
+ long landedOn = frozenNow + delay;
+
+ // Remaining wait is one full round minus the 3s already elapsed since the missed tick.
+ assertEquals("Should wait until next round-eligible tick", roundLength - 3_000L, delay);
+ // The wake-up instant must be the next grid point and sit on the grid.
+ assertEquals("Target should be next tick", missedTick + roundLength, landedOn);
+ assertEquals("Target should be aligned", 0, (landedOn - buffer) % roundLength);
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ /**
+ * When the clock is 3s short of a round-eligible tick (6s into the round if the buffer is 9s),
+ * the delay must be exactly the time left until that upcoming tick.
+ */
+ @Test
+ public void testComputeAlignedInitialDelaySlightlyBeforeBuffer() {
+ final long roundLength = discovery.roundTimeMills;
+ final long buffer = discovery.bufferMillis;
+ // The grid point about to arrive, and a clock frozen 3s ahead of it.
+ final long upcomingTick = roundLength * 5 + buffer;
+ final long frozenNow = upcomingTick - 3_000L;
+
+ EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
+ @Override
+ public long currentTime() {
+ return frozenNow;
+ }
+ });
+
+ try {
+ long delay = discovery.computeAlignedInitialDelay();
+ long landedOn = frozenNow + delay;
+
+ // Only the 3s gap to the upcoming tick should remain.
+ assertEquals("Should wait until upcoming round-eligible tick", 3_000L, delay);
+ // The wake-up instant must be that tick and sit on the grid.
+ assertEquals("Target should be the upcoming tick", upcomingTick, landedOn);
+ assertEquals("Target should be aligned", 0, (landedOn - buffer) % roundLength);
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
@Test
public void testGetInProgressFileMaxRetries() {
// Default value