This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 55dc013470f Fix segment move race condition, add simulation test (#18162)
55dc013470f is described below
commit 55dc013470f8c8e6c5940b69d6ebb2aeec3327d8
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jun 20 14:56:46 2025 +0530
Fix segment move race condition, add simulation test (#18162)
Description:
Addresses the race condition identified in #18161
Race condition:
- A segment is moving from server A to server B.
- If the MOVE_TO operation on B has just finished but the MOVE_FROM
operation on A is still pending, the segment is counted as loaded on both
servers and may be chosen to be dropped from server B. Once the pending
drop from A also completes, the segment is under-replicated.
- If the segment is instead chosen to be dropped from server A, that
operation is ignored since the segment already has an ongoing operation
on server A.
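Fix:
- Track MOVE_FROM and MOVE_TO counts separately and use the delta
(MOVE_FROM - MOVE_TO) as the number of moves that have finished loading on
the target server but whose drop from the source server is still pending.
This delta (when positive) is subtracted from the projected replica count.
- Add a simulation test to verify the change. This test fails without the
fix.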
---
.../coordinator/loading/SegmentReplicaCount.java | 23 +++++++++---
.../loading/StrategicSegmentAssigner.java | 3 +-
.../simulate/CoordinatorSimulation.java | 6 ++++
.../simulate/CoordinatorSimulationBaseTest.java | 6 ++++
.../simulate/CoordinatorSimulationBuilder.java | 19 ++++++++--
.../coordinator/simulate/SegmentBalancingTest.java | 41 ++++++++++++++++++++++
6 files changed, 91 insertions(+), 7 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
index bcaa23083a6..1f9650a59f0 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
@@ -33,7 +33,8 @@ public class SegmentReplicaCount
private int loading;
private int dropping;
- private int moving;
+ private int movingTo;
+ private int movingFrom;
/**
* Increments number of replicas loaded on historical servers.
@@ -63,7 +64,10 @@ public class SegmentReplicaCount
++loading;
break;
case MOVE_TO:
- ++moving;
+ ++movingTo;
+ break;
+ case MOVE_FROM:
+ ++movingFrom;
break;
case DROP:
++dropping;
@@ -110,7 +114,17 @@ public class SegmentReplicaCount
int moving()
{
- return moving;
+ return movingTo;
+ }
+
+ /**
+ * Number of moving segments which have been loaded on the target server but
+ * are yet to be dropped from the source server. This value can be negative
+ * only if the source server disappears before the move has finished.
+ */
+ int moveCompletedPendingDrop()
+ {
+ return movingFrom - movingTo;
}
/**
@@ -162,6 +176,7 @@ public class SegmentReplicaCount
this.loading += other.loading;
this.dropping += other.dropping;
- this.moving += other.moving;
+ this.movingTo += other.movingTo;
+ this.movingFrom += other.movingFrom;
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 654fe42b220..b87e545605c 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -264,7 +264,8 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
= replicaCountMap.get(segment.getId(), tier);
final int projectedReplicas = replicaCountOnTier.loadedNotDropping()
- + replicaCountOnTier.loading();
+ + replicaCountOnTier.loading()
+ - Math.max(0,
replicaCountOnTier.moveCompletedPendingDrop());
final int movingReplicas = replicaCountOnTier.moving();
final boolean shouldCancelMoves = requiredReplicas == 0 && movingReplicas
> 0;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index e0dc5b128bf..114b03da2b8 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -107,6 +107,12 @@ public interface CoordinatorSimulation
*/
void loadQueuedSegments();
+ /**
+ * Finishes load of all the segments that were queued in the previous
+ * coordinator run. Does not execute the respective callbacks on the
coordinator.
+ */
+ void loadQueuedSegmentsSkipCallbacks();
+
/**
* Removes the specified server from the cluster.
*/
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index b9db5921ee3..be985fffca0 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -126,6 +126,12 @@ public abstract class CoordinatorSimulationBaseTest
implements
sim.cluster().loadQueuedSegments();
}
+ @Override
+ public void loadQueuedSegmentsSkipCallbacks()
+ {
+ sim.cluster().loadQueuedSegmentsSkipCallbacks();
+ }
+
@Override
public void removeServer(DruidServer server)
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index b8e56419c02..223a29e5d85 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -327,8 +327,19 @@ public class CoordinatorSimulationBuilder
return env.coordinatorInventoryView.getInventoryValue(serverName);
}
+ @Override
+ public void loadQueuedSegmentsSkipCallbacks()
+ {
+ loadSegments(false);
+ }
+
@Override
public void loadQueuedSegments()
+ {
+ loadSegments(true);
+ }
+
+ private void loadSegments(boolean executeCallbacks)
{
verifySimulationRunning();
Preconditions.checkState(
@@ -337,7 +348,9 @@ public class CoordinatorSimulationBuilder
);
final BlockingExecutorService loadQueueExecutor =
env.executorFactory.loadQueueExecutor;
- while (loadQueueExecutor.hasPendingTasks()) {
+ final BlockingExecutorService loadCallbackExecutor =
env.executorFactory.loadCallbackExecutor;
+ while (loadQueueExecutor.hasPendingTasks()
+ || (executeCallbacks && loadCallbackExecutor.hasPendingTasks())) {
// Drain all the items from the load queue executor
// This sends at most 1 load/drop request to each server
loadQueueExecutor.finishAllPendingTasks();
@@ -345,7 +358,9 @@ public class CoordinatorSimulationBuilder
// Load all the queued segments, handle their responses and execute
callbacks
int loadedSegments =
env.executorFactory.historicalLoader.finishAllPendingTasks();
loadQueueExecutor.finishNextPendingTasks(loadedSegments);
- env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+ if (executeCallbacks) {
+ env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+ }
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
index dabf1f2ee81..41e38ab40e2 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -92,6 +92,47 @@ public class SegmentBalancingTest extends
CoordinatorSimulationBaseTest
verifyDatasourceIsFullyLoaded(datasource);
}
+ @Test
+ public void testBalancingDoesNotUnderReplicateSegment()
+ {
+ // historicals = 2(T1), replicas = 1(T1)
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(historicalT11, historicalT12)
+ .withRules(datasource, Load.on(Tier.T1,
1).forever())
+ .build();
+
+ // Put all the segments on histT11
+ segments.forEach(historicalT11::addDataSegment);
+
+ // Run cycle and verify that segments have been chosen for balancing
+ startSimulation(sim);
+ runCoordinatorCycle();
+ verifyValue(Metric.MOVED_COUNT, 5L);
+
+ // Load segments, skip callbacks and verify that some segments are now
loaded on histT12
+ loadQueuedSegmentsSkipCallbacks();
+ Assert.assertEquals(10, historicalT11.getTotalSegments());
+ Assert.assertEquals(5, historicalT12.getTotalSegments());
+
+ // Run another coordinator cycle
+ runCoordinatorCycle();
+ loadQueuedSegmentsSkipCallbacks();
+
+ // Verify that segments have not been dropped from either server since
+ // MOVE_FROM operation is still not complete
+ Assert.assertEquals(10, historicalT11.getTotalSegments());
+ Assert.assertEquals(5, historicalT12.getTotalSegments());
+ verifyNotEmitted(Metric.DROPPED_COUNT);
+ verifyNotEmitted(Metric.MOVED_COUNT);
+
+ // Finish the move operations
+ loadQueuedSegments();
+ Assert.assertEquals(5, historicalT11.getTotalSegments());
+ Assert.assertEquals(5, historicalT12.getTotalSegments());
+ }
+
@Test
public void testDropDoesNotHappenWhenLoadFails()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]