This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 78542d018f1 MSQ: Allow configurable maxFrameSize. (#18442)
78542d018f1 is described below
commit 78542d018f1e4c794c6200f31caf716e8e40e6a3
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Aug 27 21:22:32 2025 -0700
MSQ: Allow configurable maxFrameSize. (#18442)
In most situations, there is no need to change the new `maxFrameSize`
context parameter. Running into the frame size limit is often a sign that
something is wrong. However, with datasets that legitimately have very
large rows, it does become necessary to use a larger frame size.
---
docs/multi-stage-query/reference.md | 1 +
.../msq/dart/controller/DartControllerContext.java | 3 ++-
.../druid/msq/exec/ControllerMemoryParameters.java | 5 +++--
.../apache/druid/msq/exec/WorkerMemoryParameters.java | 3 ++-
.../druid/msq/indexing/IndexerControllerContext.java | 3 ++-
.../druid/msq/indexing/error/RowTooLargeFault.java | 9 ++++++++-
.../apache/druid/msq/util/MultiStageQueryContext.java | 11 +++++++++++
.../druid/msq/exec/ControllerMemoryParametersTest.java | 18 ++++++++++++------
.../druid/msq/util/MultiStageQueryContextTest.java | 18 ++++++++++++++++++
9 files changed, 59 insertions(+), 12 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index 1b6aa7f7dd6..672f8050a1e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -417,6 +417,7 @@ The following table lists the context parameters for the
MSQ task engine:
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the
default), an INSERT query generating no output rows will be no-op, and a
REPLACE query generating no output rows will delete all data that matches the
OVERWRITE clause. When set to true, an ingest query generating no output rows
will throw an `InsertCannotBeEmpty` fault. | `false` |
| `storeCompactionState` | REPLACE<br /><br /> When set to true, a REPLACE
query stores as part of each segment's metadata a `lastCompactionState` field
that captures the various specs used to create the segment. Future compaction
jobs skip segments whose `lastCompactionState` matches the desired compaction
state. Works the same as
[`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context
flag. | `false` |
| `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine
cannot process null bytes in strings and throws `InvalidNullByteFault` if it
encounters them in the source data. If the parameter is set to true, The MSQ
engine will remove the null bytes in string fields when reading the data. |
`false` |
+| `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Maximum size, in bytes, of frames used
for data transfer within the MSQ engine. You generally do not need to change
this unless your rows are too large to fit in a single frame. | `1000000` (1 MB) |
## Joins
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index cdd317c5466..6a1b65c09f5 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -140,7 +140,8 @@ public class DartControllerContext implements
ControllerContext
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
- workerIds.size()
+ workerIds.size(),
+ MultiStageQueryContext.getFrameSize(context)
);
final int maxConcurrentStages =
MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
index c5131ddd84e..f85b1c8d7c7 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
@@ -66,14 +66,15 @@ public class ControllerMemoryParameters
*/
public static ControllerMemoryParameters createProductionInstance(
final MemoryIntrospector memoryIntrospector,
- final int maxWorkerCount
+ final int maxWorkerCount,
+ final int frameSize
)
{
final long totalMemory = memoryIntrospector.memoryPerTask();
final long memoryForInputChannels =
WorkerMemoryParameters.computeProcessorMemoryForInputChannels(
maxWorkerCount,
- WorkerMemoryParameters.DEFAULT_FRAME_SIZE
+ frameSize
);
final int partitionStatisticsMaxRetainedBytes = (int) Math.min(
totalMemory - memoryForInputChannels,
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index 462123f21b3..aa0a2f0d686 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -41,6 +41,7 @@ import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
+import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.incremental.IncrementalIndex;
import javax.annotation.Nullable;
@@ -160,7 +161,7 @@ public class WorkerMemoryParameters
final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
memoryIntrospector,
- DEFAULT_FRAME_SIZE,
+ MultiStageQueryContext.getFrameSize(workOrder.getWorkerContext()),
workOrder.getInputs(),
stageDef.getBroadcastInputNumbers(),
stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 67fc6aadb24..317b7d4602d 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -125,7 +125,8 @@ public class IndexerControllerContext implements
ControllerContext
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
- querySpec.getTuningConfig().getMaxNumWorkers()
+ querySpec.getTuningConfig().getMaxNumWorkers(),
+ MultiStageQueryContext.getFrameSize(querySpec.getContext())
);
final ControllerQueryKernelConfig config =
makeQueryKernelConfig(querySpec, memoryParameters);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
index 865cb6b0c1d..790e83ceff8 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing.error;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.msq.util.MultiStageQueryContext;
import java.util.Objects;
@@ -35,7 +36,13 @@ public class RowTooLargeFault extends BaseMSQFault
@JsonCreator
public RowTooLargeFault(@JsonProperty("maxFrameSize") final long
maxFrameSize)
{
- super(CODE, "Encountered row that cannot fit in a single frame (max frame
size = %,d)", maxFrameSize);
+ super(
+ CODE,
+ "Encountered row that cannot fit in a single frame (max frame size =
%,d). "
+ + "Reduce your row size or increase the context parameter[%s].",
+ maxFrameSize,
+ MultiStageQueryContext.CTX_MAX_FRAME_SIZE
+ );
this.maxFrameSize = maxFrameSize;
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9cef4899496..1cd222c4f6c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -37,6 +37,7 @@ import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
@@ -219,6 +220,11 @@ public class MultiStageQueryContext
*/
public static final String CTX_TARGET_PARTITIONS_PER_WORKER =
"targetPartitionsPerWorker";
+ /**
+ * Maximum size of frames to create. Defaults to {@link
WorkerMemoryParameters#DEFAULT_FRAME_SIZE}.
+ */
+ public static final String CTX_MAX_FRAME_SIZE = "maxFrameSize";
+
private static final Pattern LOOKS_LIKE_JSON_ARRAY =
Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
public static String getMSQMode(final QueryContext queryContext)
@@ -497,6 +503,11 @@ public class MultiStageQueryContext
return new HashSet<>(decodeList(CTX_SKIP_TYPE_VERIFICATION,
queryContext.getString(CTX_SKIP_TYPE_VERIFICATION)));
}
+ public static int getFrameSize(final QueryContext queryContext)
+ {
+ return queryContext.getInt(CTX_MAX_FRAME_SIZE,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE);
+ }
+
/**
* Decodes a list from either a JSON or CSV string.
*/
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
index d6ae0d7e190..3f5ba303414 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
@@ -36,7 +36,8 @@ public class ControllerMemoryParametersTest
{
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(128_000_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(101_400_000,
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -47,7 +48,8 @@ public class ControllerMemoryParametersTest
{
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(256_000_000, 1),
- 100
+ 100,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(104_800_000,
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -58,7 +60,8 @@ public class ControllerMemoryParametersTest
{
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(128_000_000, 2),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(50_200_000,
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -69,7 +72,8 @@ public class ControllerMemoryParametersTest
{
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(1_000_000_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(300_000_000,
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -82,7 +86,8 @@ public class ControllerMemoryParametersTest
MSQException.class,
() -> ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(30_000_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
)
);
@@ -98,7 +103,8 @@ public class ControllerMemoryParametersTest
{
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(33_750_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(26_000_000,
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index bd3cf2b8e1b..3dc8591641f 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.query.BadQueryContextException;
@@ -46,6 +47,7 @@ import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_ARRAY_INGEST_
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
+import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_FRAME_SIZE;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_NUM_TASKS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MSQ_MODE;
import static
org.apache.druid.msq.util.MultiStageQueryContext.CTX_REMOVE_NULL_BYTES;
@@ -343,6 +345,22 @@ public class MultiStageQueryContextTest
);
}
+ @Test
+ public void getFrameSize_unset_returnsDefaultValue()
+ {
+ Assert.assertEquals(
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE,
+ MultiStageQueryContext.getFrameSize(QueryContext.empty())
+ );
+ }
+
+ @Test
+ public void getFrameSize_set_returnsCorrectValue()
+ {
+ Map<String, Object> propertyMap = ImmutableMap.of(CTX_MAX_FRAME_SIZE,
500000);
+ Assert.assertEquals(500000,
MultiStageQueryContext.getFrameSize(QueryContext.of(propertyMap)));
+ }
+
private static List<String> decodeSortOrder(@Nullable final String input)
{
return
MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]