This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/34.0.0 by this push:
     new d8da0e1f0a5 Cancel loads on clones if source server removes segment (#18283) (#18296)
d8da0e1f0a5 is described below

commit d8da0e1f0a5a8c01237832150d5203a0c9615770
Author: Lucas Capistrant <[email protected]>
AuthorDate: Sun Jul 20 10:32:45 2025 -0500

    Cancel loads on clones if source server removes segment (#18283) (#18296)
---
 .../server/coordinator/duty/CloneHistoricals.java  | 28 +++++++++-----
 .../simulate/CoordinatorSimulation.java            |  5 +++
 .../simulate/CoordinatorSimulationBaseTest.java    |  6 +++
 .../simulate/CoordinatorSimulationBuilder.java     | 11 ++++++
 .../simulate/HistoricalCloningTest.java            | 45 ++++++++++++++++++++++
 5 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index fd4c293afc0..da201127f60 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -29,7 +29,6 @@ import org.apache.druid.server.coordinator.ServerCloneStatus;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.loading.SegmentAction;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
@@ -66,7 +65,6 @@ public class CloneHistoricals implements CoordinatorDuty
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
     final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
-    final CoordinatorRunStats stats = params.getCoordinatorStats();
     final DruidCluster cluster = params.getDruidCluster();
 
     if (cloneServers.isEmpty()) {
@@ -111,13 +109,8 @@ public class CloneHistoricals implements CoordinatorDuty
 
       // Drop any segments missing from the clone source.
       for (DataSegment segment : targetProjectedSegments) {
-        if (!sourceProjectedSegments.contains(segment)
-            && loadQueueManager.dropSegment(segment, targetServer)) {
-          stats.add(
-              Stats.Segments.DROPPED_FROM_CLONE,
-              RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
-              1L
-          );
+        if (!sourceProjectedSegments.contains(segment)) {
+          dropSegmentFromTargetServer(segment, targetServer, params);
         }
       }
     }
@@ -154,6 +147,23 @@ public class CloneHistoricals implements CoordinatorDuty
     }
   }
 
+  private void dropSegmentFromTargetServer(
+      DataSegment segment,
+      ServerHolder targetServer,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    if (targetServer.isLoadingSegment(segment)) {
+      targetServer.cancelOperation(SegmentAction.LOAD, segment);
+    } else if (loadQueueManager.dropSegment(segment, targetServer)) {
+      params.getCoordinatorStats().add(
+          Stats.Segments.DROPPED_FROM_CLONE,
+          RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
+          1L
+      );
+    }
+  }
+
   /**
    * Returns a DataSegment with the correct value of loadSpec (as obtained from
    * metadata store). This method may return null if there is no snapshot available
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index 114b03da2b8..5dd5e9b192d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -127,5 +127,10 @@ public interface CoordinatorSimulation
      * Publishes the given segments to the cluster.
      */
     void addSegments(List<DataSegment> segments);
+
+    /**
+     * Deletes the given segments from the cluster.
+     */
+    void deleteSegments(List<DataSegment> segments);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index be985fffca0..ca9c0750c93 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -150,6 +150,12 @@ public abstract class CoordinatorSimulationBaseTest implements
     sim.cluster().addSegments(segments);
   }
 
+  @Override
+  public void deleteSegments(List<DataSegment> segments)
+  {
+    sim.cluster().deleteSegments(segments);
+  }
+
   @Override
   public double getLoadPercentage(String datasource)
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 223a29e5d85..8d634923753 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -84,6 +84,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * Builder for {@link CoordinatorSimulation}.
@@ -384,6 +385,16 @@ public class CoordinatorSimulationBuilder
       }
     }
 
+    @Override
+    public void deleteSegments(List<DataSegment> segments)
+    {
+      if (segments != null) {
+        env.segmentManager.markSegmentsAsUnused(
+            segments.stream().map(DataSegment::getId).collect(Collectors.toSet())
+        );
+      }
+    }
+
     private void verifySimulationRunning()
     {
       if (!running.get()) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
index 60be0e1496d..ed9846eb503 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
@@ -297,4 +297,49 @@ public class HistoricalCloningTest extends CoordinatorSimulationBaseTest
       Assert.assertEquals(segment, historicalT13.getSegment(segment.getId()));
     });
   }
+
+  @Test
+  public void test_loadsAreCancelledOnClone_ifSegmentsAreRemovedFromSource()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         .withSmartSegmentLoading(true)
+                                                         .build()
+                             )
+                             .build();
+
+
+    // Load all segments on histT11
+    Segments.WIKI_10X1D.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    verifyValue(
+        Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        10L
+    );
+
+    // Remove some segments from histT11
+    deleteSegments(Segments.WIKI_10X1D.subList(0, 5));
+
+    // Verify that the loads are cancelled from the clone
+    runCoordinatorCycle();
+    verifyValue(
+        Metric.CANCELLED_ACTIONS,
+        Map.of("server", historicalT12.getName()),
+        5L
+    );
+
+    loadQueuedSegments();
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to