This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 844a9c7ffb Cancel loads of unused segments (#14644)
844a9c7ffb is described below
commit 844a9c7ffb6c294d9bc73f73466520c5b3615b74
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Jul 31 18:01:50 2023 +0530
Cancel loads of unused segments (#14644)
---
.../druid/server/coordinator/DruidCluster.java | 9 +-
.../coordinator/duty/UnloadUnusedSegments.java | 96 ++++++++++++++++------
.../RoundRobinServerSelectorTest.java | 6 +-
.../simulate/CoordinatorSimulation.java | 6 ++
.../simulate/CoordinatorSimulationBaseTest.java | 20 +++++
.../simulate/CoordinatorSimulationBuilder.java | 11 +++
.../coordinator/simulate/SegmentLoadingTest.java | 27 ++++++
.../simulate/TestSegmentsMetadataManager.java | 11 ++-
8 files changed, 154 insertions(+), 32 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index 34cd830285..da0b8e8b2e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -45,6 +45,7 @@ public class DruidCluster
private final Set<ServerHolder> realtimes;
private final Map<String, NavigableSet<ServerHolder>> historicals;
private final Set<ServerHolder> brokers;
+ private final List<ServerHolder> allServers;
private DruidCluster(
Set<ServerHolder> realtimes,
@@ -58,6 +59,7 @@ public class DruidCluster
holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(),
holders)
);
this.brokers = Collections.unmodifiableSet(brokers);
+ this.allServers = initAllServers();
}
public Set<ServerHolder> getRealtimes()
@@ -85,7 +87,12 @@ public class DruidCluster
return historicals.get(tier);
}
- public Collection<ServerHolder> getAllServers()
+ public List<ServerHolder> getAllServers()
+ {
+ return allServers;
+ }
+
+ private List<ServerHolder> initAllServers()
{
final int historicalSize =
historicals.values().stream().mapToInt(Collection::size).sum();
final int realtimeSize = realtimes.size();
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
index 7e7301cd17..8bdcee3641 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
@@ -31,8 +31,10 @@ import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Unloads segments that are no longer marked as used from servers.
@@ -56,53 +58,46 @@ public class UnloadUnusedSegments implements CoordinatorDuty
broadcastStatusByDatasource.put(broadcastDatasource, true);
}
+ final List<ServerHolder> allServers =
params.getDruidCluster().getAllServers();
+ int numCancelledLoads = allServers.stream().mapToInt(
+ server -> cancelLoadOfUnusedSegments(server,
broadcastStatusByDatasource, params)
+ ).sum();
+
final CoordinatorRunStats stats = params.getCoordinatorStats();
- params.getDruidCluster().getAllServers().forEach(
- server -> handleUnusedSegmentsForServer(
- server,
- params,
- stats,
- broadcastStatusByDatasource
- )
- );
+ int numQueuedDrops = allServers.stream().mapToInt(
+ server -> dropUnusedSegments(server, params, stats,
broadcastStatusByDatasource)
+ ).sum();
+
+ if (numCancelledLoads > 0 || numQueuedDrops > 0) {
+ log.info("Cancelled [%d] loads and started [%d] drops of unused
segments.", numCancelledLoads, numQueuedDrops);
+ }
return params;
}
- private void handleUnusedSegmentsForServer(
+ private int dropUnusedSegments(
ServerHolder serverHolder,
DruidCoordinatorRuntimeParams params,
CoordinatorRunStats stats,
Map<String, Boolean> broadcastStatusByDatasource
)
{
- ImmutableDruidServer server = serverHolder.getServer();
+ final Set<DataSegment> usedSegments = params.getUsedSegments();
+
+ final AtomicInteger numQueuedDrops = new AtomicInteger(0);
+ final ImmutableDruidServer server = serverHolder.getServer();
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
- boolean isBroadcastDatasource =
broadcastStatusByDatasource.computeIfAbsent(
- dataSource.getName(),
- dataSourceName -> isBroadcastDatasource(dataSourceName, params)
- );
-
- // The coordinator tracks used segments by examining the metadata store.
- // For tasks, the segments they create are unpublished, so those segments will get dropped
- // unless we exclude them here. We currently drop only broadcast segments in that case.
- // This check relies on the assumption that queryable stream tasks will never
- // ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast
- // datasource, this will result in the those segments not being dropped from tasks.
- // A more robust solution which requires a larger rework could be to expose
- // the set of segments that were created by a task/indexer here, and exclude them.
- if (serverHolder.isRealtimeServer() && !isBroadcastDatasource) {
+ if (shouldSkipUnload(serverHolder, dataSource.getName(),
broadcastStatusByDatasource, params)) {
continue;
}
int totalUnneededCount = 0;
- final Set<DataSegment> usedSegments = params.getUsedSegments();
for (DataSegment segment : dataSource.getSegments()) {
if (!usedSegments.contains(segment)
&& loadQueueManager.dropSegment(segment, serverHolder)) {
totalUnneededCount++;
- log.info(
- "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
+ log.debug(
+ "Dropping uneeded segment[%s] from server[%s] in tier[%s]",
segment.getId(), server.getName(), server.getTier()
);
}
@@ -110,8 +105,55 @@ public class UnloadUnusedSegments implements
CoordinatorDuty
if (totalUnneededCount > 0) {
stats.addToSegmentStat(Stats.Segments.UNNEEDED, server.getTier(),
dataSource.getName(), totalUnneededCount);
+ numQueuedDrops.addAndGet(totalUnneededCount);
}
}
+
+ return numQueuedDrops.get();
+ }
+
+ private int cancelLoadOfUnusedSegments(
+ ServerHolder server,
+ Map<String, Boolean> broadcastStatusByDatasource,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ final Set<DataSegment> usedSegments = params.getUsedSegments();
+
+ final AtomicInteger cancelledOperations = new AtomicInteger(0);
+ server.getQueuedSegments().forEach((segment, action) -> {
+ if (shouldSkipUnload(server, segment.getDataSource(),
broadcastStatusByDatasource, params)) {
+ // do nothing
+ } else if (usedSegments.contains(segment)) {
+ // do nothing
+ } else if (action.isLoad() && server.cancelOperation(action, segment)) {
+ cancelledOperations.incrementAndGet();
+ }
+ });
+
+ return cancelledOperations.get();
+ }
+
+ /**
+ * Returns true if the given server is a realtime server AND the datasource is
+ * NOT a broadcast datasource.
+ * <p>
+ * Realtime tasks work with unpublished segments and the tasks themselves are
+ * responsible for dropping those segments. However, segments belonging to a
+ * broadcast datasource should still be dropped by the Coordinator as realtime
+ * tasks do not ingest data to a broadcast datasource and are thus not
+ * responsible for the load/unload of those segments.
+ */
+ private boolean shouldSkipUnload(
+ ServerHolder server,
+ String dataSource,
+ Map<String, Boolean> broadcastStatusByDatasource,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ boolean isBroadcastDatasource = broadcastStatusByDatasource
+ .computeIfAbsent(dataSource, ds -> isBroadcastDatasource(ds, params));
+ return server.isRealtimeServer() && !isBroadcastDatasource;
}
/**
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java
similarity index 96%
rename from
server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
rename to
server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java
index 62f84b631a..a3c769ef96 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.druid.server.coordinator;
+package org.apache.druid.server.coordinator.loading;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector;
-import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index c0565c19ba..e0dc5b128b 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.metrics.MetricsVerifier;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import java.util.List;
@@ -74,6 +75,11 @@ public interface CoordinatorSimulation
*/
void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig);
+ /**
+ * Sets the retention rules for the given datasource.
+ */
+ void setRetentionRules(String datasource, Rule... rules);
+
/**
* Gets the inventory view of the specified server as maintained by the
* coordinator.
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index c9343da85a..984fe217af 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.CreateDataSegments;
import
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.ForeverDropRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.coordinator.stats.Dimension;
@@ -112,6 +113,12 @@ public abstract class CoordinatorSimulationBaseTest
implements
sim.coordinator().setDynamicConfig(dynamicConfig);
}
+ @Override
+ public void setRetentionRules(String datasource, Rule... rules)
+ {
+ sim.coordinator().setRetentionRules(datasource, rules);
+ }
+
@Override
public void loadQueuedSegments()
{
@@ -211,6 +218,8 @@ public abstract class CoordinatorSimulationBaseTest
implements
static final String ASSIGNED_COUNT = "segment/assigned/count";
static final String MOVED_COUNT = "segment/moved/count";
static final String DROPPED_COUNT = "segment/dropped/count";
+ static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
+ static final String DELETED_COUNT = "segment/deleted/count";
static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";
static final String CANCELLED_ACTIONS = "segment/loadQueue/cancelled";
@@ -288,4 +297,15 @@ public abstract class CoordinatorSimulationBaseTest
implements
return new ForeverBroadcastDistributionRule();
}
}
+
+ /**
+ * Builder for a drop rule.
+ */
+ static class Drop
+ {
+ static Rule forever()
+ {
+ return new ForeverDropRule();
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 43f7bd9872..c5d5d94293 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import org.apache.druid.audit.AuditInfo;
import org.apache.druid.client.DruidServer;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
@@ -330,6 +331,16 @@ public class CoordinatorSimulationBuilder
env.setDynamicConfig(dynamicConfig);
}
+ @Override
+ public void setRetentionRules(String datasource, Rule... rules)
+ {
+ env.ruleManager.overrideRule(
+ datasource,
+ Arrays.asList(rules),
+ new AuditInfo("sim", "sim", "localhost")
+ );
+ }
+
@Override
public DruidServer getInventoryView(String serverName)
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
index f0ea5cc9ae..4b7965e959 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
@@ -482,6 +482,33 @@ public class SegmentLoadingTest extends
CoordinatorSimulationBaseTest
Assert.assertEquals(0, historicalT12.getTotalSegments());
}
+ @Test
+ public void testLoadOfUnusedSegmentIsCancelled()
+ {
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(historicalT11)
+ .withRules(datasource, Load.on(Tier.T1,
1).forever())
+ .build();
+
+ startSimulation(sim);
+
+ // Run 1: All segments are assigned
+ runCoordinatorCycle();
+ verifyValue(Metric.ASSIGNED_COUNT, 10L);
+
+ // Run 2: Update rules, all segments are marked as unused
+ setRetentionRules(datasource, Drop.forever());
+ runCoordinatorCycle();
+ verifyValue(Metric.DELETED_COUNT, 10L);
+
+ // Run 3: Loads of unused segments are cancelled
+ runCoordinatorCycle();
+ verifyValue(Metric.LOAD_QUEUE_COUNT, 0L);
+ verifyValue(Metric.CANCELLED_ACTIONS, 10L);
+ }
+
@Test
public void testSegmentsAreDroppedFromFullServersFirst()
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index ac7f2dacb6..e0d1d887ef 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -127,13 +127,22 @@ public class TestSegmentsMetadataManager implements
SegmentsMetadataManager
++numModifiedSegments;
}
}
+
+ if (numModifiedSegments > 0) {
+ snapshot = null;
+ }
return numModifiedSegments;
}
@Override
public boolean markSegmentAsUnused(SegmentId segmentId)
{
- return usedSegments.remove(segmentId.toString()) != null;
+ boolean updated = usedSegments.remove(segmentId.toString()) != null;
+ if (updated) {
+ snapshot = null;
+ }
+
+ return updated;
}
@Nullable
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]