kfaraz commented on code in PR #16719:
URL: https://github.com/apache/druid/pull/16719#discussion_r1693055681


##########
processing/src/main/java/org/apache/druid/collections/CircularList.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
+ * supplied comparator. Callers are responsible for terminating the iterator explicitly.
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class CircularList<T> implements Iterable<T>
+{
+  private final List<T> collection = new ArrayList<>();
+  private final Comparator<? super T> comparator;
+  private int currentPosition;
+
+  public CircularList(final Set<T> elements, Comparator<? super T> comparator)
+  {
+    this.collection.addAll(elements);
+    this.comparator = comparator;
+    this.collection.sort(comparator);
+  }
+
+  @Override
+  public Iterator<T> iterator()
+  {
+    return new Iterator<T>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        return collection.size() > 0;
+      }
+
+      @Override
+      public T next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        T nextCandidate = peekNext();
+        advanceCursor();
+        return nextCandidate;
+      }
+
+      private T peekNext()
+      {
+        int nextPosition = currentPosition < collection.size() ? currentPosition : 0;
+        return collection.get(nextPosition);
+      }
+
+      private void advanceCursor()
+      {
+        if (++currentPosition >= collection.size()) {
+          currentPosition = 0;
+        }
+      }
+    };
+  }
+
+  /**
+   * @return true if the supplied set is equal to the set used to instantiate this circular list, otherwise false.
+   */
+  public boolean equalsSet(final Set<T> inputSet)
+  {
+    final List<T> sortedList = new ArrayList<>(inputSet);
+    sortedList.sort(comparator);
+    return collection.equals(sortedList);

Review Comment:
   Maybe we don't need to sort for the equality check:
   
   ```suggestion
       return new HashSet<>(collection).equals(inputSet);
   ```
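A minimal sketch comparing the two checks side by side. The class `EqualsSetSketch` and its method names are hypothetical and not part of the PR; the sketch assumes the backing list holds no duplicates and that the elements have well-defined `equals`/`hashCode`. Note that the suggested form needs an extra `java.util.HashSet` import.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EqualsSetSketch
{
  // Mirrors the approach in the diff: sort a copy of the input and compare the two lists.
  public static <T> boolean equalsBySorting(
      List<T> sortedCollection,
      Set<T> inputSet,
      Comparator<? super T> comparator
  )
  {
    final List<T> sortedList = new ArrayList<>(inputSet);
    sortedList.sort(comparator);
    return sortedCollection.equals(sortedList);
  }

  // Mirrors the suggestion: compare as sets, so no sorting is required.
  public static <T> boolean equalsByHashSet(List<T> sortedCollection, Set<T> inputSet)
  {
    return new HashSet<>(sortedCollection).equals(inputSet);
  }

  public static void main(String[] args)
  {
    final List<String> collection = Arrays.asList("a", "b", "c");
    final Set<String> sameElements = new HashSet<>(Arrays.asList("c", "a", "b"));
    final Set<String> fewerElements = new HashSet<>(Arrays.asList("a", "b"));

    System.out.println(equalsBySorting(collection, sameElements, Comparator.<String>naturalOrder()));
    System.out.println(equalsByHashSet(collection, sameElements));
    System.out.println(equalsByHashSet(collection, fewerElements));
  }
}
```

The set-based check also does not depend on the comparator being consistent with `equals`, which the list comparison implicitly relies on.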



##########
processing/src/test/java/org/apache/druid/collections/CircularListTest.java:
##########
@@ -0,0 +1,106 @@
+
+package org.apache.druid.collections;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+public class CircularListTest
+{
+  @Test
+  public void testIterateInNaturalOrder()
+  {
+    final Set<String> input = ImmutableSet.of("b", "a", "c");
+    final CircularList<String> circularList = new CircularList<>(input, Comparator.naturalOrder());
+    final List<String> observedElements = new ArrayList<>();
+    int cnt = 0;
+    for (String x : circularList) {
+      observedElements.add(x);
+      if (++cnt >= input.size()) {
+        break;
+      }
+    }
+    Assert.assertEquals(ImmutableList.of("a", "b", "c"), observedElements);
+  }
+
+  @Test
+  public void testIterateInReverseOrder()

Review Comment:
   Should we have a test with two `for` loops? We break out of the first in the middle of a cycle and verify that the second resumes from where the first left off.
   Or is that not a valid use case? 😛 
   
   I think your current code will maintain the order across two or more `for` loops, since the current position is maintained in the `CircularList` itself rather than inside the returned iterator.
   
   Either way, it would be nice to call out the behaviour in the javadoc and have a test for it.
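A rough sketch of what such a two-loop check could look like. `ResumeAcrossLoopsSketch`, its nested `CircularList` stand-in, and the `take` helper are all hypothetical; the stand-in only mimics the property that matters here, namely that the cursor lives in the list rather than in the iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

public class ResumeAcrossLoopsSketch
{
  // Trimmed-down stand-in for the class under review: the cursor is a field of the list.
  public static class CircularList<T> implements Iterable<T>
  {
    private final List<T> elements = new ArrayList<>();
    private int currentPosition;

    public CircularList(Set<T> input, Comparator<? super T> comparator)
    {
      elements.addAll(input);
      elements.sort(comparator);
    }

    @Override
    public Iterator<T> iterator()
    {
      return new Iterator<T>()
      {
        @Override
        public boolean hasNext()
        {
          return !elements.isEmpty();
        }

        @Override
        public T next()
        {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          final T next = elements.get(currentPosition);
          currentPosition = (currentPosition + 1) % elements.size();
          return next;
        }
      };
    }
  }

  // Reads up to `count` elements with a for-each loop and breaks out explicitly.
  public static <T> List<T> take(Iterable<T> iterable, int count)
  {
    final List<T> observed = new ArrayList<>();
    if (count <= 0) {
      return observed;
    }
    for (T x : iterable) {
      observed.add(x);
      if (observed.size() >= count) {
        break;
      }
    }
    return observed;
  }

  // Runs two consecutive loops over the same list and returns what each one observed.
  public static List<List<String>> demo()
  {
    final Set<String> input = new HashSet<>(Arrays.asList("b", "a", "c"));
    final CircularList<String> list = new CircularList<>(input, Comparator.<String>naturalOrder());
    final List<String> firstLoop = take(list, 2);
    final List<String> secondLoop = take(list, 3);
    return Arrays.asList(firstLoop, secondLoop);
  }

  public static void main(String[] args)
  {
    System.out.println(demo()); // [[a, b], [c, a, b]]
  }
}
```

In the actual test this would presumably become two `for` loops followed by `Assert.assertEquals` calls on the observed elements.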



##########
processing/src/main/java/org/apache/druid/collections/CircularList.java:
##########
@@ -0,0 +1,97 @@
+
+package org.apache.druid.collections;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
+ * supplied comparator. Callers are responsible for terminating the iterator explicitly.
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class CircularList<T> implements Iterable<T>
+{
+  private final List<T> collection = new ArrayList<>();
+  private final Comparator<? super T> comparator;
+  private int currentPosition;
+
+  public CircularList(final Set<T> elements, Comparator<? super T> comparator)

Review Comment:
   __Non-blocking suggestion:__
   
   We should be able to iterate over any list in a circular fashion using this class.
   Whether the list is sorted or not, or has duplicates or not, should be up to the caller.
   
   It is possible to have some usage of this class that requires the values to be in a specific order (I guess that would still be possible using an `Ordering.explicit()` for the comparator).
   
   If we keep the current restrictions, this class is more of a `CircularSortedSet`, which is a nice alliteration, but `CircularList` still has a better ring to it 😛 (and is easier to visualize).
   
   That said, if you prefer the current data structure, I am okay with it too.



##########
processing/src/main/java/org/apache/druid/collections/CircularList.java:
##########
@@ -0,0 +1,97 @@
+
+package org.apache.druid.collections;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
+ * supplied comparator. Callers are responsible for terminating the iterator explicitly.
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class CircularList<T> implements Iterable<T>
+{
+  private final List<T> collection = new ArrayList<>();
+  private final Comparator<? super T> comparator;
+  private int currentPosition;
+
+  public CircularList(final Set<T> elements, Comparator<? super T> comparator)
+  {
+    this.collection.addAll(elements);
+    this.comparator = comparator;
+    this.collection.sort(comparator);
+  }
+
+  @Override
+  public Iterator<T> iterator()
+  {
+    return new Iterator<T>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        return collection.size() > 0;
+      }
+
+      @Override
+      public T next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        T nextCandidate = peekNext();
+        advanceCursor();
+        return nextCandidate;

Review Comment:
   I can't recall why RoundRobinServerSelector had a `peek` semantic (maybe because `advanceCursor()` was public?), but I think the current code can be simplified:
   
   ```suggestion
           advanceCursor();
           return collection.get(currentPosition);
   ```
   
   `currentPosition` would have to be initialized to -1 in the constructor and we wouldn't need the `peekNext()` method anymore.
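A minimal sketch of the simplified iterator, assuming a hypothetical standalone class `SimplifiedCursorSketch` that takes a plain `List` instead of a `Set` plus comparator. For brevity, the cursor is initialized to -1 at the field declaration rather than in the constructor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SimplifiedCursorSketch<T> implements Iterable<T>
{
  private final List<T> elements;
  // Starts before the first element so that the first advance lands on index 0.
  private int currentPosition = -1;

  public SimplifiedCursorSketch(List<T> elements)
  {
    this.elements = new ArrayList<>(elements);
  }

  @Override
  public Iterator<T> iterator()
  {
    return new Iterator<T>()
    {
      @Override
      public boolean hasNext()
      {
        return !elements.isEmpty();
      }

      @Override
      public T next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        // Advance first, then read: no separate peek step is needed.
        advanceCursor();
        return elements.get(currentPosition);
      }

      private void advanceCursor()
      {
        if (++currentPosition >= elements.size()) {
          currentPosition = 0;
        }
      }
    };
  }

  // Pulls four elements from a three-element list to show the wrap-around.
  public static List<String> demo()
  {
    final Iterator<String> it = new SimplifiedCursorSketch<>(Arrays.asList("a", "b", "c")).iterator();
    final List<String> observed = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      observed.add(it.next());
    }
    return observed;
  }

  public static void main(String[] args)
  {
    System.out.println(demo()); // [a, b, c, a]
  }
}
```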



##########
processing/src/main/java/org/apache/druid/collections/CircularList.java:
##########
@@ -0,0 +1,97 @@
+
+package org.apache.druid.collections;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
+ * supplied comparator. Callers are responsible for terminating the iterator explicitly.
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class CircularList<T> implements Iterable<T>
+{
+  private final List<T> collection = new ArrayList<>();

Review Comment:
   Super Nit:
   Maybe a better name would be `items` or `elements`.
   Or if you want to refer to the list itself, you could call it `delegate` or `backingList`.



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -225,6 +225,154 @@ public void testKillWithMultipleDatasources()
     validateLastKillStateAndReset(DS2, null);
   }
 
+  /**
+   * Set up multiple datasources {@link #DS1}, {@link #DS2} and {@link #DS3} with unused segments with 2 kill task
+   * slots. Running the kill duty each time should pick at least one unique datasource in a round-robin manner.
+   */
+  @Test
+  public void testKillMultipleDatasourcesInRoundRobinManner()

Review Comment:
   Super Nit: Can we avoid using the suffix `Manner`?
   Maybe rename to `testRoundRobinKillMultipleDatasources()`?
   Same comment for the other test name.



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -723,6 +901,17 @@ private void validateLastKillStateAndReset(final String dataSource, @Nullable fi
     overlordClient.deleteLastKillInterval(dataSource);
   }
 
+  private void createAndAddUsedSegment(final String dataSource, final Interval interval, final String version)
+  {
+    final DataSegment segment = createSegment(dataSource, interval, version);
+    try {
+      SqlSegmentsMetadataManagerTestBase.publishSegment(connector, config, TestHelper.makeJsonMapper(), segment);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private void createAndAddUnusedSegment(

Review Comment:
   Nit: This method can now call `createAndAddUsedSegment`, since that part is common.



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -225,6 +225,154 @@ public void testKillWithMultipleDatasources()
     validateLastKillStateAndReset(DS2, null);
   }
 
+  /**
+   * Set up multiple datasources {@link #DS1}, {@link #DS2} and {@link #DS3} with unused segments with 2 kill task
+   * slots. Running the kill duty each time should pick at least one unique datasource in a round-robin manner.
+   */

Review Comment:
   Style:
   I generally prefer leaving one-line comments within the test method where relevant instead of javadocs (unless the test is too complicated). Ideally, the test name and the test code itself should be descriptive enough to clarify what is being verified.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -204,7 +230,10 @@ private void killUnusedSegments(
         );
         ++submittedTasks;
         datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
-        remainingDatasourcesToKill.remove(dataSource);
+
+        if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) {
+          break;
+        }

Review Comment:
   Would it be better to have this check right at the beginning of the loop?
   Currently there are multiple `break` and `continue` statements, which make the code a little difficult to follow.
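A rough sketch of a loop with a single exit condition at the top. Everything here is hypothetical (`KillLoopSketch`, `pickDatasources`, and the parameter names are not from the PR), and the real duty has more cases to handle. One thing to watch: in a `for`-each loop, a guard placed as the first statement of the body runs only after `next()` has already advanced a cursor that lives in the list, so the sketch checks the guard in the loop condition instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class KillLoopSketch
{
  /**
   * Hypothetical stand-in for the duty loop. Walks the datasources in iteration order and
   * "submits" a task for each eligible one until the slots or the eligible datasources run out.
   * Assumes every eligible datasource eventually shows up in the iteration.
   */
  public static List<String> pickDatasources(
      Iterable<String> datasources,
      Set<String> eligible,
      int availableSlots
  )
  {
    final Set<String> remaining = new HashSet<>(eligible);
    final List<String> submitted = new ArrayList<>();
    final Iterator<String> iterator = datasources.iterator();

    // Single exit condition, evaluated before the cursor is advanced by next().
    while (!remaining.isEmpty() && submitted.size() < availableSlots && iterator.hasNext()) {
      final String dataSource = iterator.next();
      if (!remaining.remove(dataSource)) {
        continue; // not eligible, or already handled in this run
      }
      submitted.add(dataSource);
    }
    return submitted;
  }

  public static void main(String[] args)
  {
    final List<String> all = Arrays.asList("ds1", "ds2", "ds3");
    final Set<String> eligible = new HashSet<>(Arrays.asList("ds1", "ds3"));
    System.out.println(pickDatasources(all, eligible, 2)); // [ds1, ds3]
    System.out.println(pickDatasources(all, eligible, 1)); // [ds1]
  }
}
```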



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -225,6 +225,154 @@ public void testKillWithMultipleDatasources()
     validateLastKillStateAndReset(DS2, null);
   }
 
+  /**
+   * Set up multiple datasources {@link #DS1}, {@link #DS2} and {@link #DS3} with unused segments with 2 kill task
+   * slots. Running the kill duty each time should pick at least one unique datasource in a round-robin manner.
+   */
+  @Test
+  public void testKillMultipleDatasourcesInRoundRobinManner()
+  {
+    configBuilder.withIgnoreDurationToRetain(true)
+                 .withMaxSegmentsToKill(2);
+    dynamicConfigBuilder.withMaxKillTaskSlots(2);
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(1));
+
+    createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
+
+    createAndAddUnusedSegment(DS3, YEAR_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS3, DAY_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS3, NEXT_DAY, VERSION, NOW.minusDays(1));
+
+    initDuty();
+    CoordinatorRunStats stats = runDutyAndGetStats();
+
+    Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+    Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+
+    stats = runDutyAndGetStats();
+
+    Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
+    Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+    stats = runDutyAndGetStats();
+
+    Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(6, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(6, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+    Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
+
+    stats = runDutyAndGetStats();
+
+    Assert.assertEquals(8, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(7, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(8, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+    Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+  }
+
+  /**
+   * The set of datasources to kill change in consecutive runs. The kill duty should avoid selecting two
+   * consecutive datasources across runs as long as there are other datasources to kill.
+   */
+  @Test
+  public void testKillInRoundRobinMannerWhenDatasourcesChange()
+  {
+    configBuilder.withIgnoreDurationToRetain(true)
+                 .withMaxSegmentsToKill(2);
+    dynamicConfigBuilder.withMaxKillTaskSlots(1);
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
+
+    initDuty();
+    CoordinatorRunStats stats = runDutyAndGetStats();
+
+    Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(1, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+    validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
+
+    createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
+
+    stats = runDutyAndGetStats();
+
+    Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+
+    stats = runDutyAndGetStats();
+
+    Assert.assertEquals(3, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(3, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(3, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+    stats = runDutyAndGetStats();
+
+    Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
+    Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+  }
+
+  /**
+   * There is a single datasource to kill across multiple runs. The duty should keep picking the same datasource.
+   */

Review Comment:
   Not needed, test name is self-explanatory.





---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to