This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2534e628c33 fix error message for MSQ TooManyInputFilesFault (#18799)
2534e628c33 is described below
commit 2534e628c33308f6de6e366e122db2e4796d900a
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Dec 5 01:51:13 2025 -0800
fix error message for MSQ TooManyInputFilesFault (#18799)
---
.../org/apache/druid/msq/exec/QueryValidator.java | 25 ----------------
.../kernel/controller/ControllerQueryKernel.java | 20 +++++++++++--
.../org/apache/druid/msq/exec/MSQFaultsTest.java | 5 ++--
.../apache/druid/msq/exec/QueryValidatorTest.java | 34 ----------------------
4 files changed, 21 insertions(+), 63 deletions(-)
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
index 75e5e156ca1..b322b70b07a 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
@@ -19,20 +19,13 @@
package org.apache.druid.msq.exec;
-import com.google.common.math.IntMath;
-import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
-import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.kernel.WorkOrder;
-
-import java.math.RoundingMode;
public class QueryValidator
{
@@ -68,22 +61,4 @@ public class QueryValidator
}
}
}
-
- /**
- * Validate that a {@link WorkOrder} falls within the {@link Limits#MAX_INPUT_FILES_PER_WORKER} limit.
- */
- public static void validateWorkOrder(final WorkOrder order)
- {
- final int numInputFiles = Ints.checkedCast(order.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
-
- if (numInputFiles > Limits.MAX_INPUT_FILES_PER_WORKER) {
- throw new MSQException(
- new TooManyInputFilesFault(
- numInputFiles,
- Limits.MAX_INPUT_FILES_PER_WORKER,
- IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING)
- )
- );
- }
- }
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index 8ad4131d2a4..f61022044b0 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.math.IntMath;
+import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
@@ -35,15 +37,17 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.ExtraInfoHolder;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.OutputChannelMode;
-import org.apache.druid.msq.exec.QueryValidator;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
+import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
+import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpecSlicerFactory;
import org.apache.druid.msq.input.stage.ReadablePartitions;
import org.apache.druid.msq.kernel.QueryDefinition;
@@ -56,6 +60,7 @@ import org.apache.druid.msq.statistics.CompleteKeyStatisticsInformation;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import javax.annotation.Nullable;
+import java.math.RoundingMode;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
@@ -292,6 +297,8 @@ public class ControllerQueryKernel
final WorkerInputs workerInputs = stageKernel.getWorkerInputs();
final OutputChannelMode outputChannelMode = stageOutputChannelModes.get(stageKernel.getStageDefinition().getId());
+ int totalFileCount = 0;
+ boolean fault = false;
for (int workerNumber : workerInputs.workers()) {
final Object extraInfo = extraInfos != null ? extraInfos.get(workerNumber) : null;
@@ -310,9 +317,18 @@ public class ControllerQueryKernel
config.getWorkerContextMap()
);
- QueryValidator.validateWorkOrder(workOrder);
+ final int numInputFiles = Ints.checkedCast(workOrder.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
+ fault = fault || IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1;
+ totalFileCount += numInputFiles;
workerToWorkOrder.put(workerNumber, workOrder);
}
+
+ final int requiredWorkers = IntMath.divide(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING);
+ if (fault) {
+ throw new MSQException(
+ new TooManyInputFilesFault(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, requiredWorkers)
+ );
+ }
stageWorkOrders.put(new StageId(queryDef.getQueryId(), stageNumber), workerToWorkOrder);
return workerToWorkOrder;
}
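
As a side note on the arithmetic above: a minimal, self-contained sketch of the new aggregate check, assuming Limits.MAX_INPUT_FILES_PER_WORKER is 10,000 (the hard-coded constant and the even 8-way file split below are illustrative assumptions, not values taken from this patch):

    import com.google.common.math.IntMath;
    import java.math.RoundingMode;
    import java.util.Arrays;

    public class RequiredWorkersSketch
    {
      // Stand-in for Limits.MAX_INPUT_FILES_PER_WORKER (assumed value).
      private static final int MAX_INPUT_FILES_PER_WORKER = 10_000;

      public static void main(String[] args)
      {
        // Hypothetical split: 100,000 input files spread evenly over 8 workers.
        final int[] filesPerWorker = new int[8];
        Arrays.fill(filesPerWorker, 12_500);

        int totalFileCount = 0;
        boolean fault = false;
        for (int numInputFiles : filesPerWorker) {
          // Trips when any single worker exceeds the per-worker limit.
          fault = fault || IntMath.divide(numInputFiles, MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1;
          totalFileCount += numInputFiles;
        }

        // Ceiling division over the query-wide total: ceil(100,000 / 10,000) = 10.
        final int requiredWorkers = IntMath.divide(totalFileCount, MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING);
        System.out.println(fault + " " + requiredWorkers); // prints: true 10
      }
    }

Under this split, the removed per-WorkOrder check would have raised the fault with a single worker's count (12,500) and suggested only ceil(12,500 / 10,000) = 2 workers; the aggregated check instead reports the true total and the 10 workers actually required.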
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 2918391c0be..7e85722e480 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -472,7 +472,7 @@ public class MSQFaultsTest extends MSQTestBase
{
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
- final int numFiles = 20000;
+ final int numFiles = 100000;
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
@@ -492,9 +492,10 @@ public class MSQFaultsTest extends MSQTestBase
+ ") PARTITIONED by day",
externalFiles
))
+ .setQueryContext(Map.of("maxNumTasks", 8))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
- .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
+ .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 10))
.verifyResults();
}
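
For the updated expectation, the arithmetic (again assuming Limits.MAX_INPUT_FILES_PER_WORKER is 10,000) is: the fault now carries the query-wide total of 100,000 files, so the suggested worker count is ceil(100,000 / 10,000) = 10, rather than a figure derived from one worker's share.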
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
index b81f49b73ca..0e521dafd1e 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
@@ -26,16 +26,13 @@ import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
-import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor;
-import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.util.Collections;
import java.util.UUID;
import java.util.stream.IntStream;
@@ -95,37 +92,6 @@ public class QueryValidatorTest
QueryValidator.validateQueryDef(createQueryDefinition(Limits.MAX_FRAME_COLUMNS + 1, 1));
}
- @Test
- public void testMoreInputFiles()
- {
- int numWorkers = 3;
- int inputFiles = numWorkers * Limits.MAX_INPUT_FILES_PER_WORKER + 1;
-
- final WorkOrder workOrder = new WorkOrder(
- createQueryDefinition(inputFiles, numWorkers),
- 0,
- 0,
- Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles
- null,
- null,
- null,
- null
- );
-
- expectedException.expect(MSQException.class);
- expectedException.expectMessage(StringUtils.format(
- "Too many input files/segments [%d] encountered. Maximum input
files/segments per worker is set to [%d]. Try"
- + " breaking your query up into smaller queries, or increasing the
number of workers to at least [%d] by"
- + " setting %s in your query context",
- inputFiles,
- Limits.MAX_INPUT_FILES_PER_WORKER,
- numWorkers + 1,
- MultiStageQueryContext.CTX_MAX_NUM_TASKS
- ));
-
- QueryValidator.validateWorkOrder(workOrder);
- }
-
public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers)
{
QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString());