kfaraz commented on code in PR #16719:
URL: https://github.com/apache/druid/pull/16719#discussion_r1673268762


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -75,14 +77,23 @@ public class KillUnusedSegments implements CoordinatorDuty
   private final Duration durationToRetain;
   private final boolean ignoreDurationToRetain;
   private final int maxSegmentsToKill;
+  private final Duration bufferPeriod;
 
   /**
    * Used to keep track of the last interval end time that was killed for each
    * datasource.
    */
   private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
+
+  /**
+   * State that is maintained in the duty to determine if the duty needs to run or not.
+   */

Review Comment:
   I am not sure if the new javadocs for the private fields are really adding 
any value.
   The field names are self-explanatory and it is trivial to check how they are 
being used.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.error.InvalidInput;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A round-robin iterator that is backed by an ordered list of candidates containing no duplicates. The iterator
+ * iterates endlessly over all the candidates. The caller must explicitly terminate it.
+ * The iterator has the following properties:
+ * <ul>
+ *   <li> Starts with an initial random cursor position in an ordered list of candidates. </li>
+ *   <li> Invoking {@code next()} on {@link #getIterator()} is guaranteed to result in a deterministic order
+ *   unless the set of candidates change when {@link #updateCandidates(Set)} is called. When the candidates change,
+ *   the cursor is reset to a random position in the new list of ordered candidates. </li>
+ *   <li> Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations. </li>
+ * </ul>
+ *
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class RoundRobinIterator

Review Comment:
   Some suggestions to make this class nicer:
   
   1. The iteration aspect of this class seems vaguely similar to 
`RoundRobinServerSelector.CircularServerList`. Let's just pull out 
`CircularServerList` as a generic class `CircularList<T>` and put it in the 
package `org.apache.druid.collections`.
   2. Make `CircularList` implement `Iterable`.
   3. Ideally, this class would have no public methods right now, but since 
`RoundRobinServerSelector` is already using `peekNext()` and `advanceCursor()`, 
let's keep them for now. I can later take a look at `EligibleServerSelector` to 
see if we can do without these methods.
   4. I am not entirely sold on the need to reset the cursor to a random 
position (it would be nice to have a unit test that verifies the spread after, 
say, 1000 runs and compares resetting to a random position against not 
resetting at all).
     - Either way, the `CircularList` should expose a method `resetCursor(int 
position)`. The randomness should not be baked into this class and should 
instead be provided by the caller (in this case, the kill duty).
     - Sorting and randomization should be mutually exclusive. If we are going 
to randomize, there is no point in sorting the set.
   5. We can also add a method `equalsSet(Set<>)`.
   6. The requirement to avoid duplicates should be a part of the caller, i.e. 
kill duty. (I recall having suggested it to be a part of the 
`RoundRobinIterator` but this seems cleaner now 😛 ).
   
   After all of this, the kill duty would look as follows:
   
   
   ```java
   private String prevDatasourceKilled;
   
   // I don't like this variable name myself; if something better comes to mind, use it
   private CircularList<String> datasourceCircularKillList;
   
   public void run()
   {
      ...
      ...
      // This goes where we identify the datasourcesToKill in the duty
      final Set<String> datasourcesToKill = ...;
      if (datasourceCircularKillList == null ||
            !datasourceCircularKillList.equalsSet(datasourcesToKill)) {
          datasourceCircularKillList = new CircularList<>(datasourcesToKill);
          
          // optional
          datasourceCircularKillList.resetCursor(randomPosition);
      }
      ...
      ...
      // This goes where we iterate over the circular list
      for (String datasource : datasourceCircularKillList) {
          if (datasource.equals(prevDatasourceKilled)) {
             continue;
          } else {
             prevDatasourceKilled = datasource;
          }
          
          // Do the rest of the kill processing
      }
      ...
      ...
   }
   ```
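
A minimal sketch of what the proposed `CircularList<T>` could look like, assuming the shape described in the suggestions above. This is not the existing `CircularServerList`; the constructor signature, the bounds check in `resetCursor` and the package-private visibility are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

/**
 * Hypothetical sketch of a circular list that iterates endlessly over its
 * elements. Not thread-safe.
 */
final class CircularList<T> implements Iterable<T>
{
  private final List<T> elements;
  private int cursor;

  CircularList(Set<T> candidates)
  {
    // Copies the candidates in the set's iteration order (assumption:
    // any sorting is done by the caller before constructing the list).
    this.elements = Collections.unmodifiableList(new ArrayList<>(candidates));
  }

  /**
   * Moves the cursor to the given position. Any randomness is supplied by
   * the caller rather than baked into this class.
   */
  void resetCursor(int position)
  {
    if (position < 0 || position >= elements.size()) {
      throw new IllegalArgumentException("Invalid cursor position: " + position);
    }
    cursor = position;
  }

  /**
   * Returns true if this list holds exactly the elements of the given set.
   * Relies on the list having been built from a set, so it has no duplicates.
   */
  boolean equalsSet(Set<T> other)
  {
    return elements.size() == other.size() && other.containsAll(elements);
  }

  @Override
  public Iterator<T> iterator()
  {
    return new Iterator<T>()
    {
      @Override
      public boolean hasNext()
      {
        // A non-empty circular list never runs out of elements.
        return !elements.isEmpty();
      }

      @Override
      public T next()
      {
        if (elements.isEmpty()) {
          throw new NoSuchElementException();
        }
        final T element = elements.get(cursor);
        cursor = (cursor + 1) % elements.size();
        return element;
      }
    };
  }
}
```

Because the iterator never runs out of elements for a non-empty list, a `for` loop over it must `break` explicitly, for example once every datasource has been visited or the available task slots are used up.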



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -141,18 +162,24 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
     final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
-    Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
-
-    if (availableKillTaskSlots > 0) {
-      // If no datasource has been specified, all are eligible for killing unused segments
-      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-        dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
-      }
+    if (availableKillTaskSlots <= 0) {
+      log.info("Skipping KillUnusedSegments because there are no available kill task slots.");
+      return params;
+    }
 
-      lastKillTime = DateTimes.nowUtc();
-      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
+    final Set<String> dataSourcesToKill;
+    if (!CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {

Review Comment:
   Please invert the condition for readability.
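
For illustration, the inverted check could read roughly as below. `DatasourceSelection`, `resolve` and the `Supplier` parameter are hypothetical stand-ins for the dynamic config and `segmentsMetadataManager` calls in the diff, and the plain null/empty test stands in for `CollectionUtils.isNullOrEmpty`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

final class DatasourceSelection
{
  private DatasourceSelection()
  {
  }

  /**
   * Picks the datasources to kill. The positive "nothing specified" check
   * comes first, so the condition needs no negation.
   */
  static Set<String> resolve(
      Collection<String> specificDatasources,
      Supplier<Set<String>> allDatasources
  )
  {
    if (specificDatasources == null || specificDatasources.isEmpty()) {
      // Nothing specified: every datasource is eligible.
      return allDatasources.get();
    } else {
      return new HashSet<>(specificDatasources);
    }
  }
}
```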



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -141,18 +162,24 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
     final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
-    Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
-
-    if (availableKillTaskSlots > 0) {
-      // If no datasource has been specified, all are eligible for killing unused segments
-      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-        dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
-      }
+    if (availableKillTaskSlots <= 0) {
+      log.info("Skipping KillUnusedSegments because there are no available kill task slots.");

Review Comment:
   This log can become verbose too. Once we have submitted all the kill tasks 
that we can, it is going to be a while before slots free up. So, if the kill 
period is short, we might see many of these logs.
   
   I would advise leaving out all the new info logs for now.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -141,18 +162,24 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
     final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
-    Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
-
-    if (availableKillTaskSlots > 0) {
-      // If no datasource has been specified, all are eligible for killing unused segments
-      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-        dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
-      }
+    if (availableKillTaskSlots <= 0) {
+      log.info("Skipping KillUnusedSegments because there are no available kill task slots.");
+      return params;
+    }
 
-      lastKillTime = DateTimes.nowUtc();
-      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
+    final Set<String> dataSourcesToKill;
+    if (!CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {
+      dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
+    } else {
+      // If no datasource has been specified, all are eligible for killing unused segments by default

Review Comment:
   Feels redundant as the code itself is doing exactly that.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -163,30 +190,44 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
    * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
    */
   private void killUnusedSegments(
-      @Nullable final Collection<String> dataSourcesToKill,
+      final Set<String> dataSourcesToKill,
       final int availableKillTaskSlots,
       final CoordinatorRunStats stats
   )
   {
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+      log.info("Skipping KillUnusedSegments because there are no datasources to kill.");
       stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
       return;
     }
+    final Iterator<String> dataSourcesToKillIterator = this.datasourceIterator.getIterator();
+    final Set<String> remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
+    final Set<String> datasourcesKilled = new HashSet<>();

Review Comment:
   It is redundant to maintain both these sets. Let's just keep 
`datasourcesKilled`; we can build the other from it, as it is only needed once 
at the end.
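
As a sketch of the idea, the remaining datasources can be derived once at the end as a set difference. The class and method names here are hypothetical and only illustrate the computation.

```java
import java.util.HashSet;
import java.util.Set;

final class RemainingDatasources
{
  private RemainingDatasources()
  {
  }

  /**
   * Returns the datasources in dataSourcesToKill that are not present in
   * datasourcesKilled. Neither input set is modified.
   */
  static Set<String> compute(Set<String> dataSourcesToKill, Set<String> datasourcesKilled)
  {
    final Set<String> remaining = new HashSet<>(dataSourcesToKill);
    remaining.removeAll(datasourcesKilled);
    return remaining;
  }
}
```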



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -75,14 +77,23 @@ public class KillUnusedSegments implements CoordinatorDuty
   private final Duration durationToRetain;
   private final boolean ignoreDurationToRetain;
   private final int maxSegmentsToKill;
+  private final Duration bufferPeriod;
 
   /**
    * Used to keep track of the last interval end time that was killed for each
    * datasource.
    */
   private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
+
+  /**
+   * State that is maintained in the duty to determine if the duty needs to run or not.
+   */
   private DateTime lastKillTime;
-  private final Duration bufferPeriod;
+
+  /**
+   * Round-robin iterator of the datasources to kill. It's updated in every run by the duty.
+   */
+  private final RoundRobinIterator datasourceIterator;

Review Comment:
   Good to mark the new class as not thread-safe. But it is not required to be 
thread-safe as a duty should never run on multiple threads (`CompactSegments` 
is the only one that could potentially run concurrently if manual compaction is 
triggered).



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -127,7 +148,7 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par
     if (canDutyRun()) {
       return runInternal(params);
     } else {
-      log.debug(
+      log.info(

Review Comment:
   +1 on keeping this as debug.
   
   While testing, it is natural to want more visibility but that can simply be 
achieved by setting log level to debug.
   
   In production, we should try to ensure that:
   - redundant logs of normal operation are avoided
   - all important warn or error information is included
   
   I plan to revisit other duties too to reduce redundant logging.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -163,30 +190,44 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
    * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
    */
   private void killUnusedSegments(
-      @Nullable final Collection<String> dataSourcesToKill,
+      final Set<String> dataSourcesToKill,
       final int availableKillTaskSlots,
       final CoordinatorRunStats stats
   )
   {
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+      log.info("Skipping KillUnusedSegments because there are no datasources to kill.");
       stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
       return;
     }
+    final Iterator<String> dataSourcesToKillIterator = this.datasourceIterator.getIterator();
+    final Set<String> remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
+    final Set<String> datasourcesKilled = new HashSet<>();
 
-    final Collection<String> remainingDatasourcesToKill = new ArrayList<>(dataSourcesToKill);
     int submittedTasks = 0;
-    for (String dataSource : dataSourcesToKill) {
+    while (dataSourcesToKillIterator.hasNext()) {

Review Comment:
   > I tried to find documentation on how the duties operate but couldn't find 
much information. I think adding some code/public-facing documentation on this 
would help clarify this. :-)
   
   Nah, it is really an implementation detail that only devs need to be privy 
to.
   
   In `DruidCoordinator`, every duty group has a dedicated single-threaded 
executor. See `DruidCoordinator.getOrCreateDutyGroupExecutors()`. We can add a 
comment there asserting that every executor __must always be single-threaded__.
   
   Note: Until a couple of releases ago, all the duties ran on a single thread, 
so even less concurrency 😂 .
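
To illustrate the invariant, here is a hedged sketch of one single-threaded executor per duty group. It is not the actual `DruidCoordinator` code; `DutyGroupExecutors`, `getOrCreate`, `shutdownAll` and the thread name format are assumptions made for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

final class DutyGroupExecutors
{
  private final Map<String, ScheduledExecutorService> executors = new ConcurrentHashMap<>();

  /**
   * Returns the executor dedicated to the given duty group, creating it on
   * first use. Every executor must always be single-threaded so that the
   * duties in a group never run concurrently with each other.
   */
  ScheduledExecutorService getOrCreate(String dutyGroup)
  {
    return executors.computeIfAbsent(
        dutyGroup,
        group -> Executors.newSingleThreadScheduledExecutor(runnable -> {
          final Thread thread = new Thread(runnable, "duty-group-" + group);
          thread.setDaemon(true);
          return thread;
        })
    );
  }

  /** Stops all executors created so far. */
  void shutdownAll()
  {
    executors.values().forEach(ExecutorService::shutdownNow);
  }
}
```

With this arrangement, state held by a duty (such as the kill duty's cursor) is only ever touched by the one thread of its group, which is why the duty itself does not need to be thread-safe.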



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

