This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d6076cf5df Switch to counter for tracking used task slots on 
ForkingTaskRunner (#18045)
4d6076cf5df is described below

commit 4d6076cf5dff8a0590fd5fb17bac24db53ebaec1
Author: jtuglu-netflix <[email protected]>
AuthorDate: Mon Jun 2 11:51:23 2025 -0700

    Switch to counter for tracking used task slots on ForkingTaskRunner (#18045)
    
    * Switch to used slot counter for tracking running tasks on 
ForkingTaskRunner
    
    * Fix checkstyle
    
    * More nits
    
    * Address comments
    
    * Nits
---
 .../indexing/common/TaskStorageDirTracker.java     | 17 +++++++++
 .../druid/indexing/overlord/ForkingTaskRunner.java | 25 +++++---------
 .../indexing/common/TaskStorageDirTrackerTest.java | 40 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 16 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
index 56050665955..9e0b97646c6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common;
 
 import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
@@ -46,6 +47,9 @@ public class TaskStorageDirTracker
 {
   private static final Logger log = new Logger(TaskStorageDirTracker.class);
 
+  @GuardedBy("this")
+  private long numUsedSlots = 0;
+
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, 
TaskConfig taskConfig)
   {
     final List<String> basePaths = workerConfig.getBaseTaskDirs();
@@ -144,6 +148,7 @@ public class TaskStorageDirTracker
       final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % 
slots.length);
       final StorageSlot candidateSlot = slots[currIncrement % slots.length];
       if (candidateSlot.runningTaskId == null) {
+        ++numUsedSlots;
         candidateSlot.runningTaskId = taskId;
         return candidateSlot;
       }
@@ -155,6 +160,7 @@ public class TaskStorageDirTracker
   {
     if (slot.getParentRef() == this) {
       slot.runningTaskId = null;
+      --numUsedSlots;
     } else {
       throw new IAE("Cannot return storage slot for task [%s] that I don't 
own.", slot.runningTaskId);
     }
@@ -178,6 +184,7 @@ public class TaskStorageDirTracker
     // correct in-memory accounting for anything that is currently running in 
a known slot.  After that, for
     // compatibility with an old implementation, we need to check the base 
directories to see if any of
     // the tasks are running in the legacy locations and assign them to one of 
the free task slots.
+    numUsedSlots = 0;
     for (String taskId : taskIds) {
       StorageSlot candidateSlot = Arrays.stream(slots)
                                         .filter(slot -> slot.runningTaskId == 
null)
@@ -188,6 +195,7 @@ public class TaskStorageDirTracker
       if (candidateSlot == null) {
         missingIds.add(taskId);
       } else {
+        ++numUsedSlots;
         candidateSlot.runningTaskId = taskId;
         retVal.put(taskId, candidateSlot);
       }
@@ -259,4 +267,13 @@ public class TaskStorageDirTracker
              '}';
     }
   }
+
+  /**
+   * Retrieves the number of currently used storage slots.
+   * @return the number of storage slots currently in use.
+   */
+  public synchronized long getNumUsedSlots()
+  {
+    return numUsedSlots;
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index d51d919a295..bff251b09b1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -713,29 +713,22 @@ public class ForkingTaskRunner
   @Override
   public Map<String, Long> getTotalTaskSlotCount()
   {
-    return ImmutableMap.of(workerConfig.getCategory(), 
getTotalTaskSlotCountLong());
-  }
-
-  public long getTotalTaskSlotCountLong()
-  {
-    return workerConfig.getCapacity();
+    return Map.of(workerConfig.getCategory(), getWorkerTotalTaskSlotCount());
   }
 
   @Override
   public Map<String, Long> getIdleTaskSlotCount()
   {
-    return ImmutableMap.of(workerConfig.getCategory(), 
Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0));
+    return Map.of(
+        workerConfig.getCategory(),
+        Math.max(getWorkerTotalTaskSlotCount() - getWorkerUsedTaskSlotCount(), 
0)
+    );
   }
 
   @Override
   public Map<String, Long> getUsedTaskSlotCount()
   {
-    return ImmutableMap.of(workerConfig.getCategory(), 
Long.valueOf(portFinder.findUsedPortCount()));
-  }
-
-  public long getUsedTaskSlotCountLong()
-  {
-    return portFinder.findUsedPortCount();
+    return Map.of(workerConfig.getCategory(), getWorkerUsedTaskSlotCount());
   }
 
   @Override
@@ -762,19 +755,19 @@ public class ForkingTaskRunner
   @Override
   public Long getWorkerIdleTaskSlotCount()
   {
-    return Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 
0);
+    return Math.max(getWorkerTotalTaskSlotCount() - 
getWorkerUsedTaskSlotCount(), 0);
   }
 
   @Override
   public Long getWorkerUsedTaskSlotCount()
   {
-    return (long) portFinder.findUsedPortCount();
+    return getTracker().getNumUsedSlots();
   }
 
   @Override
   public Long getWorkerTotalTaskSlotCount()
   {
-    return getTotalTaskSlotCountLong();
+    return (long) workerConfig.getCapacity();
   }
 
   @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
index 602033c7967..99e34fb8f08 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
@@ -218,6 +218,9 @@ public class TaskStorageDirTrackerTest
         Arrays.asList("task1", "task2", "task3", "task4")
     );
 
+    // Ensure that task count is correctly updated
+    Assert.assertEquals(2, tracker.getNumUsedSlots());
+
     Assert.assertNull(dirs.get("task3"));
     Assert.assertNull(dirs.get("task4"));
 
@@ -236,6 +239,43 @@ public class TaskStorageDirTrackerTest
     verifier.validate(tracker.pickStorageSlot("task5"), "A", "slot1");
   }
 
+  @Test
+  public void testGetNumUsedSlots() throws IOException
+  {
+    File tmpFolder = TMP.newFolder();
+    List<File> files = ImmutableList.of(
+        new File(tmpFolder, "A"),
+        new File(tmpFolder, "B"),
+        new File(tmpFolder, "C")
+    );
+    final int workerCapacity = 7;
+    final int baseTaskDirSize = 100_000_000;
+
+    final TaskStorageDirTracker tracker = 
TaskStorageDirTracker.fromBaseDirs(files, workerCapacity, baseTaskDirSize);
+    tracker.ensureDirectories();
+
+    Assert.assertEquals(0, tracker.getNumUsedSlots());
+
+    tracker.pickStorageSlot("task0");
+    tracker.pickStorageSlot("task1");
+    tracker.pickStorageSlot("task2");
+
+    Assert.assertEquals(3, tracker.getNumUsedSlots());
+
+    tracker.returnStorageSlot(tracker.pickStorageSlot("task0"));
+
+    Assert.assertEquals(2, tracker.getNumUsedSlots());
+
+    tracker.pickStorageSlot("task3");
+    tracker.pickStorageSlot("task4");
+    tracker.pickStorageSlot("task5");
+    tracker.pickStorageSlot("task6");
+
+    Assert.assertEquals(6, tracker.getNumUsedSlots());
+
+    FileUtils.deleteDirectory(tmpFolder);
+  }
+
   public static class StorageSlotVerifier
   {
     private final File baseDir;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to