This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4505a97e75e Cancel loads on clones if source server removes segment (#18283)
4505a97e75e is described below

commit 4505a97e75e02fdbe04adaeef192454fc031ef9a
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Jul 19 20:25:16 2025 +0530

    Cancel loads on clones if source server removes segment (#18283)
    
    Description:
    This patch improves the historical cloning feature by avoiding unnecessary downloads
    on the clone server: it cancels the load of any segment that has been removed from
    the source server.
    
    Changes:
    - Update the `CloneHistoricals` duty to cancel a segment load on the clone if the
      segment is not present on the source server
    - Add a simulation test case
---
 .../server/coordinator/duty/CloneHistoricals.java  | 28 +++++++++-----
 .../simulate/CoordinatorSimulation.java            |  5 +++
 .../simulate/CoordinatorSimulationBaseTest.java    |  6 +++
 .../simulate/CoordinatorSimulationBuilder.java     | 11 ++++++
 .../simulate/HistoricalCloningTest.java            | 45 ++++++++++++++++++++++
 5 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index fd4c293afc0..da201127f60 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -29,7 +29,6 @@ import org.apache.druid.server.coordinator.ServerCloneStatus;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.loading.SegmentAction;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
@@ -66,7 +65,6 @@ public class CloneHistoricals implements CoordinatorDuty
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
     final Map<String, String> cloneServers = 
params.getCoordinatorDynamicConfig().getCloneServers();
-    final CoordinatorRunStats stats = params.getCoordinatorStats();
     final DruidCluster cluster = params.getDruidCluster();
 
     if (cloneServers.isEmpty()) {
@@ -111,13 +109,8 @@ public class CloneHistoricals implements CoordinatorDuty
 
       // Drop any segments missing from the clone source.
       for (DataSegment segment : targetProjectedSegments) {
-        if (!sourceProjectedSegments.contains(segment)
-            && loadQueueManager.dropSegment(segment, targetServer)) {
-          stats.add(
-              Stats.Segments.DROPPED_FROM_CLONE,
-              RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
-              1L
-          );
+        if (!sourceProjectedSegments.contains(segment)) {
+          dropSegmentFromTargetServer(segment, targetServer, params);
         }
       }
     }
@@ -154,6 +147,23 @@ public class CloneHistoricals implements CoordinatorDuty
     }
   }
 
+  private void dropSegmentFromTargetServer(
+      DataSegment segment,
+      ServerHolder targetServer,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    if (targetServer.isLoadingSegment(segment)) {
+      targetServer.cancelOperation(SegmentAction.LOAD, segment);
+    } else if (loadQueueManager.dropSegment(segment, targetServer)) {
+      params.getCoordinatorStats().add(
+          Stats.Segments.DROPPED_FROM_CLONE,
+          RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
+          1L
+      );
+    }
+  }
+
   /**
    * Returns a DataSegment with the correct value of loadSpec (as obtained from
    * metadata store). This method may return null if there is no snapshot 
available
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index 114b03da2b8..5dd5e9b192d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -127,5 +127,10 @@ public interface CoordinatorSimulation
      * Publishes the given segments to the cluster.
      */
     void addSegments(List<DataSegment> segments);
+
+    /**
+     * Deletes the given segments from the cluster.
+     */
+    void deleteSegments(List<DataSegment> segments);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index be985fffca0..ca9c0750c93 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -150,6 +150,12 @@ public abstract class CoordinatorSimulationBaseTest implements
     sim.cluster().addSegments(segments);
   }
 
+  @Override
+  public void deleteSegments(List<DataSegment> segments)
+  {
+    sim.cluster().deleteSegments(segments);
+  }
+
   @Override
   public double getLoadPercentage(String datasource)
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 223a29e5d85..8d634923753 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -84,6 +84,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * Builder for {@link CoordinatorSimulation}.
@@ -384,6 +385,16 @@ public class CoordinatorSimulationBuilder
       }
     }
 
+    @Override
+    public void deleteSegments(List<DataSegment> segments)
+    {
+      if (segments != null) {
+        env.segmentManager.markSegmentsAsUnused(
+            
segments.stream().map(DataSegment::getId).collect(Collectors.toSet())
+        );
+      }
+    }
+
     private void verifySimulationRunning()
     {
       if (!running.get()) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
index 60be0e1496d..ed9846eb503 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
@@ -297,4 +297,49 @@ public class HistoricalCloningTest extends CoordinatorSimulationBaseTest
       Assert.assertEquals(segment, historicalT13.getSegment(segment.getId()));
     });
   }
+
+  @Test
+  public void test_loadsAreCancelledOnClone_ifSegmentsAreRemovedFromSource()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.WIKI_10X1D)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 
1).forever())
+                             .withDynamicConfig(
+                                 CoordinatorDynamicConfig.builder()
+                                                         
.withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+                                                         
.withSmartSegmentLoading(true)
+                                                         .build()
+                             )
+                             .build();
+
+
+    // Load all segments on histT11
+    Segments.WIKI_10X1D.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    verifyValue(
+        Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+        Map.of("server", historicalT12.getName()),
+        10L
+    );
+
+    // Remove some segments from histT11
+    deleteSegments(Segments.WIKI_10X1D.subList(0, 5));
+
+    // Verify that the loads are cancelled from the clone
+    runCoordinatorCycle();
+    verifyValue(
+        Metric.CANCELLED_ACTIONS,
+        Map.of("server", historicalT12.getName()),
+        5L
+    );
+
+    loadQueuedSegments();
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to