This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 844a9c7ffb Cancel loads of unused segments (#14644)
844a9c7ffb is described below

commit 844a9c7ffb6c294d9bc73f73466520c5b3615b74
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Jul 31 18:01:50 2023 +0530

    Cancel loads of unused segments (#14644)
---
 .../druid/server/coordinator/DruidCluster.java     |  9 +-
 .../coordinator/duty/UnloadUnusedSegments.java     | 96 ++++++++++++++++------
 .../RoundRobinServerSelectorTest.java              |  6 +-
 .../simulate/CoordinatorSimulation.java            |  6 ++
 .../simulate/CoordinatorSimulationBaseTest.java    | 20 +++++
 .../simulate/CoordinatorSimulationBuilder.java     | 11 +++
 .../coordinator/simulate/SegmentLoadingTest.java   | 27 ++++++
 .../simulate/TestSegmentsMetadataManager.java      | 11 ++-
 8 files changed, 154 insertions(+), 32 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index 34cd830285..da0b8e8b2e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -45,6 +45,7 @@ public class DruidCluster
   private final Set<ServerHolder> realtimes;
   private final Map<String, NavigableSet<ServerHolder>> historicals;
   private final Set<ServerHolder> brokers;
+  private final List<ServerHolder> allServers;
 
   private DruidCluster(
       Set<ServerHolder> realtimes,
@@ -58,6 +59,7 @@ public class DruidCluster
         holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(), 
holders)
     );
     this.brokers = Collections.unmodifiableSet(brokers);
+    this.allServers = initAllServers();
   }
 
   public Set<ServerHolder> getRealtimes()
@@ -85,7 +87,12 @@ public class DruidCluster
     return historicals.get(tier);
   }
 
-  public Collection<ServerHolder> getAllServers()
+  public List<ServerHolder> getAllServers()
+  {
+    return allServers;
+  }
+
+  private List<ServerHolder> initAllServers()
   {
     final int historicalSize = 
historicals.values().stream().mapToInt(Collection::size).sum();
     final int realtimeSize = realtimes.size();
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
index 7e7301cd17..8bdcee3641 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
@@ -31,8 +31,10 @@ import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Unloads segments that are no longer marked as used from servers.
@@ -56,53 +58,46 @@ public class UnloadUnusedSegments implements CoordinatorDuty
       broadcastStatusByDatasource.put(broadcastDatasource, true);
     }
 
+    final List<ServerHolder> allServers = 
params.getDruidCluster().getAllServers();
+    int numCancelledLoads = allServers.stream().mapToInt(
+        server -> cancelLoadOfUnusedSegments(server, 
broadcastStatusByDatasource, params)
+    ).sum();
+
     final CoordinatorRunStats stats = params.getCoordinatorStats();
-    params.getDruidCluster().getAllServers().forEach(
-        server -> handleUnusedSegmentsForServer(
-            server,
-            params,
-            stats,
-            broadcastStatusByDatasource
-        )
-    );
+    int numQueuedDrops = allServers.stream().mapToInt(
+        server -> dropUnusedSegments(server, params, stats, 
broadcastStatusByDatasource)
+    ).sum();
+
+    if (numCancelledLoads > 0 || numQueuedDrops > 0) {
+      log.info("Cancelled [%d] loads and started [%d] drops of unused 
segments.", numCancelledLoads, numQueuedDrops);
+    }
 
     return params;
   }
 
-  private void handleUnusedSegmentsForServer(
+  private int dropUnusedSegments(
       ServerHolder serverHolder,
       DruidCoordinatorRuntimeParams params,
       CoordinatorRunStats stats,
       Map<String, Boolean> broadcastStatusByDatasource
   )
   {
-    ImmutableDruidServer server = serverHolder.getServer();
+    final Set<DataSegment> usedSegments = params.getUsedSegments();
+
+    final AtomicInteger numQueuedDrops = new AtomicInteger(0);
+    final ImmutableDruidServer server = serverHolder.getServer();
     for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
-      boolean isBroadcastDatasource = 
broadcastStatusByDatasource.computeIfAbsent(
-          dataSource.getName(),
-          dataSourceName -> isBroadcastDatasource(dataSourceName, params)
-      );
-
-      // The coordinator tracks used segments by examining the metadata store.
-      // For tasks, the segments they create are unpublished, so those segments will get dropped
-      // unless we exclude them here. We currently drop only broadcast segments in that case.
-      // This check relies on the assumption that queryable stream tasks will never
-      // ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast
-      // datasource, this will result in the those segments not being dropped from tasks.
-      // A more robust solution which requires a larger rework could be to expose
-      // the set of segments that were created by a task/indexer here, and exclude them.
-      if (serverHolder.isRealtimeServer() && !isBroadcastDatasource) {
+      if (shouldSkipUnload(serverHolder, dataSource.getName(), 
broadcastStatusByDatasource, params)) {
         continue;
       }
 
       int totalUnneededCount = 0;
-      final Set<DataSegment> usedSegments = params.getUsedSegments();
       for (DataSegment segment : dataSource.getSegments()) {
         if (!usedSegments.contains(segment)
             && loadQueueManager.dropSegment(segment, serverHolder)) {
           totalUnneededCount++;
-          log.info(
-              "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
+          log.debug(
+              "Dropping uneeded segment[%s] from server[%s] in tier[%s]",
               segment.getId(), server.getName(), server.getTier()
           );
         }
@@ -110,8 +105,55 @@ public class UnloadUnusedSegments implements 
CoordinatorDuty
 
       if (totalUnneededCount > 0) {
         stats.addToSegmentStat(Stats.Segments.UNNEEDED, server.getTier(), 
dataSource.getName(), totalUnneededCount);
+        numQueuedDrops.addAndGet(totalUnneededCount);
       }
     }
+
+    return numQueuedDrops.get();
+  }
+
+  private int cancelLoadOfUnusedSegments(
+      ServerHolder server,
+      Map<String, Boolean> broadcastStatusByDatasource,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    final Set<DataSegment> usedSegments = params.getUsedSegments();
+
+    final AtomicInteger cancelledOperations = new AtomicInteger(0);
+    server.getQueuedSegments().forEach((segment, action) -> {
+      if (shouldSkipUnload(server, segment.getDataSource(), 
broadcastStatusByDatasource, params)) {
+        // do nothing
+      } else if (usedSegments.contains(segment)) {
+        // do nothing
+      } else if (action.isLoad() && server.cancelOperation(action, segment)) {
+        cancelledOperations.incrementAndGet();
+      }
+    });
+
+    return cancelledOperations.get();
+  }
+
+  /**
+   * Returns true if the given server is a realtime server AND the datasource is
+   * NOT a broadcast datasource.
+   * <p>
+   * Realtime tasks work with unpublished segments and the tasks themselves are
+   * responsible for dropping those segments. However, segments belonging to a
+   * broadcast datasource should still be dropped by the Coordinator as realtime
+   * tasks do not ingest data to a broadcast datasource and are thus not
+   * responsible for the load/unload of those segments.
+   */
+  private boolean shouldSkipUnload(
+      ServerHolder server,
+      String dataSource,
+      Map<String, Boolean> broadcastStatusByDatasource,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    boolean isBroadcastDatasource = broadcastStatusByDatasource
+        .computeIfAbsent(dataSource, ds -> isBroadcastDatasource(ds, params));
+    return server.isRealtimeServer() && !isBroadcastDatasource;
   }
 
   /**
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java
similarity index 96%
rename from 
server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
rename to 
server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java
index 62f84b631a..a3c769ef96 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java
@@ -17,14 +17,14 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator;
+package org.apache.druid.server.coordinator.loading;
 
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector;
-import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.junit.Assert;
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index c0565c19ba..e0dc5b128b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.metrics.MetricsVerifier;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.List;
@@ -74,6 +75,11 @@ public interface CoordinatorSimulation
      */
     void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig);
 
+    /**
+     * Sets the retention rules for the given datasource.
+     */
+    void setRetentionRules(String datasource, Rule... rules);
+
     /**
      * Gets the inventory view of the specified server as maintained by the
      * coordinator.
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index c9343da85a..984fe217af 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.ForeverDropRule;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.coordinator.stats.Dimension;
@@ -112,6 +113,12 @@ public abstract class CoordinatorSimulationBaseTest 
implements
     sim.coordinator().setDynamicConfig(dynamicConfig);
   }
 
+  @Override
+  public void setRetentionRules(String datasource, Rule... rules)
+  {
+    sim.coordinator().setRetentionRules(datasource, rules);
+  }
+
   @Override
   public void loadQueuedSegments()
   {
@@ -211,6 +218,8 @@ public abstract class CoordinatorSimulationBaseTest 
implements
     static final String ASSIGNED_COUNT = "segment/assigned/count";
     static final String MOVED_COUNT = "segment/moved/count";
     static final String DROPPED_COUNT = "segment/dropped/count";
+    static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
+    static final String DELETED_COUNT = "segment/deleted/count";
     static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
     static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";
     static final String CANCELLED_ACTIONS = "segment/loadQueue/cancelled";
@@ -288,4 +297,15 @@ public abstract class CoordinatorSimulationBaseTest 
implements
       return new ForeverBroadcastDistributionRule();
     }
   }
+
+  /**
+   * Builder for a drop rule.
+   */
+  static class Drop
+  {
+    static Rule forever()
+    {
+      return new ForeverDropRule();
+    }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 43f7bd9872..c5d5d94293 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import org.apache.druid.audit.AuditInfo;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.curator.discovery.ServiceAnnouncer;
@@ -330,6 +331,16 @@ public class CoordinatorSimulationBuilder
       env.setDynamicConfig(dynamicConfig);
     }
 
+    @Override
+    public void setRetentionRules(String datasource, Rule... rules)
+    {
+      env.ruleManager.overrideRule(
+          datasource,
+          Arrays.asList(rules),
+          new AuditInfo("sim", "sim", "localhost")
+      );
+    }
+
     @Override
     public DruidServer getInventoryView(String serverName)
     {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
index f0ea5cc9ae..4b7965e959 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
@@ -482,6 +482,33 @@ public class SegmentLoadingTest extends 
CoordinatorSimulationBaseTest
     Assert.assertEquals(0, historicalT12.getTotalSegments());
   }
 
+  @Test
+  public void testLoadOfUnusedSegmentIsCancelled()
+  {
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11)
+                             .withRules(datasource, Load.on(Tier.T1, 
1).forever())
+                             .build();
+
+    startSimulation(sim);
+
+    // Run 1: All segments are assigned
+    runCoordinatorCycle();
+    verifyValue(Metric.ASSIGNED_COUNT, 10L);
+
+    // Run 2: Update rules, all segments are marked as unused
+    setRetentionRules(datasource, Drop.forever());
+    runCoordinatorCycle();
+    verifyValue(Metric.DELETED_COUNT, 10L);
+
+    // Run 3: Loads of unused segments are cancelled
+    runCoordinatorCycle();
+    verifyValue(Metric.LOAD_QUEUE_COUNT, 0L);
+    verifyValue(Metric.CANCELLED_ACTIONS, 10L);
+  }
+
   @Test
   public void testSegmentsAreDroppedFromFullServersFirst()
   {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index ac7f2dacb6..e0d1d887ef 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -127,13 +127,22 @@ public class TestSegmentsMetadataManager implements 
SegmentsMetadataManager
         ++numModifiedSegments;
       }
     }
+
+    if (numModifiedSegments > 0) {
+      snapshot = null;
+    }
     return numModifiedSegments;
   }
 
   @Override
   public boolean markSegmentAsUnused(SegmentId segmentId)
   {
-    return usedSegments.remove(segmentId.toString()) != null;
+    boolean updated = usedSegments.remove(segmentId.toString()) != null;
+    if (updated) {
+      snapshot = null;
+    }
+
+    return updated;
   }
 
   @Nullable


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to