kfaraz commented on code in PR #16719:
URL: https://github.com/apache/druid/pull/16719#discussion_r1673268762
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -75,14 +77,23 @@ public class KillUnusedSegments implements CoordinatorDuty
private final Duration durationToRetain;
private final boolean ignoreDurationToRetain;
private final int maxSegmentsToKill;
+ private final Duration bufferPeriod;
/**
* Used to keep track of the last interval end time that was killed for each
* datasource.
*/
private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
+
+ /**
+ * State that is maintained in the duty to determine if the duty needs to run or not.
+ */
Review Comment:
I am not sure the new javadocs for the private fields really add any value.
The field names are self-explanatory, and it is trivial to check how they are used.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.error.InvalidInput;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A round-robin iterator that is backed by an ordered list of candidates containing no duplicates. The iterator
+ * iterates endlessly over all the candidates. The caller must explicitly terminate it.
+ * The iterator has the following properties:
+ * <ul>
+ * <li> Starts with an initial random cursor position in an ordered list of candidates. </li>
+ * <li> Invoking {@code next()} on {@link #getIterator()} is guaranteed to result in a deterministic order
+ * unless the set of candidates changes when {@link #updateCandidates(Set)} is called. When the candidates change,
+ * the cursor is reset to a random position in the new list of ordered candidates. </li>
+ * <li> Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations. </li>
+ * </ul>
+ *
+ * <p>
+ * This class is not thread-safe and must be used from a single thread.
+ */
+@NotThreadSafe
+public class RoundRobinIterator
Review Comment:
Some suggestions to make this class nicer:
1. The iteration aspect of this class seems vaguely similar to `RoundRobinServerSelector.CircularServerList`. Let's just pull out `CircularServerList` as a generic class `CircularList<T>` and put it in the package `org.apache.druid.collections`.
2. Make `CircularList` implement `Iterable`.
3. Ideally, this class would have no public methods right now, but since `RoundRobinServerSelector` is already using `peekNext()` and `advanceCursor()`, let's keep them for now. I can later take a look at `EligibleServerSelector` to see if we can do without these methods.
4. I am not entirely sold on requiring a reset of the cursor to a random position (it would be nice to have a unit test that verifies the spread after, say, 1000 runs, comparing a random reset against no reset at all).
   - Either way, the `CircularList` should expose a method `resetCursor(int position)`. The randomness should not be baked into this class and should instead be provided by the caller (in this case, the kill duty).
   - Sorting and randomization should be mutually exclusive. If we are going to randomize, there is no point in sorting the set.
5. We can also add a method `equalsSet(Set<>)`.
6. The requirement to avoid duplicates should be a part of the caller, i.e. the kill duty. (I recall having suggested it to be a part of the `RoundRobinIterator` but this seems cleaner now 😛 ).
After all of this, the kill duty would look as follows:
```java
private String prevDatasourceKilled;

// I don't like this variable name myself, if something better comes to mind, use it
private CircularList<String> datasourceCircularKillList;

public void run()
{
  ...
  ...
  // This goes where we identify the datasourcesToKill in the duty
  final Set<String> datasourcesToKill = ...;
  if (datasourceCircularKillList == null || !datasourceCircularKillList.equalsSet(datasourcesToKill)) {
    datasourceCircularKillList = new CircularList<>(datasourcesToKill);
    // optional
    datasourceCircularKillList.resetCursor(randomPosition);
  }
  ...
  ...
  // This goes where we iterate over the circular list
  for (String datasource : datasourceCircularKillList) {
    if (datasource.equals(prevDatasourceKilled)) {
      continue;
    } else {
      prevDatasourceKilled = datasource;
    }
    // Do the rest of the kill processing
  }
  ...
  ...
}
```
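For illustration, a minimal sketch of what the extracted `CircularList<T>` could look like. This is only an assumption of the shape, not the final implementation; the method names `resetCursor` and `equalsSet` come from the suggestions above, and the exact iteration semantics are a guess:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of a generic circular list that iterates endlessly
 * over a fixed set of elements. Not thread-safe.
 */
public class CircularList<T> implements Iterable<T>
{
  private final List<T> elements;
  private int cursor = 0;

  public CircularList(Set<T> elementSet)
  {
    this.elements = new ArrayList<>(elementSet);
  }

  /** Moves the cursor; any randomness is supplied by the caller, not this class. */
  public void resetCursor(int position)
  {
    this.cursor = elements.isEmpty() ? 0 : position % elements.size();
  }

  /** Returns true if this list contains exactly the elements of the given set. */
  public boolean equalsSet(Set<T> inputSet)
  {
    return elements.size() == inputSet.size() && inputSet.containsAll(elements);
  }

  @Override
  public Iterator<T> iterator()
  {
    return new Iterator<T>()
    {
      @Override
      public boolean hasNext()
      {
        // Endless iteration over a non-empty list; the caller terminates explicitly.
        return !elements.isEmpty();
      }

      @Override
      public T next()
      {
        final T element = elements.get(cursor);
        cursor = (cursor + 1) % elements.size();
        return element;
      }
    };
  }

  public static void main(String[] args)
  {
    // TreeSet gives a deterministic (sorted) backing order for the demo.
    CircularList<String> list = new CircularList<>(new java.util.TreeSet<>(Set.of("wiki", "koala")));
    list.resetCursor(0);
    Iterator<String> it = list.iterator();
    System.out.println(it.next() + " " + it.next() + " " + it.next()); // prints: koala wiki koala
  }
}
```

One full pass over the cursor positions visits every element exactly once, which is why the duplicate-avoidance bookkeeping can live in the caller.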
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -141,18 +162,24 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
final CoordinatorRunStats stats = params.getCoordinatorStats();
final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
- Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
-
- if (availableKillTaskSlots > 0) {
- // If no datasource has been specified, all are eligible for killing unused segments
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
- dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
- }
+ if (availableKillTaskSlots <= 0) {
+ log.info("Skipping KillUnusedSegments because there are no available kill task slots.");
+ return params;
+ }
- lastKillTime = DateTimes.nowUtc();
- killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
+ final Set<String> dataSourcesToKill;
+ if (!CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {
Review Comment:
Please invert the condition for readability.
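For illustration, the inverted form could look roughly like this. This is a self-contained sketch with hypothetical stand-in fields; `CollectionUtils.isNullOrEmpty` is replaced by plain null/empty checks and the class and field names are not from the PR:

```java
import java.util.Collections;
import java.util.Set;

public class InvertedConditionExample
{
  // Stand-ins for the dynamic config and segments metadata manager used in the PR.
  static Set<String> specificDataSourcesToKill = Collections.emptySet();
  static Set<String> allDataSourceNames = Set.of("wiki", "koala");

  static Set<String> resolveDataSourcesToKill()
  {
    // Inverted condition: the "nothing configured" case comes first,
    // so the check on the configured set is not negated.
    if (specificDataSourcesToKill == null || specificDataSourcesToKill.isEmpty()) {
      return allDataSourceNames;
    } else {
      return specificDataSourcesToKill;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(resolveDataSourcesToKill());
  }
}
```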
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -141,18 +162,24 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
final CoordinatorRunStats stats = params.getCoordinatorStats();
final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
- Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
-
- if (availableKillTaskSlots > 0) {
- // If no datasource has been specified, all are eligible for killing unused segments
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
- dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
- }
+ if (availableKillTaskSlots <= 0) {
+ log.info("Skipping KillUnusedSegments because there are no available kill task slots.");
Review Comment:
This log can become verbose too. Once we have submitted all the kill tasks that we can, it is going to be a while before slots free up. So, if the kill period is low, we might see many of these logs.
I would advise leaving out all the new info logs for now.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -141,18 +162,24 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
final CoordinatorRunStats stats = params.getCoordinatorStats();
final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
- Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
-
- if (availableKillTaskSlots > 0) {
- // If no datasource has been specified, all are eligible for killing unused segments
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
- dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
- }
+ if (availableKillTaskSlots <= 0) {
+ log.info("Skipping KillUnusedSegments because there are no available kill task slots.");
+ return params;
+ }
- lastKillTime = DateTimes.nowUtc();
- killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
+ final Set<String> dataSourcesToKill;
+ if (!CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {
+ dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
+ } else {
+ // If no datasource has been specified, all are eligible for killing unused segments by default
Review Comment:
Feels redundant as the code itself is doing exactly that.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -163,30 +190,44 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
* Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
*/
private void killUnusedSegments(
- @Nullable final Collection<String> dataSourcesToKill,
+ final Set<String> dataSourcesToKill,
final int availableKillTaskSlots,
final CoordinatorRunStats stats
)
{
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+ if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+ log.info("Skipping KillUnusedSegments because there are no datasources to kill.");
stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
return;
}
+ final Iterator<String> dataSourcesToKillIterator = this.datasourceIterator.getIterator();
+ final Set<String> remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
+ final Set<String> datasourcesKilled = new HashSet<>();
Review Comment:
It is redundant to maintain both these sets. Let's just keep `datasourcesKilled`; we can build the other from it, as it is only needed once at the end.
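For illustration, the remaining set can be derived from `datasourcesKilled` at the point where it is actually needed. A sketch with stand-in values, using plain `java.util` set difference rather than whatever helper the PR settles on:

```java
import java.util.HashSet;
import java.util.Set;

public class RemainingSetExample
{
  public static void main(String[] args)
  {
    Set<String> dataSourcesToKill = Set.of("a", "b", "c");
    Set<String> datasourcesKilled = Set.of("a", "b");

    // Build the "remaining" set once at the end instead of maintaining
    // a second set inside the kill loop.
    Set<String> remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
    remainingDatasourcesToKill.removeAll(datasourcesKilled);

    System.out.println(remainingDatasourcesToKill); // prints: [c]
  }
}
```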
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -75,14 +77,23 @@ public class KillUnusedSegments implements CoordinatorDuty
private final Duration durationToRetain;
private final boolean ignoreDurationToRetain;
private final int maxSegmentsToKill;
+ private final Duration bufferPeriod;
/**
* Used to keep track of the last interval end time that was killed for each
* datasource.
*/
private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
+
+ /**
+ * State that is maintained in the duty to determine if the duty needs to run or not.
+ */
private DateTime lastKillTime;
- private final Duration bufferPeriod;
+
+ /**
+ * Round-robin iterator of the datasources to kill. It's updated in every run by the duty.
+ */
+ private final RoundRobinIterator datasourceIterator;
Review Comment:
Good to mark the new class as not thread-safe. But it is not actually required to be thread-safe, as a duty should never run on multiple threads (`CompactSegments` is the only one that could potentially run concurrently, if manual compaction is triggered).
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -127,7 +148,7 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par
if (canDutyRun()) {
return runInternal(params);
} else {
- log.debug(
+ log.info(
Review Comment:
+1 on keeping this as debug.
While testing, it is natural to want more visibility, but that can simply be achieved by setting the log level to debug.
In production, we should ensure that:
- redundant logs of normal operation are avoided
- all important warn or error information is included
I plan to revisit other duties too to reduce redundant logging.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -163,30 +190,44 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
* Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
*/
private void killUnusedSegments(
- @Nullable final Collection<String> dataSourcesToKill,
+ final Set<String> dataSourcesToKill,
final int availableKillTaskSlots,
final CoordinatorRunStats stats
)
{
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+ if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+ log.info("Skipping KillUnusedSegments because there are no datasources to kill.");
stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
return;
}
+ final Iterator<String> dataSourcesToKillIterator = this.datasourceIterator.getIterator();
+ final Set<String> remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
+ final Set<String> datasourcesKilled = new HashSet<>();
- final Collection<String> remainingDatasourcesToKill = new ArrayList<>(dataSourcesToKill);
int submittedTasks = 0;
- for (String dataSource : dataSourcesToKill) {
+ while (dataSourcesToKillIterator.hasNext()) {
Review Comment:
> I tried to find documentation on how the duties operate but couldn't find much information. I think adding some code/public-facing documentation on this would help clarify this. :-)

Nah, it is really an implementation detail that only devs need to be privy to.
In `DruidCoordinator`, every duty group has a dedicated single-threaded executor. See `DruidCoordinator.getOrCreateDutyGroupExecutors()`. We can add a comment there asserting that every executor __must always be single-threaded__.
Note: Until a couple of releases ago, all the duties ran on a single thread, so even less concurrency 😂 .
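For context, the "one single-threaded executor per duty group" arrangement can be sketched as below. This is an illustrative stand-in, not the actual wiring in `DruidCoordinator.getOrCreateDutyGroupExecutors()`; class, method, and group names are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DutyGroupExecutors
{
  // One executor per duty group; each MUST be single-threaded so that
  // the duties of a group never run concurrently.
  private final Map<String, ExecutorService> dutyGroupExecutors = new ConcurrentHashMap<>();

  public ExecutorService getOrCreate(String dutyGroupName)
  {
    return dutyGroupExecutors.computeIfAbsent(
        dutyGroupName,
        name -> Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "Coordinator-Duty-Group-" + name)
        )
    );
  }

  public static void main(String[] args)
  {
    DutyGroupExecutors executors = new DutyGroupExecutors();
    ExecutorService group = executors.getOrCreate("HistoricalManagementDuties");
    group.submit(() -> System.out.println("duty running on " + Thread.currentThread().getName()));
    group.shutdown();
  }
}
```

Because each group reuses a single thread, a duty such as `KillUnusedSegments` can safely keep per-run state like the round-robin cursor without synchronization.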
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]