This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d5348413409 MSQFaultsTest: Speed up TooManyClusteredByColumns test.
(#18926)
d5348413409 is described below
commit d5348413409ee2a80b86363ed9c2efb33fa949de
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Jan 20 09:32:58 2026 -0800
MSQFaultsTest: Speed up TooManyClusteredByColumns test. (#18926)
Speed up the test by introducing a context parameter maxClusteredByColumns,
and using it to lower the threshold needed to generate the fault. The test
formerly took over 30s and now takes less than a second.
---
.../org/apache/druid/msq/exec/QueryValidator.java | 6 ++-
.../druid/msq/util/MultiStageQueryContext.java | 61 +++++++++++++---------
.../org/apache/druid/msq/exec/MSQFaultsTest.java | 16 ++++--
3 files changed, 52 insertions(+), 31 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
index b322b70b07a..dcabe6399df 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java
@@ -26,6 +26,7 @@ import
org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
public class QueryValidator
{
@@ -43,11 +44,12 @@ public class QueryValidator
}
final int numClusteredByColumns =
stageDef.getClusterBy().getColumns().size();
- if (numClusteredByColumns > Limits.MAX_CLUSTERED_BY_COLUMNS) {
+ final int maxClusteredByColumns =
MultiStageQueryContext.getMaxClusteredByColumns(queryDef.getContext());
+ if (numClusteredByColumns > maxClusteredByColumns) {
throw new MSQException(
new TooManyClusteredByColumnsFault(
numClusteredByColumns,
- Limits.MAX_CLUSTERED_BY_COLUMNS,
+ maxClusteredByColumns,
stageDef.getStageNumber()
)
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 696f96f39e2..c12ce2d30ec 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -26,7 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.frame.FrameType;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
@@ -146,6 +146,13 @@ public class MultiStageQueryContext
public static final String CTX_MAX_INPUT_FILES_PER_WORKER =
"maxInputFilesPerWorker";
public static final String CTX_MAX_PARTITIONS = "maxPartitions";
+ /**
+ * Used by {@link #getMaxClusteredByColumns(QueryContext)}. Can be used to
adjust the limit of columns appearing
+ * in clusterBy, where the default is {@link
Limits#MAX_CLUSTERED_BY_COLUMNS}. This parameter is undocumented
+ * because its main purpose is to speed up the test {@code
MSQFaultsTest#testInsertWithHugeClusteringKeys}.
+ */
+ public static final String CTX_MAX_CLUSTERED_BY_COLUMNS =
"maxClusteredByColumns";
+
public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE =
"clusterStatisticsMergeMode";
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE =
ClusterStatisticsMergeMode.SEQUENTIAL.toString();
@@ -349,9 +356,7 @@ public class MultiStageQueryContext
return Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER;
}
if (value <= 0) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build("%s must be a positive integer, got[%d]",
CTX_MAX_INPUT_FILES_PER_WORKER, value);
+ throw InvalidInput.exception("%s must be a positive integer, got[%d]",
CTX_MAX_INPUT_FILES_PER_WORKER, value);
}
return value;
}
@@ -363,9 +368,19 @@ public class MultiStageQueryContext
return Limits.DEFAULT_MAX_PARTITIONS;
}
if (value <= 0) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build("%s must be a positive integer, got[%d]",
CTX_MAX_PARTITIONS, value);
+ throw InvalidInput.exception("%s must be a positive integer, got[%d]",
CTX_MAX_PARTITIONS, value);
+ }
+ return value;
+ }
+
+ public static int getMaxClusteredByColumns(final QueryContext queryContext)
+ {
+ final Integer value = queryContext.getInt(CTX_MAX_CLUSTERED_BY_COLUMNS);
+ if (value == null) {
+ return Limits.MAX_CLUSTERED_BY_COLUMNS;
+ }
+ if (value <= 0) {
+ throw InvalidInput.exception("%s must be a positive integer, got[%d]",
CTX_MAX_CLUSTERED_BY_COLUMNS, value);
}
return value;
}
@@ -676,26 +691,22 @@ public class MultiStageQueryContext
+ "remove this key for automatic lock type selection",
Tasks.TASK_LOCK_TYPE);
if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) ||
taskLockType.equals(TaskLockType.REPLACE))) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build(
- "TaskLock must be of type [%s] or [%s] for a
REPLACE query. Found invalid type [%s] set."
- + appendErrorMessage,
- TaskLockType.EXCLUSIVE,
- TaskLockType.REPLACE,
- taskLockType
- );
+ throw InvalidInput.exception(
+ "TaskLock must be of type [%s] or [%s] for a REPLACE query. Found
invalid type [%s] set."
+ + appendErrorMessage,
+ TaskLockType.EXCLUSIVE,
+ TaskLockType.REPLACE,
+ taskLockType
+ );
}
if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) ||
taskLockType.equals(TaskLockType.APPEND))) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.INVALID_INPUT)
- .build(
- "TaskLock must be of type [%s] or [%s] for an
INSERT query. Found invalid type [%s] set."
- + appendErrorMessage,
- TaskLockType.SHARED,
- TaskLockType.APPEND,
- taskLockType
- );
+ throw InvalidInput.exception(
+ "TaskLock must be of type [%s] or [%s] for an INSERT query. Found
invalid type [%s] set."
+ + appendErrorMessage,
+ TaskLockType.SHARED,
+ TaskLockType.APPEND,
+ taskLockType
+ );
}
return taskLockType;
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 50c88f025b5..f83b86be77c 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -427,9 +427,10 @@ public class MSQFaultsTest extends MSQTestBase
@Test
public void testInsertWithHugeClusteringKeys()
{
- RowSignature dummyRowSignature = RowSignature.builder().add("__time",
ColumnType.LONG).build();
+ final RowSignature dummyRowSignature =
RowSignature.builder().add("__time", ColumnType.LONG).build();
- final int numColumns = 1700;
+ final int numColumns = 20;
+ final int maxClusteredByColumns = 15;
String columnNames = IntStream.range(1, numColumns)
.mapToObj(i -> "col" +
i).collect(Collectors.joining(", "));
@@ -445,6 +446,12 @@ public class MSQFaultsTest extends MSQTestBase
))
.collect(Collectors.joining(", "));
+ final Map<String, Object> context =
+ ImmutableMap.<String, Object>builder()
+ .putAll(DEFAULT_MSQ_CONTEXT)
+ .put(MultiStageQueryContext.CTX_MAX_CLUSTERED_BY_COLUMNS,
maxClusteredByColumns)
+ .build();
+
testIngestQuery()
.setSql(StringUtils.format(
" insert into foo1 SELECT\n"
@@ -463,7 +470,8 @@ public class MSQFaultsTest extends MSQTestBase
))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
- .setExpectedMSQFault(new TooManyClusteredByColumnsFault(numColumns +
2, 1500, 0))
+ .setQueryContext(context)
+ .setExpectedMSQFault(new TooManyClusteredByColumnsFault(numColumns +
2, maxClusteredByColumns, 0))
.verifyResults();
}
@@ -716,7 +724,7 @@ public class MSQFaultsTest extends MSQTestBase
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
- "general"
+ "invalidInput"
).expectMessageContains(
errorMessage))
.verifyPlanningErrors();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]