This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9091f9307f9 Fix computation of task slots required for MSQ compaction task (#18756)
9091f9307f9 is described below
commit 9091f9307f9b32d4fc015b6366727dae2e894cfe
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Nov 19 20:32:57 2025 +0530
Fix computation of task slots required for MSQ compaction task (#18756)
Changes:
- Add method `CompactionSlotManager.computeSlotsRequiredForTask()`
- Use this method in the compaction template to compute task slots based on engine type
---
.../compact/CompactionConfigBasedJobTemplate.java | 2 +-
.../client/indexing/ClientCompactionTaskQuery.java | 9 +++
.../druid/indexing/template/BatchIndexingJob.java | 2 +-
.../server/compaction/CompactionSlotManager.java | 40 +++++++----
.../compaction/CompactionSlotManagerTest.java | 82 ++++++++++++++++++++++
5 files changed, 118 insertions(+), 17 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
index fd89714dadb..a5db148372d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
@@ -80,7 +80,7 @@ public class CompactionConfigBasedJobTemplate implements
CompactionJobTemplate
new CompactionJob(
taskPayload,
candidate,
-
CompactionSlotManager.getMaxTaskSlotsForNativeCompactionTask(taskPayload.getTuningConfig())
+ CompactionSlotManager.computeSlotsRequiredForTask(taskPayload)
)
);
}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
index 052366418f1..6dd6842622c 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.transform.CompactionTransformSpec;
@@ -161,6 +162,14 @@ public class ClientCompactionTaskQuery implements
ClientTaskQuery
return compactionRunner;
}
+ /**
+ * @return true if this is an MSQ compaction task.
+ */
+ public boolean isMsq()
+ {
+ return compactionRunner != null && compactionRunner.getType() ==
CompactionEngine.MSQ;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java
b/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java
index 509226da108..aa5e0de37a8 100644
---
a/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java
+++
b/server/src/main/java/org/apache/druid/indexing/template/BatchIndexingJob.java
@@ -71,7 +71,7 @@ public class BatchIndexingJob
}
/**
- * @return true if this is an MSQ job.
+ * @return true if this is an MSQ SQL job.
*/
public boolean isMsq()
{
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
index 428367db8ea..2b63633b9e8 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java
@@ -25,7 +25,6 @@ import
org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -100,18 +99,7 @@ public class CompactionSlotManager
*/
public void reserveTaskSlots(ClientCompactionTaskQuery compactionTaskQuery)
{
- // Note: The default compactionRunnerType used here should match the
default runner used in CompactionTask when
- // no runner is provided there.
- CompactionEngine compactionRunnerType =
compactionTaskQuery.getCompactionRunner() == null
- ? CompactionEngine.NATIVE
- :
compactionTaskQuery.getCompactionRunner().getType();
- if (compactionRunnerType == CompactionEngine.NATIVE) {
- numAvailableTaskSlots -=
-
getMaxTaskSlotsForNativeCompactionTask(compactionTaskQuery.getTuningConfig());
- } else {
- numAvailableTaskSlots -=
-
getMaxTaskSlotsForMSQCompactionTask(compactionTaskQuery.getContext());
- }
+ numAvailableTaskSlots -= computeSlotsRequiredForTask(compactionTaskQuery);
}
/**
@@ -311,7 +299,7 @@ public class CompactionSlotManager
}
/**
- * @return Maximum number of task slots used by an MSQ compaction task at any
+ * Maximum number of task slots used by an MSQ compaction task at any
* time when the task is run with the given context.
*/
public static int getMaxTaskSlotsForMSQCompactionTask(@Nullable Map<String,
Object> context)
@@ -344,12 +332,22 @@ public class CompactionSlotManager
return tuningConfig.getPartitionsSpec() instanceof
DimensionRangePartitionsSpec;
}
+ /**
+ * Computes the maximum number of slots required for a compaction task. This
+ * is the legacy method used by the Coordinator compaction duty.
+ * <p>
+ * MSQ-based Compaction tasks launched by the Coordinator use up all available
+ * task slots if {@code maxNumTasks} is not specified in the context. However,
+ * {@link #computeSlotsRequiredForTask(ClientCompactionTaskQuery)}, used by
+ * compaction supervisors, uses {@link ClientMSQContext#DEFAULT_MAX_NUM_TASKS}
+ * instead.
+ */
public int computeSlotsRequiredForTask(
ClientCompactionTaskQuery task,
DataSourceCompactionConfig config
)
{
- if (task.getCompactionRunner().getType() == CompactionEngine.MSQ) {
+ if (task.isMsq()) {
final Map<String, Object> autoCompactionContext = task.getContext();
if
(autoCompactionContext.containsKey(ClientMSQContext.CTX_MAX_NUM_TASKS)) {
return (int)
autoCompactionContext.get(ClientMSQContext.CTX_MAX_NUM_TASKS);
@@ -373,4 +371,16 @@ public class CompactionSlotManager
);
}
}
+
+ /**
+ * Computes the number of task slots required to run this compaction task.
+ */
+ public static int computeSlotsRequiredForTask(ClientCompactionTaskQuery task)
+ {
+ if (task.isMsq()) {
+ return getMaxTaskSlotsForMSQCompactionTask(task.getContext());
+ } else {
+ return getMaxTaskSlotsForNativeCompactionTask(task.getTuningConfig());
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/server/compaction/CompactionSlotManagerTest.java
b/server/src/test/java/org/apache/druid/server/compaction/CompactionSlotManagerTest.java
new file mode 100644
index 00000000000..e4a698dec99
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/compaction/CompactionSlotManagerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.client.indexing.ClientMSQContext;
+import org.apache.druid.indexer.CompactionEngine;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class CompactionSlotManagerTest
+{
+ @Test
+ public void
test_computeSlotsRequiredForTask_forMsqTask_whenContextIsNull_returnsDefault()
+ {
+ Assertions.assertEquals(
+ ClientMSQContext.DEFAULT_MAX_NUM_TASKS,
+ CompactionSlotManager.computeSlotsRequiredForTask(
+ createMsqTask(null)
+ )
+ );
+ }
+
+ @Test
+ public void
test_computeSlotsRequiredForTask_forMsqTask_whenContextHasMaxNumTasks_returnsValue()
+ {
+ Assertions.assertEquals(
+ 50,
+ CompactionSlotManager.computeSlotsRequiredForTask(
+ createMsqTask(Map.of(ClientMSQContext.CTX_MAX_NUM_TASKS, 50))
+ )
+ );
+ }
+
+ @Test
+ public void
test_computeSlotsRequiredForTask_forMsqTask_whenContextIsEmpty_returnsDefault()
+ {
+ Assertions.assertEquals(
+ ClientMSQContext.DEFAULT_MAX_NUM_TASKS,
+ CompactionSlotManager.computeSlotsRequiredForTask(
+ createMsqTask(Map.of())
+ )
+ );
+ }
+
+ private ClientCompactionTaskQuery createMsqTask(Map<String, Object> context)
+ {
+ return new ClientCompactionTaskQuery(
+ "msq_compact_1",
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ context,
+ new ClientCompactionRunnerInfo(CompactionEngine.MSQ)
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]