This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 55dc013470f Fix segment move race condition, add simulation test 
(#18162)
55dc013470f is described below

commit 55dc013470f8c8e6c5940b69d6ebb2aeec3327d8
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jun 20 14:56:46 2025 +0530

    Fix segment move race condition, add simulation test (#18162)
    
    Description:
    Addresses the race condition identified in #18161
    
    Race condition:
    - Segment is moving from server A to server B.
    - If the segment has just finished its MOVE_TO operation on B but the MOVE_FROM
    operation on A is still pending, the segment may be chosen to be dropped from
    server B, causing under-replication.
    - If it is instead chosen to be dropped from server A, that drop would be ignored
    because the segment already has an ongoing (MOVE_FROM) operation on server A.
    
    Fix:
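    - Count MOVE_TO and MOVE_FROM operations separately. Use their difference
    (MOVE_FROM - MOVE_TO) to track moves that have finished loading on the target
    server but are still waiting for their drop from the source server. Subtract
    these from the projected replica count so that the temporary extra copy is
    not treated as an excess replica to be dropped.
    - Add a simulation test to verify the change. This test fails without the fix.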
---
 .../coordinator/loading/SegmentReplicaCount.java   | 23 +++++++++---
 .../loading/StrategicSegmentAssigner.java          |  3 +-
 .../simulate/CoordinatorSimulation.java            |  6 ++++
 .../simulate/CoordinatorSimulationBaseTest.java    |  6 ++++
 .../simulate/CoordinatorSimulationBuilder.java     | 19 ++++++++--
 .../coordinator/simulate/SegmentBalancingTest.java | 41 ++++++++++++++++++++++
 6 files changed, 91 insertions(+), 7 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
index bcaa23083a6..1f9650a59f0 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java
@@ -33,7 +33,8 @@ public class SegmentReplicaCount
 
   private int loading;
   private int dropping;
-  private int moving;
+  private int movingTo;
+  private int movingFrom;
 
   /**
    * Increments number of replicas loaded on historical servers.
@@ -63,7 +64,10 @@ public class SegmentReplicaCount
         ++loading;
         break;
       case MOVE_TO:
-        ++moving;
+        ++movingTo;
+        break;
+      case MOVE_FROM:
+        ++movingFrom;
         break;
       case DROP:
         ++dropping;
@@ -110,7 +114,17 @@ public class SegmentReplicaCount
 
   int moving()
   {
-    return moving;
+    return movingTo;
+  }
+
+  /**
+   * Number of moving segments which have been loaded on the target server but
+   * are yet to be dropped from the source server. This value can be negative
+   * only if the source server disappears before the move has finished.
+   */
+  int moveCompletedPendingDrop()
+  {
+    return movingFrom - movingTo;
   }
 
   /**
@@ -162,6 +176,7 @@ public class SegmentReplicaCount
 
     this.loading += other.loading;
     this.dropping += other.dropping;
-    this.moving += other.moving;
+    this.movingTo += other.movingTo;
+    this.movingFrom += other.movingFrom;
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 654fe42b220..b87e545605c 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -264,7 +264,8 @@ public class StrategicSegmentAssigner implements 
SegmentActionHandler
         = replicaCountMap.get(segment.getId(), tier);
 
     final int projectedReplicas = replicaCountOnTier.loadedNotDropping()
-                                  + replicaCountOnTier.loading();
+                                  + replicaCountOnTier.loading()
+                                  - Math.max(0, 
replicaCountOnTier.moveCompletedPendingDrop());
 
     final int movingReplicas = replicaCountOnTier.moving();
     final boolean shouldCancelMoves = requiredReplicas == 0 && movingReplicas 
> 0;
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index e0dc5b128bf..114b03da2b8 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -107,6 +107,12 @@ public interface CoordinatorSimulation
      */
     void loadQueuedSegments();
 
+    /**
+     * Finishes load of all the segments that were queued in the previous
+     * coordinator run. Does not execute the respective callbacks on the 
coordinator.
+     */
+    void loadQueuedSegmentsSkipCallbacks();
+
     /**
      * Removes the specified server from the cluster.
      */
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index b9db5921ee3..be985fffca0 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -126,6 +126,12 @@ public abstract class CoordinatorSimulationBaseTest 
implements
     sim.cluster().loadQueuedSegments();
   }
 
+  @Override
+  public void loadQueuedSegmentsSkipCallbacks()
+  {
+    sim.cluster().loadQueuedSegmentsSkipCallbacks();
+  }
+
   @Override
   public void removeServer(DruidServer server)
   {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index b8e56419c02..223a29e5d85 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -327,8 +327,19 @@ public class CoordinatorSimulationBuilder
       return env.coordinatorInventoryView.getInventoryValue(serverName);
     }
 
+    @Override
+    public void loadQueuedSegmentsSkipCallbacks()
+    {
+      loadSegments(false);
+    }
+
     @Override
     public void loadQueuedSegments()
+    {
+      loadSegments(true);
+    }
+
+    private void loadSegments(boolean executeCallbacks)
     {
       verifySimulationRunning();
       Preconditions.checkState(
@@ -337,7 +348,9 @@ public class CoordinatorSimulationBuilder
       );
 
       final BlockingExecutorService loadQueueExecutor = 
env.executorFactory.loadQueueExecutor;
-      while (loadQueueExecutor.hasPendingTasks()) {
+      final BlockingExecutorService loadCallbackExecutor = 
env.executorFactory.loadCallbackExecutor;
+      while (loadQueueExecutor.hasPendingTasks()
+             || (executeCallbacks && loadCallbackExecutor.hasPendingTasks())) {
         // Drain all the items from the load queue executor
         // This sends at most 1 load/drop request to each server
         loadQueueExecutor.finishAllPendingTasks();
@@ -345,7 +358,9 @@ public class CoordinatorSimulationBuilder
         // Load all the queued segments, handle their responses and execute 
callbacks
         int loadedSegments = 
env.executorFactory.historicalLoader.finishAllPendingTasks();
         loadQueueExecutor.finishNextPendingTasks(loadedSegments);
-        env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+        if (executeCallbacks) {
+          env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+        }
       }
     }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
index dabf1f2ee81..41e38ab40e2 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -92,6 +92,47 @@ public class SegmentBalancingTest extends 
CoordinatorSimulationBaseTest
     verifyDatasourceIsFullyLoaded(datasource);
   }
 
+  @Test
+  public void testBalancingDoesNotUnderReplicateSegment()
+  {
+    // historicals = 2(T1), replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 
1).forever())
+                             .build();
+
+    // Put all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    // Run cycle and verify that segments have been chosen for balancing
+    startSimulation(sim);
+    runCoordinatorCycle();
+    verifyValue(Metric.MOVED_COUNT, 5L);
+
+    // Load segments, skip callbacks and verify that some segments are now 
loaded on histT12
+    loadQueuedSegmentsSkipCallbacks();
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+
+    // Run another coordinator cycle
+    runCoordinatorCycle();
+    loadQueuedSegmentsSkipCallbacks();
+
+    // Verify that segments have not been dropped from either server since
+    // MOVE_FROM operation is still not complete
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    verifyNotEmitted(Metric.DROPPED_COUNT);
+    verifyNotEmitted(Metric.MOVED_COUNT);
+
+    // Finish the move operations
+    loadQueuedSegments();
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+  }
+
   @Test
   public void testDropDoesNotHappenWhenLoadFails()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to