This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/34.0.0 by this push:
new d8da0e1f0a5 Cancel loads on clones if source server removes segment (#18283) (#18296)
d8da0e1f0a5 is described below
commit d8da0e1f0a5a8c01237832150d5203a0c9615770
Author: Lucas Capistrant <[email protected]>
AuthorDate: Sun Jul 20 10:32:45 2025 -0500
Cancel loads on clones if source server removes segment (#18283) (#18296)
---
.../server/coordinator/duty/CloneHistoricals.java | 28 +++++++++-----
.../simulate/CoordinatorSimulation.java | 5 +++
.../simulate/CoordinatorSimulationBaseTest.java | 6 +++
.../simulate/CoordinatorSimulationBuilder.java | 11 ++++++
.../simulate/HistoricalCloningTest.java | 45 ++++++++++++++++++++++
5 files changed, 86 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index fd4c293afc0..da201127f60 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -29,7 +29,6 @@ import org.apache.druid.server.coordinator.ServerCloneStatus;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
@@ -66,7 +65,6 @@ public class CloneHistoricals implements CoordinatorDuty
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
- final CoordinatorRunStats stats = params.getCoordinatorStats();
final DruidCluster cluster = params.getDruidCluster();
if (cloneServers.isEmpty()) {
@@ -111,13 +109,8 @@ public class CloneHistoricals implements CoordinatorDuty
// Drop any segments missing from the clone source.
for (DataSegment segment : targetProjectedSegments) {
- if (!sourceProjectedSegments.contains(segment)
- && loadQueueManager.dropSegment(segment, targetServer)) {
- stats.add(
- Stats.Segments.DROPPED_FROM_CLONE,
- RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
- 1L
- );
+ if (!sourceProjectedSegments.contains(segment)) {
+ dropSegmentFromTargetServer(segment, targetServer, params);
}
}
}
@@ -154,6 +147,23 @@ public class CloneHistoricals implements CoordinatorDuty
}
}
+ private void dropSegmentFromTargetServer(
+ DataSegment segment,
+ ServerHolder targetServer,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ if (targetServer.isLoadingSegment(segment)) {
+ targetServer.cancelOperation(SegmentAction.LOAD, segment);
+ } else if (loadQueueManager.dropSegment(segment, targetServer)) {
+ params.getCoordinatorStats().add(
+ Stats.Segments.DROPPED_FROM_CLONE,
+ RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
+ 1L
+ );
+ }
+ }
+
/**
* Returns a DataSegment with the correct value of loadSpec (as obtained from
* metadata store). This method may return null if there is no snapshot available
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index 114b03da2b8..5dd5e9b192d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -127,5 +127,10 @@ public interface CoordinatorSimulation
* Publishes the given segments to the cluster.
*/
void addSegments(List<DataSegment> segments);
+
+ /**
+ * Deletes the given segments from the cluster.
+ */
+ void deleteSegments(List<DataSegment> segments);
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index be985fffca0..ca9c0750c93 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -150,6 +150,12 @@ public abstract class CoordinatorSimulationBaseTest implements
sim.cluster().addSegments(segments);
}
+ @Override
+ public void deleteSegments(List<DataSegment> segments)
+ {
+ sim.cluster().deleteSegments(segments);
+ }
+
@Override
public double getLoadPercentage(String datasource)
{
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 223a29e5d85..8d634923753 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -84,6 +84,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
/**
* Builder for {@link CoordinatorSimulation}.
@@ -384,6 +385,16 @@ public class CoordinatorSimulationBuilder
}
}
+ @Override
+ public void deleteSegments(List<DataSegment> segments)
+ {
+ if (segments != null) {
+ env.segmentManager.markSegmentsAsUnused(
+ segments.stream().map(DataSegment::getId).collect(Collectors.toSet())
+ );
+ }
+ }
+
private void verifySimulationRunning()
{
if (!running.get()) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
index 60be0e1496d..ed9846eb503 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java
@@ -297,4 +297,49 @@ public class HistoricalCloningTest extends CoordinatorSimulationBaseTest
Assert.assertEquals(segment, historicalT13.getSegment(segment.getId()));
});
}
+
+ @Test
+ public void test_loadsAreCancelledOnClone_ifSegmentsAreRemovedFromSource()
+ {
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(Segments.WIKI_10X1D)
+ .withServers(historicalT11, historicalT12)
+ .withRules(datasource, Load.on(Tier.T1, 1).forever())
+ .withDynamicConfig(
+ CoordinatorDynamicConfig.builder()
+ .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost()))
+ .withSmartSegmentLoading(true)
+ .build()
+ )
+ .build();
+
+
+ // Load all segments on histT11
+ Segments.WIKI_10X1D.forEach(historicalT11::addDataSegment);
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ verifyValue(
+ Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(),
+ Map.of("server", historicalT12.getName()),
+ 10L
+ );
+
+ // Remove some segments from histT11
+ deleteSegments(Segments.WIKI_10X1D.subList(0, 5));
+
+ // Verify that the loads are cancelled from the clone
+ runCoordinatorCycle();
+ verifyValue(
+ Metric.CANCELLED_ACTIONS,
+ Map.of("server", historicalT12.getName()),
+ 5L
+ );
+
+ loadQueuedSegments();
+ Assert.assertEquals(5, historicalT11.getTotalSegments());
+ Assert.assertEquals(5, historicalT12.getTotalSegments());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]