This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9091f9307f9 Fix computation of task slots required for MSQ compaction 
task (#18756)
9091f9307f9 is described below

commit 9091f9307f9b32d4fc015b6366727dae2e894cfe
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Nov 19 20:32:57 2025 +0530

    Fix computation of task slots required for MSQ compaction task (#18756)
    
    Changes:
    - Add method `CompactionSlotManager.computeSlotsRequiredForTask()`
    - Use method in compaction template to compute task slots based on engine 
type
---
 .../compact/CompactionConfigBasedJobTemplate.java  |  2 +-
 .../client/indexing/ClientCompactionTaskQuery.java |  9 +++
 .../druid/indexing/template/BatchIndexingJob.java  |  2 +-
 .../server/compaction/CompactionSlotManager.java   | 40 +++++++----
 .../compaction/CompactionSlotManagerTest.java      | 82 ++++++++++++++++++++++
 5 files changed, 118 insertions(+), 17 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
index fd89714dadb..a5db148372d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
@@ -80,7 +80,7 @@ public class CompactionConfigBasedJobTemplate implements 
CompactionJobTemplate
           new CompactionJob(
               taskPayload,
               candidate,
-              
CompactionSlotManager.getMaxTaskSlotsForNativeCompactionTask(taskPayload.getTuningConfig())
+              CompactionSlotManager.computeSlotsRequiredForTask(taskPayload)
           )
       );
     }
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
index 052366418f1..6dd6842622c 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.transform.CompactionTransformSpec;
 
@@ -161,6 +162,14 @@ public class ClientCompactionTaskQuery implements 
ClientTaskQuery
     return compactionRunner;
   }
 
+  /**
+   * @return true if this is an MSQ compaction task.
+   */
+  public boolean isMsq()
+  {
+    return compactionRunner != null && compactionRunner.getType() == 
CompactionEngine.MSQ;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java 
b/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java
index 509226da108..aa5e0de37a8 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java
@@ -71,7 +71,7 @@ public class BatchIndexingJob
   }
 
   /**
-   * @return true if this is an MSQ job.
+   * @return true if this is an MSQ SQL job.
    */
   public boolean isMsq()
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
index 428367db8ea..2b63633b9e8 100644
--- 
a/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
+++ 
b/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
@@ -25,7 +25,6 @@ import 
org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientMSQContext;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -100,18 +99,7 @@ public class CompactionSlotManager
    */
   public void reserveTaskSlots(ClientCompactionTaskQuery compactionTaskQuery)
   {
-    // Note: The default compactionRunnerType used here should match the 
default runner used in CompactionTask when
-    // no runner is provided there.
-    CompactionEngine compactionRunnerType = 
compactionTaskQuery.getCompactionRunner() == null
-                                            ? CompactionEngine.NATIVE
-                                            : 
compactionTaskQuery.getCompactionRunner().getType();
-    if (compactionRunnerType == CompactionEngine.NATIVE) {
-      numAvailableTaskSlots -=
-          
getMaxTaskSlotsForNativeCompactionTask(compactionTaskQuery.getTuningConfig());
-    } else {
-      numAvailableTaskSlots -=
-          
getMaxTaskSlotsForMSQCompactionTask(compactionTaskQuery.getContext());
-    }
+    numAvailableTaskSlots -= computeSlotsRequiredForTask(compactionTaskQuery);
   }
 
   /**
@@ -311,7 +299,7 @@ public class CompactionSlotManager
   }
 
   /**
-   * @return Maximum number of task slots used by an MSQ compaction task at any
+   * Maximum number of task slots used by an MSQ compaction task at any
    * time when the task is run with the given context.
    */
   public static int getMaxTaskSlotsForMSQCompactionTask(@Nullable Map<String, 
Object> context)
@@ -344,12 +332,22 @@ public class CompactionSlotManager
     return tuningConfig.getPartitionsSpec() instanceof 
DimensionRangePartitionsSpec;
   }
 
+  /**
+   * Computes the maximum number of slots required for a compaction task. This
+   * is the legacy method used by the Coordinator compaction duty.
+   * <p>
+   * MSQ-based Compaction tasks launched by the Coordinator use up all 
available
+   * task slots if {@code maxNumTasks} is not specified in the context. 
However,
+   * {@link #computeSlotsRequiredForTask(ClientCompactionTaskQuery)}, used by
+   * compaction supervisors uses {@link ClientMSQContext#DEFAULT_MAX_NUM_TASKS}
+   * instead.
+   */
   public int computeSlotsRequiredForTask(
       ClientCompactionTaskQuery task,
       DataSourceCompactionConfig config
   )
   {
-    if (task.getCompactionRunner().getType() == CompactionEngine.MSQ) {
+    if (task.isMsq()) {
       final Map<String, Object> autoCompactionContext = task.getContext();
       if 
(autoCompactionContext.containsKey(ClientMSQContext.CTX_MAX_NUM_TASKS)) {
         return (int) 
autoCompactionContext.get(ClientMSQContext.CTX_MAX_NUM_TASKS);
@@ -373,4 +371,16 @@ public class CompactionSlotManager
       );
     }
   }
+
+  /**
+   * Computes the number of task slots required to run this compaction task.
+   */
+  public static int computeSlotsRequiredForTask(ClientCompactionTaskQuery task)
+  {
+    if (task.isMsq()) {
+      return getMaxTaskSlotsForMSQCompactionTask(task.getContext());
+    } else {
+      return getMaxTaskSlotsForNativeCompactionTask(task.getTuningConfig());
+    }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionSlotManagerTest.java
 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionSlotManagerTest.java
new file mode 100644
index 00000000000..e4a698dec99
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionSlotManagerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.client.indexing.ClientMSQContext;
+import org.apache.druid.indexer.CompactionEngine;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class CompactionSlotManagerTest
+{
+  @Test
+  public void 
test_computeSlotsRequiredForTask_forMsqTask_whenContextIsNull_returnsDefault()
+  {
+    Assertions.assertEquals(
+        ClientMSQContext.DEFAULT_MAX_NUM_TASKS,
+        CompactionSlotManager.computeSlotsRequiredForTask(
+            createMsqTask(null)
+        )
+    );
+  }
+
+  @Test
+  public void 
test_computeSlotsRequiredForTask_forMsqTask_whenContextHasMaxNumTasks_returnsValue()
+  {
+    Assertions.assertEquals(
+        50,
+        CompactionSlotManager.computeSlotsRequiredForTask(
+            createMsqTask(Map.of(ClientMSQContext.CTX_MAX_NUM_TASKS, 50))
+        )
+    );
+  }
+
+  @Test
+  public void 
test_computeSlotsRequiredForTask_forMsqTask_whenContextIsEmpty_returnsDefault()
+  {
+    Assertions.assertEquals(
+        ClientMSQContext.DEFAULT_MAX_NUM_TASKS,
+        CompactionSlotManager.computeSlotsRequiredForTask(
+            createMsqTask(Map.of())
+        )
+    );
+  }
+
+  private ClientCompactionTaskQuery createMsqTask(Map<String, Object> context)
+  {
+    return new ClientCompactionTaskQuery(
+        "msq_compact_1",
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        context,
+        new ClientCompactionRunnerInfo(CompactionEngine.MSQ)
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to