This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4d6076cf5df Switch to counter for tracking used task slots on
ForkingTaskRunner (#18045)
4d6076cf5df is described below
commit 4d6076cf5dff8a0590fd5fb17bac24db53ebaec1
Author: jtuglu-netflix <[email protected]>
AuthorDate: Mon Jun 2 11:51:23 2025 -0700
Switch to counter for tracking used task slots on ForkingTaskRunner (#18045)
* Switch to used slot counter for tracking running tasks on
ForkingTaskRunner
* Fix checkstyle
* More nits
* Address comments
* Nits
---
.../indexing/common/TaskStorageDirTracker.java | 17 +++++++++
.../druid/indexing/overlord/ForkingTaskRunner.java | 25 +++++---------
.../indexing/common/TaskStorageDirTrackerTest.java | 40 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 16 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
index 56050665955..9e0b97646c6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common;
import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
@@ -46,6 +47,9 @@ public class TaskStorageDirTracker
{
private static final Logger log = new Logger(TaskStorageDirTracker.class);
+ @GuardedBy("this")
+ private long numUsedSlots = 0;
+
public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig,
TaskConfig taskConfig)
{
final List<String> basePaths = workerConfig.getBaseTaskDirs();
@@ -144,6 +148,7 @@ public class TaskStorageDirTracker
final int currIncrement = Math.abs(iterationCounter.getAndIncrement() %
slots.length);
final StorageSlot candidateSlot = slots[currIncrement % slots.length];
if (candidateSlot.runningTaskId == null) {
+ ++numUsedSlots;
candidateSlot.runningTaskId = taskId;
return candidateSlot;
}
@@ -155,6 +160,7 @@ public class TaskStorageDirTracker
{
if (slot.getParentRef() == this) {
slot.runningTaskId = null;
+ --numUsedSlots;
} else {
throw new IAE("Cannot return storage slot for task [%s] that I don't
own.", slot.runningTaskId);
}
@@ -178,6 +184,7 @@ public class TaskStorageDirTracker
// correct in-memory accounting for anything that is currently running in
a known slot. After that, for
// compatibility with an old implementation, we need to check the base
directories to see if any of
// the tasks are running in the legacy locations and assign them to one of
the free task slots.
+ numUsedSlots = 0;
for (String taskId : taskIds) {
StorageSlot candidateSlot = Arrays.stream(slots)
.filter(slot -> slot.runningTaskId ==
null)
@@ -188,6 +195,7 @@ public class TaskStorageDirTracker
if (candidateSlot == null) {
missingIds.add(taskId);
} else {
+ ++numUsedSlots;
candidateSlot.runningTaskId = taskId;
retVal.put(taskId, candidateSlot);
}
@@ -259,4 +267,13 @@ public class TaskStorageDirTracker
'}';
}
}
+
+ /**
+ * Retrieves the number of currently used storage slots.
+ * @return the number of storage slots currently in use.
+ */
+ public synchronized long getNumUsedSlots()
+ {
+ return numUsedSlots;
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index d51d919a295..bff251b09b1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -713,29 +713,22 @@ public class ForkingTaskRunner
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
- return ImmutableMap.of(workerConfig.getCategory(),
getTotalTaskSlotCountLong());
- }
-
- public long getTotalTaskSlotCountLong()
- {
- return workerConfig.getCapacity();
+ return Map.of(workerConfig.getCategory(), getWorkerTotalTaskSlotCount());
}
@Override
public Map<String, Long> getIdleTaskSlotCount()
{
- return ImmutableMap.of(workerConfig.getCategory(),
Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0));
+ return Map.of(
+ workerConfig.getCategory(),
+ Math.max(getWorkerTotalTaskSlotCount() - getWorkerUsedTaskSlotCount(),
0)
+ );
}
@Override
public Map<String, Long> getUsedTaskSlotCount()
{
- return ImmutableMap.of(workerConfig.getCategory(),
Long.valueOf(portFinder.findUsedPortCount()));
- }
-
- public long getUsedTaskSlotCountLong()
- {
- return portFinder.findUsedPortCount();
+ return Map.of(workerConfig.getCategory(), getWorkerUsedTaskSlotCount());
}
@Override
@@ -762,19 +755,19 @@ public class ForkingTaskRunner
@Override
public Long getWorkerIdleTaskSlotCount()
{
- return Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(),
0);
+ return Math.max(getWorkerTotalTaskSlotCount() -
getWorkerUsedTaskSlotCount(), 0);
}
@Override
public Long getWorkerUsedTaskSlotCount()
{
- return (long) portFinder.findUsedPortCount();
+ return getTracker().getNumUsedSlots();
}
@Override
public Long getWorkerTotalTaskSlotCount()
{
- return getTotalTaskSlotCountLong();
+ return (long) workerConfig.getCapacity();
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
index 602033c7967..99e34fb8f08 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
@@ -218,6 +218,9 @@ public class TaskStorageDirTrackerTest
Arrays.asList("task1", "task2", "task3", "task4")
);
+ // Ensure that task count is correctly updated
+ Assert.assertEquals(2, tracker.getNumUsedSlots());
+
Assert.assertNull(dirs.get("task3"));
Assert.assertNull(dirs.get("task4"));
@@ -236,6 +239,43 @@ public class TaskStorageDirTrackerTest
verifier.validate(tracker.pickStorageSlot("task5"), "A", "slot1");
}
+ @Test
+ public void testGetNumUsedSlots() throws IOException
+ {
+ File tmpFolder = TMP.newFolder();
+ List<File> files = ImmutableList.of(
+ new File(tmpFolder, "A"),
+ new File(tmpFolder, "B"),
+ new File(tmpFolder, "C")
+ );
+ final int workerCapacity = 7;
+ final int baseTaskDirSize = 100_000_000;
+
+ final TaskStorageDirTracker tracker =
TaskStorageDirTracker.fromBaseDirs(files, workerCapacity, baseTaskDirSize);
+ tracker.ensureDirectories();
+
+ Assert.assertEquals(0, tracker.getNumUsedSlots());
+
+ tracker.pickStorageSlot("task0");
+ tracker.pickStorageSlot("task1");
+ tracker.pickStorageSlot("task2");
+
+ Assert.assertEquals(3, tracker.getNumUsedSlots());
+
+ tracker.returnStorageSlot(tracker.pickStorageSlot("task0"));
+
+ Assert.assertEquals(2, tracker.getNumUsedSlots());
+
+ tracker.pickStorageSlot("task3");
+ tracker.pickStorageSlot("task4");
+ tracker.pickStorageSlot("task5");
+ tracker.pickStorageSlot("task6");
+
+ Assert.assertEquals(6, tracker.getNumUsedSlots());
+
+ FileUtils.deleteDirectory(tmpFolder);
+ }
+
public static class StorageSlotVerifier
{
private final File baseDir;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]