This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2534e628c33 fix error message for MSQ TooManyInputFilesFault (#18799)
2534e628c33 is described below

commit 2534e628c33308f6de6e366e122db2e4796d900a
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Dec 5 01:51:13 2025 -0800

    fix error message for MSQ TooManyInputFilesFault (#18799)
---
 .../org/apache/druid/msq/exec/QueryValidator.java  | 25 ----------------
 .../kernel/controller/ControllerQueryKernel.java   | 20 +++++++++++--
 .../org/apache/druid/msq/exec/MSQFaultsTest.java   |  5 ++--
 .../apache/druid/msq/exec/QueryValidatorTest.java  | 34 ----------------------
 4 files changed, 21 insertions(+), 63 deletions(-)

diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
index 75e5e156ca1..b322b70b07a 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
@@ -19,20 +19,13 @@
 
 package org.apache.druid.msq.exec;
 
-import com.google.common.math.IntMath;
-import com.google.common.primitives.Ints;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
 import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
-import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.kernel.WorkOrder;
-
-import java.math.RoundingMode;
 
 public class QueryValidator
 {
@@ -68,22 +61,4 @@ public class QueryValidator
       }
     }
   }
-
-  /**
-   * Validate that a {@link WorkOrder} falls within the {@link Limits#MAX_INPUT_FILES_PER_WORKER} limit.
-   */
-  public static void validateWorkOrder(final WorkOrder order)
-  {
-    final int numInputFiles = Ints.checkedCast(order.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
-
-    if (numInputFiles > Limits.MAX_INPUT_FILES_PER_WORKER) {
-      throw new MSQException(
-          new TooManyInputFilesFault(
-              numInputFiles,
-              Limits.MAX_INPUT_FILES_PER_WORKER,
-              IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING)
-          )
-      );
-    }
-  }
 }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index 8ad4131d2a4..f61022044b0 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.math.IntMath;
+import com.google.common.primitives.Ints;
 import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
@@ -35,15 +37,17 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.exec.ExtraInfoHolder;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.OutputChannelMode;
-import org.apache.druid.msq.exec.QueryValidator;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.MSQFault;
 import org.apache.druid.msq.indexing.error.MSQFaultUtils;
+import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.indexing.error.WorkerFailedFault;
 import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
+import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpecSlicerFactory;
 import org.apache.druid.msq.input.stage.ReadablePartitions;
 import org.apache.druid.msq.kernel.QueryDefinition;
@@ -56,6 +60,7 @@ import org.apache.druid.msq.statistics.CompleteKeyStatisticsInformation;
 import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
 
 import javax.annotation.Nullable;
+import java.math.RoundingMode;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -292,6 +297,8 @@ public class ControllerQueryKernel
     final WorkerInputs workerInputs = stageKernel.getWorkerInputs();
     final OutputChannelMode outputChannelMode = stageOutputChannelModes.get(stageKernel.getStageDefinition().getId());
 
+    int totalFileCount = 0;
+    boolean fault = false;
     for (int workerNumber : workerInputs.workers()) {
       final Object extraInfo = extraInfos != null ? extraInfos.get(workerNumber) : null;
 
@@ -310,9 +317,18 @@ public class ControllerQueryKernel
           config.getWorkerContextMap()
       );
 
-      QueryValidator.validateWorkOrder(workOrder);
+      final int numInputFiles = Ints.checkedCast(workOrder.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
+      fault = fault || IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1;
+      totalFileCount += numInputFiles;
       workerToWorkOrder.put(workerNumber, workOrder);
     }
+
+    final int requiredWorkers = IntMath.divide(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING);
+    if (fault) {
+      throw new MSQException(
+          new TooManyInputFilesFault(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, requiredWorkers)
+      );
+    }
     stageWorkOrders.put(new StageId(queryDef.getQueryId(), stageNumber), workerToWorkOrder);
     return workerToWorkOrder;
   }
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 2918391c0be..7e85722e480 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -472,7 +472,7 @@ public class MSQFaultsTest extends MSQTestBase
   {
     RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
 
-    final int numFiles = 20000;
+    final int numFiles = 100000;
 
     final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
     final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
@@ -492,9 +492,10 @@ public class MSQFaultsTest extends MSQTestBase
             + ") PARTITIONED by day",
             externalFiles
         ))
+        .setQueryContext(Map.of("maxNumTasks", 8))
         .setExpectedDataSource("foo1")
         .setExpectedRowSignature(dummyRowSignature)
-        .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
+        .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 10))
         .verifyResults();
   }
 
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
index b81f49b73ca..0e521dafd1e 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
@@ -26,16 +26,13 @@ import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
-import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor;
-import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.Collections;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
@@ -95,37 +92,6 @@ public class QueryValidatorTest
     QueryValidator.validateQueryDef(createQueryDefinition(Limits.MAX_FRAME_COLUMNS + 1, 1));
   }
 
-  @Test
-  public void testMoreInputFiles()
-  {
-    int numWorkers = 3;
-    int inputFiles = numWorkers * Limits.MAX_INPUT_FILES_PER_WORKER + 1;
-
-    final WorkOrder workOrder = new WorkOrder(
-        createQueryDefinition(inputFiles, numWorkers),
-        0,
-        0,
-        Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles
-        null,
-        null,
-        null,
-        null
-    );
-
-    expectedException.expect(MSQException.class);
-    expectedException.expectMessage(StringUtils.format(
-        "Too many input files/segments [%d] encountered. Maximum input files/segments per worker is set to [%d]. Try"
-        + " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by"
-        + " setting %s in your query context",
-        inputFiles,
-        Limits.MAX_INPUT_FILES_PER_WORKER,
-        numWorkers + 1,
-        MultiStageQueryContext.CTX_MAX_NUM_TASKS
-    ));
-
-    QueryValidator.validateWorkOrder(workOrder);
-  }
-
   public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers)
   {
     QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to