This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 78542d018f1 MSQ: Allow configurable maxFrameSize. (#18442)
78542d018f1 is described below

commit 78542d018f1e4c794c6200f31caf716e8e40e6a3
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Aug 27 21:22:32 2025 -0700

    MSQ: Allow configurable maxFrameSize. (#18442)
    
    In most situations, there is no need to change this parameter. Running
    into this limit is often a sign that something is wrong. However, with
    datasets that legitimately have very large rows, it does become
    necessary to use a larger frame size.
---
 docs/multi-stage-query/reference.md                    |  1 +
 .../msq/dart/controller/DartControllerContext.java     |  3 ++-
 .../druid/msq/exec/ControllerMemoryParameters.java     |  5 +++--
 .../apache/druid/msq/exec/WorkerMemoryParameters.java  |  3 ++-
 .../druid/msq/indexing/IndexerControllerContext.java   |  3 ++-
 .../druid/msq/indexing/error/RowTooLargeFault.java     |  9 ++++++++-
 .../apache/druid/msq/util/MultiStageQueryContext.java  | 11 +++++++++++
 .../druid/msq/exec/ControllerMemoryParametersTest.java | 18 ++++++++++++------
 .../druid/msq/util/MultiStageQueryContextTest.java     | 18 ++++++++++++++++++
 9 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index 1b6aa7f7dd6..672f8050a1e 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -417,6 +417,7 @@ The following table lists the context parameters for the 
MSQ task engine:
 | `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the 
default), an INSERT query generating no output rows will be no-op, and a 
REPLACE query generating no output rows will delete all data that matches the 
OVERWRITE clause.  When set to true, an ingest query generating no output rows 
will throw an `InsertCannotBeEmpty` fault. | `false` |
 | `storeCompactionState` | REPLACE<br /><br /> When set to true, a REPLACE 
query stores as part of each segment's metadata a `lastCompactionState` field 
that captures the various specs used to create the segment. Future compaction 
jobs skip segments whose `lastCompactionState` matches the desired compaction 
state. Works the same as 
[`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context 
flag. | `false` |
 | `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine 
cannot process null bytes in strings and throws `InvalidNullByteFault` if it 
encounters them in the source data. If the parameter is set to true, The MSQ 
engine will remove the null bytes in string fields when reading the data. | 
`false` |
+| `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Size of frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows. | `1000000` (1 MB) |
 
 ## Joins
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index cdd317c5466..6a1b65c09f5 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -140,7 +140,8 @@ public class DartControllerContext implements 
ControllerContext
     final ControllerMemoryParameters memoryParameters =
         ControllerMemoryParameters.createProductionInstance(
             memoryIntrospector,
-            workerIds.size()
+            workerIds.size(),
+            MultiStageQueryContext.getFrameSize(context)
         );
 
     final int maxConcurrentStages = 
MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
index c5131ddd84e..f85b1c8d7c7 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
@@ -66,14 +66,15 @@ public class ControllerMemoryParameters
    */
   public static ControllerMemoryParameters createProductionInstance(
       final MemoryIntrospector memoryIntrospector,
-      final int maxWorkerCount
+      final int maxWorkerCount,
+      final int frameSize
   )
   {
     final long totalMemory = memoryIntrospector.memoryPerTask();
     final long memoryForInputChannels =
         WorkerMemoryParameters.computeProcessorMemoryForInputChannels(
             maxWorkerCount,
-            WorkerMemoryParameters.DEFAULT_FRAME_SIZE
+            frameSize
         );
     final int partitionStatisticsMaxRetainedBytes = (int) Math.min(
         totalMemory - memoryForInputChannels,
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index 462123f21b3..aa0a2f0d686 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -41,6 +41,7 @@ import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
+import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 
 import javax.annotation.Nullable;
@@ -160,7 +161,7 @@ public class WorkerMemoryParameters
     final StageDefinition stageDef = workOrder.getStageDefinition();
     return createInstance(
         memoryIntrospector,
-        DEFAULT_FRAME_SIZE,
+        MultiStageQueryContext.getFrameSize(workOrder.getWorkerContext()),
         workOrder.getInputs(),
         stageDef.getBroadcastInputNumbers(),
         stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 67fc6aadb24..317b7d4602d 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -125,7 +125,8 @@ public class IndexerControllerContext implements 
ControllerContext
     final ControllerMemoryParameters memoryParameters =
         ControllerMemoryParameters.createProductionInstance(
             memoryIntrospector,
-            querySpec.getTuningConfig().getMaxNumWorkers()
+            querySpec.getTuningConfig().getMaxNumWorkers(),
+            MultiStageQueryContext.getFrameSize(querySpec.getContext())
         );
 
     final ControllerQueryKernelConfig config = 
makeQueryKernelConfig(querySpec, memoryParameters);
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
index 865cb6b0c1d..790e83ceff8 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing.error;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.msq.util.MultiStageQueryContext;
 
 import java.util.Objects;
 
@@ -35,7 +36,13 @@ public class RowTooLargeFault extends BaseMSQFault
   @JsonCreator
  public RowTooLargeFault(@JsonProperty("maxFrameSize") final long maxFrameSize)
   {
-    super(CODE, "Encountered row that cannot fit in a single frame (max frame size = %,d)", maxFrameSize);
+    super(
+        CODE,
+        "Encountered row that cannot fit in a single frame (max frame size = %,d). "
+        + "Reduce your row size or increase the context parameter[%s].",
+        maxFrameSize,
+        MultiStageQueryContext.CTX_MAX_FRAME_SIZE
+    );
     this.maxFrameSize = maxFrameSize;
   }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9cef4899496..1cd222c4f6c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -37,6 +37,7 @@ import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
 import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
 import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
 import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
@@ -219,6 +220,11 @@ public class MultiStageQueryContext
    */
   public static final String CTX_TARGET_PARTITIONS_PER_WORKER = 
"targetPartitionsPerWorker";
 
+  /**
+   * Maximum size of frames to create. Defaults to {@link WorkerMemoryParameters#DEFAULT_FRAME_SIZE}.
+   */
+  public static final String CTX_MAX_FRAME_SIZE = "maxFrameSize";
+
   private static final Pattern LOOKS_LIKE_JSON_ARRAY = 
Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
 
   public static String getMSQMode(final QueryContext queryContext)
@@ -497,6 +503,11 @@ public class MultiStageQueryContext
     return new HashSet<>(decodeList(CTX_SKIP_TYPE_VERIFICATION, 
queryContext.getString(CTX_SKIP_TYPE_VERIFICATION)));
   }
 
+  public static int getFrameSize(final QueryContext queryContext)
+  {
+    return queryContext.getInt(CTX_MAX_FRAME_SIZE, WorkerMemoryParameters.DEFAULT_FRAME_SIZE);
+  }
+
   /**
    * Decodes a list from either a JSON or CSV string.
    */
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
index d6ae0d7e190..3f5ba303414 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
@@ -36,7 +36,8 @@ public class ControllerMemoryParametersTest
   {
     final ControllerMemoryParameters memoryParameters = 
ControllerMemoryParameters.createProductionInstance(
         makeMemoryIntrospector(128_000_000, 1),
-        1
+        1,
+        WorkerMemoryParameters.DEFAULT_FRAME_SIZE
     );
 
     Assert.assertEquals(101_400_000, 
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -47,7 +48,8 @@ public class ControllerMemoryParametersTest
   {
     final ControllerMemoryParameters memoryParameters = 
ControllerMemoryParameters.createProductionInstance(
         makeMemoryIntrospector(256_000_000, 1),
-        100
+        100,
+        WorkerMemoryParameters.DEFAULT_FRAME_SIZE
     );
 
     Assert.assertEquals(104_800_000, 
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -58,7 +60,8 @@ public class ControllerMemoryParametersTest
   {
     final ControllerMemoryParameters memoryParameters = 
ControllerMemoryParameters.createProductionInstance(
         makeMemoryIntrospector(128_000_000, 2),
-        1
+        1,
+        WorkerMemoryParameters.DEFAULT_FRAME_SIZE
     );
 
     Assert.assertEquals(50_200_000, 
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -69,7 +72,8 @@ public class ControllerMemoryParametersTest
   {
     final ControllerMemoryParameters memoryParameters = 
ControllerMemoryParameters.createProductionInstance(
         makeMemoryIntrospector(1_000_000_000, 1),
-        1
+        1,
+        WorkerMemoryParameters.DEFAULT_FRAME_SIZE
     );
 
     Assert.assertEquals(300_000_000, 
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -82,7 +86,8 @@ public class ControllerMemoryParametersTest
         MSQException.class,
         () -> ControllerMemoryParameters.createProductionInstance(
             makeMemoryIntrospector(30_000_000, 1),
-            1
+            1,
+            WorkerMemoryParameters.DEFAULT_FRAME_SIZE
         )
     );
 
@@ -98,7 +103,8 @@ public class ControllerMemoryParametersTest
   {
     final ControllerMemoryParameters memoryParameters = 
ControllerMemoryParameters.createProductionInstance(
         makeMemoryIntrospector(33_750_000, 1),
-        1
+        1,
+        WorkerMemoryParameters.DEFAULT_FRAME_SIZE
     );
 
     Assert.assertEquals(26_000_000, 
memoryParameters.getPartitionStatisticsMaxRetainedBytes());
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index bd3cf2b8e1b..3dc8591641f 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.query.BadQueryContextException;
@@ -46,6 +47,7 @@ import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_ARRAY_INGEST_
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE;
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE;
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
+import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_FRAME_SIZE;
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_NUM_TASKS;
 import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MSQ_MODE;
 import static 
org.apache.druid.msq.util.MultiStageQueryContext.CTX_REMOVE_NULL_BYTES;
@@ -343,6 +345,22 @@ public class MultiStageQueryContextTest
     );
   }
 
+  @Test
+  public void getFrameSize_unset_returnsDefaultValue()
+  {
+    Assert.assertEquals(
+        WorkerMemoryParameters.DEFAULT_FRAME_SIZE,
+        MultiStageQueryContext.getFrameSize(QueryContext.empty())
+    );
+  }
+
+  @Test
+  public void getFrameSize_set_returnsCorrectValue()
+  {
+    Map<String, Object> propertyMap = ImmutableMap.of(CTX_MAX_FRAME_SIZE, 
500000);
+    Assert.assertEquals(500000, 
MultiStageQueryContext.getFrameSize(QueryContext.of(propertyMap)));
+  }
+
   private static List<String> decodeSortOrder(@Nullable final String input)
   {
     return 
MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to