This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4505a97e75e Cancel loads on clones if source server removes segment
(#18283)
4505a97e75e is described below
commit 4505a97e75e02fdbe04adaeef192454fc031ef9a
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Jul 19 20:25:16 2025 +0530
Cancel loads on clones if source server removes segment (#18283)
Description:
This patch improves the historical cloning feature by avoiding unnecessary
downloads on the clone server: loads of segments that have been removed
from the source server are now cancelled.
Changes:
- Update the `CloneHistoricals` duty to cancel a segment load on the clone if
the segment is not present on the source server
- Add simulation test case
---
.../server/coordinator/duty/CloneHistoricals.java | 28 +++++++++-----
.../simulate/CoordinatorSimulation.java | 5 +++
.../simulate/CoordinatorSimulationBaseTest.java | 6 +++
.../simulate/CoordinatorSimulationBuilder.java | 11 ++++++
.../simulate/HistoricalCloningTest.java | 45 ++++++++++++++++++++++
5 files changed, 86 insertions(+), 9 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index fd4c293afc0..da201127f60 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -29,7 +29,6 @@ import org.apache.druid.server.coordinator.ServerCloneStatus;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
@@ -66,7 +65,6 @@ public class CloneHistoricals implements CoordinatorDuty
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
{
final Map<String, String> cloneServers =
params.getCoordinatorDynamicConfig().getCloneServers();
- final CoordinatorRunStats stats = params.getCoordinatorStats();
final DruidCluster cluster = params.getDruidCluster();
if (cloneServers.isEmpty()) {
@@ -111,13 +109,8 @@ public class CloneHistoricals implements CoordinatorDuty
// Drop any segments missing from the clone source.
for (DataSegment segment : targetProjectedSegments) {
- if (!sourceProjectedSegments.contains(segment)
- && loadQueueManager.dropSegment(segment, targetServer)) {
- stats.add(
- Stats.Segments.DROPPED_FROM_CLONE,
- RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
- 1L
- );
+ if (!sourceProjectedSegments.contains(segment)) {
+ dropSegmentFromTargetServer(segment, targetServer, params);
}
}
}
@@ -154,6 +147,23 @@ public class CloneHistoricals implements CoordinatorDuty
}
}
+ private void dropSegmentFromTargetServer(
+ DataSegment segment,
+ ServerHolder targetServer,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ if (targetServer.isLoadingSegment(segment)) {
+ targetServer.cancelOperation(SegmentAction.LOAD, segment);
+ } else if (loadQueueManager.dropSegment(segment, targetServer)) {
+ params.getCoordinatorStats().add(
+ Stats.Segments.DROPPED_FROM_CLONE,
+ RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
+ 1L
+ );
+ }
+ }
+
/**
* Returns a DataSegment with the correct value of loadSpec (as obtained from
* metadata store). This method may return null if there is no snapshot
available
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index 114b03da2b8..5dd5e9b192d 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -127,5 +127,10 @@ public interface CoordinatorSimulation
* Publishes the given segments to the cluster.
*/
void addSegments(List<DataSegment> segments);
+
+ /**
+ * Deletes the given segments from the cluster.
+ */
+ void deleteSegments(List<DataSegment> segments);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index be985fffca0..ca9c0750c93 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -150,6 +150,12 @@ public abstract class CoordinatorSimulationBaseTest
implements
sim.cluster().addSegments(segments);
}
+ @Override
+ public void deleteSegments(List<DataSegment> segments)
+ {
+ sim.cluster().deleteSegments(segments);
+ }
+
@Override
public double getLoadPercentage(String datasource)
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 223a29e5d85..8d634923753 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -84,6 +84,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
/**
* Builder for {@link CoordinatorSimulation}.
@@ -384,6 +385,16 @@ public class CoordinatorSimulationBuilder
}
}
+ @Override
+ public void deleteSegments(List<DataSegment> segments)
+ {
+ if (segments != null) {
+ env.segmentManager.markSegmentsAsUnused(
+
segments.stream().map(DataSegment::getId).collect(Collectors.toSet())
+ );
+ }
+ }
+
private void verifySimulationRunning()
{
if (!running.get()) {
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
index 60be0e1496d..ed9846eb503 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
@@ -297,4 +297,49 @@ public class HistoricalCloningTest extends
CoordinatorSimulationBaseTest
Assert.assertEquals(segment, historicalT13.getSegment(segment.getId()));
});
}
+
+ @Test
+ public void test_loadsAreCancelledOnClone_ifSegmentsAreRemovedFromSource()
+ {
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(Segments.WIKI_10X1D)
+ .withServers(historicalT11, historicalT12)
+ .withRules(datasource, Load.on(Tier.T1,
1).forever())
+ .withDynamicConfig(
+ CoordinatorDynamicConfig.builder()
+
.withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+
.withSmartSegmentLoading(true)
+ .build()
+ )
+ .build();
+
+
+ // Load all segments on histT11
+ Segments.WIKI_10X1D.forEach(historicalT11::addDataSegment);
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ verifyValue(
+ Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+ Map.of("server", historicalT12.getName()),
+ 10L
+ );
+
+ // Remove some segments from histT11
+ deleteSegments(Segments.WIKI_10X1D.subList(0, 5));
+
+ // Verify that the loads are cancelled from the clone
+ runCoordinatorCycle();
+ verifyValue(
+ Metric.CANCELLED_ACTIONS,
+ Map.of("server", historicalT12.getName()),
+ 5L
+ );
+
+ loadQueuedSegments();
+ Assert.assertEquals(5, historicalT11.getTotalSegments());
+ Assert.assertEquals(5, historicalT12.getTotalSegments());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]