This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9423aa9163 MSQ: Consider PARTITION_STATS_MAX_BYTES in 
WorkerMemoryParameters. (#13274)
9423aa9163 is described below

commit 9423aa9163ce60f9572e9042c0dcb370fe81770f
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Nov 7 00:57:18 2022 -0800

    MSQ: Consider PARTITION_STATS_MAX_BYTES in WorkerMemoryParameters. (#13274)
    
    * MSQ: Consider PARTITION_STATS_MAX_BYTES in WorkerMemoryParameters.
    
    This consideration is important, because otherwise we can run out of
    memory due to large statistics-tracking objects.
    
    * Improved calculations.
---
 docs/multi-stage-query/concepts.md                 |   4 +-
 docs/multi-stage-query/reference.md                |   2 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |   6 +-
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |   6 +-
 .../druid/msq/exec/WorkerMemoryParameters.java     | 230 +++++++++++++++++++--
 .../druid/msq/indexing/IndexerWorkerContext.java   |  85 +-------
 .../msq/indexing/error/NotEnoughMemoryFault.java   |  16 +-
 .../apache/druid/msq/kernel/StageDefinition.java   |   7 +-
 .../kernel/controller/ControllerQueryKernel.java   |   7 +-
 .../kernel/controller/ControllerStageTracker.java  |  23 ++-
 .../druid/msq/exec/WorkerMemoryParametersTest.java |  57 +++--
 .../msq/indexing/IndexerWorkerContextTest.java     |   3 +-
 .../msq/indexing/error/MSQFaultSerdeTest.java      |   2 +-
 .../controller/BaseControllerQueryKernelTest.java  |  10 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |  31 ++-
 .../druid/msq/test/MSQTestWorkerContext.java       |   3 +-
 16 files changed, 326 insertions(+), 166 deletions(-)

diff --git a/docs/multi-stage-query/concepts.md 
b/docs/multi-stage-query/concepts.md
index 0e8dd52312..ae64478243 100644
--- a/docs/multi-stage-query/concepts.md
+++ b/docs/multi-stage-query/concepts.md
@@ -257,8 +257,8 @@ On Peons launched by Middle Managers, the bulk of the JVM 
heap (75%, less any sp
 [lookups](../querying/lookups.md)) is split up into two bundles of equal size: 
one processor bundle and one worker
 bundle. Each one comprises 37.5% of the available JVM heap, less any space 
used by [lookups](../querying/lookups.md).
 
-Depending on the type of query, each worker and controller task can use a 
sketch for generating partition boundaries.
-Each sketch uses at most approximately 300 MB.
+Depending on the type of query, controller and worker tasks may use sketches 
for determining partition boundaries.
+The heap footprint of these sketches is capped at 10% of available memory, or 
300 MB, whichever is lower.
 
 The processor memory bundle is used for query processing and segment 
generation. Each processor bundle must also
 provide space to buffer I/O between stages. Specifically, each downstream 
stage requires 1 MB of buffer space for each
diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index a50e7991dc..107631595b 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -251,7 +251,7 @@ The following table describes error codes you may encounter 
in the `multiStageQu
 |  TooManyColumns |  Exceeded the number of columns for a stage. See the 
[Limits](#limits) table for the specific limit.  | `maxColumns`: The limit on 
columns which was exceeded.  |
 |  TooManyWarnings |  Exceeded the allowed number of warnings of a particular 
type. | `rootErrorCode`: The error code corresponding to the exception that 
exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of 
warnings that are allowed for the corresponding `rootErrorCode`.   |
 |  TooManyWorkers |  Exceeded the supported number of workers running 
simultaneously. See the [Limits](#limits) table for the specific limit.  | 
`workers`: The number of simultaneously running workers that exceeded a hard or 
soft limit. This may be larger than the number of workers in any one stage if 
multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard 
or soft limit on workers that was exceeded.  |
-|  NotEnoughMemory  |  Insufficient memory to launch a stage.  |  
`serverMemory`: The amount of memory available to a single process.<br /><br 
/>`serverWorkers`: The number of workers running in a single process.<br /><br 
/>`serverThreads`: The number of threads in a single process.  |
+|  NotEnoughMemory  |  Insufficient memory to launch a stage.  |  
`serverMemory`: The amount of memory available to a single process.<br /><br 
/>`usableMemory`: The amount of server memory usable by query-related work. 
Excludes space taken by lookups plus an additional 25% overhead.<br /><br 
/>`serverWorkers`: The number of workers running in a single process.<br /><br 
/>`serverThreads`: The number of threads in a single process.  |
 |  WorkerFailed  |  A worker task failed unexpectedly.  |  `workerTaskId`: The 
ID of the worker task.  |
 |  WorkerRpcFailed  |  A remote procedure call to a worker task failed and 
could not recover.  |  `workerTaskId`: The ID of the worker task.  |
 |  UnknownError   |  All other errors.  |    |
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 8eb348bf50..647a756757 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1942,7 +1942,11 @@ public class ControllerImpl implements Controller
       this.queryDef = queryDef;
       this.inputSpecSlicerFactory = inputSpecSlicerFactory;
       this.closer = closer;
-      this.queryKernel = new ControllerQueryKernel(queryDef);
+      this.queryKernel = new ControllerQueryKernel(
+          queryDef,
+          
WorkerMemoryParameters.createProductionInstanceForController(context.injector())
+                                .getPartitionStatisticsMaxRetainedBytes()
+      );
     }
 
     /**
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index ad1cdfa04c..5b68041d0e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -1062,6 +1062,7 @@ public class WorkerImpl implements Worker
           clusterBy,
           processorOutputChannels,
           exec,
+          memoryParameters.getPartitionStatisticsMaxRetainedBytes(),
           cancellationId,
           kernelManipulationQueue
       );
@@ -1102,6 +1103,7 @@ public class WorkerImpl implements Worker
       final ClusterBy clusterBy,
       final OutputChannels processorOutputChannels,
       final FrameProcessorExecutor exec,
+      final int partitionStatisticsMaxRetainedBytes,
       final String cancellationId,
       final BlockingQueue<Consumer<KernelHolder>> kernelManipulationQueue
   )
@@ -1119,7 +1121,7 @@ public class WorkerImpl implements Worker
               channel.writable(),
               stageDefinition.getFrameReader(),
               clusterBy,
-              stageDefinition.createResultKeyStatisticsCollector()
+              
stageDefinition.createResultKeyStatisticsCollector(partitionStatisticsMaxRetainedBytes)
           )
       );
     }
@@ -1127,7 +1129,7 @@ public class WorkerImpl implements Worker
     final ListenableFuture<ClusterByStatisticsCollector> 
clusterByStatisticsCollectorFuture =
         exec.runAllFully(
             Sequences.simple(processors),
-            stageDefinition.createResultKeyStatisticsCollector(),
+            
stageDefinition.createResultKeyStatisticsCollector(partitionStatisticsMaxRetainedBytes),
             ClusterByStatisticsCollector::addAll,
             // Run all processors simultaneously. They are lightweight and 
this keeps things moving.
             processors.size(),
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index 9f00331191..fe812e2a29 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -20,18 +20,35 @@
 package org.apache.druid.msq.exec;
 
 import com.google.common.primitives.Ints;
+import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
+import org.apache.druid.query.lookup.LookupExtractor;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import 
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
 
 import java.util.Objects;
 
 /**
  * Class for determining how much JVM heap to allocate to various purposes.
  *
- * First, we take {@link #USABLE_MEMORY_FRACTION} out of the total JVM heap 
and split it into "bundles" of
- * equal size. The number of bundles is based entirely on server 
configuration; this makes the calculation
- * robust to different queries running simultaneously in the same JVM.
+ * First, we take a chunk out of the total JVM heap that is dedicated for MSQ; 
see {@link #computeUsableMemoryInJvm}.
+ *
+ * Then, we carve out some space for each worker that may be running in our 
JVM; see {@link #memoryPerWorker}.
+ *
+ * Then, we split the rest into "bundles" of equal size; see {@link 
#memoryPerBundle}. The number of bundles is based
+ * entirely on server configuration; this makes the calculation robust to 
different queries running simultaneously in
+ * the same JVM.
  *
  * Then, we split up the resources for each bundle in two different ways: one 
assuming it'll be used for a
  * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming 
it'll be used for a regular
@@ -40,11 +57,13 @@ import java.util.Objects;
  */
 public class WorkerMemoryParameters
 {
+  private static final Logger log = new Logger(WorkerMemoryParameters.class);
+
   /**
    * Percent of memory that we allocate to bundles. It is less than 100% 
because we need to leave some space
    * left over for miscellaneous other stuff, and to ensure that GC pressure 
does not get too high.
    */
-  private static final double USABLE_MEMORY_FRACTION = 0.75;
+  static final double USABLE_MEMORY_FRACTION = 0.75;
 
   /**
    * Percent of each bundle's memory that we allocate to appenderators. It is 
less than 100% because appenderators
@@ -89,6 +108,18 @@ public class WorkerMemoryParameters
    */
   private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
 
+  /**
+   * Maximum fraction of *total* usable memory (not each bundle), i.e. the memory left after applying 
{@link #USABLE_MEMORY_FRACTION}, that we'll
+   * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} 
across all workers.
+   */
+  private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+
+  /**
+   * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link 
ClusterByStatisticsCollectorImpl} for
+   * a single worker. Acts as a limit on the value computed based on {@link 
#PARTITION_STATS_MEMORY_MAX_FRACTION}.
+   */
+  private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+
   /**
    * Fraction of free memory per bundle that can be used by {@link 
org.apache.druid.msq.querykit.BroadcastJoinHelper}
    * to store broadcast data on-heap. This is used to limit the total size of 
input frames, which we expect to
@@ -102,22 +133,64 @@ public class WorkerMemoryParameters
   private final int superSorterMaxChannelsPerProcessor;
   private final long appenderatorMemory;
   private final long broadcastJoinMemory;
+  private final int partitionStatisticsMaxRetainedBytes;
 
   WorkerMemoryParameters(
       final int superSorterMaxActiveProcessors,
       final int superSorterMaxChannelsPerProcessor,
       final long appenderatorMemory,
-      final long broadcastJoinMemory
+      final long broadcastJoinMemory,
+      final int partitionStatisticsMaxRetainedBytes
   )
   {
     this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
     this.superSorterMaxChannelsPerProcessor = 
superSorterMaxChannelsPerProcessor;
     this.appenderatorMemory = appenderatorMemory;
     this.broadcastJoinMemory = broadcastJoinMemory;
+    this.partitionStatisticsMaxRetainedBytes = 
partitionStatisticsMaxRetainedBytes;
   }
 
   /**
-   * Returns an object specifying memory-usage parameters for a stage in a 
worker.
+   * Create a production instance for {@link 
org.apache.druid.msq.indexing.MSQControllerTask}.
+   */
+  public static WorkerMemoryParameters 
createProductionInstanceForController(final Injector injector)
+  {
+    return createInstance(
+        Runtime.getRuntime().maxMemory(),
+        computeUsableMemoryInJvm(injector),
+        computeNumWorkersInJvm(injector),
+        computeNumProcessorsInJvm(injector),
+        0
+    );
+  }
+
+  /**
+   * Create a production instance for {@link 
org.apache.druid.msq.indexing.MSQWorkerTask}.
+   */
+  public static WorkerMemoryParameters createProductionInstanceForWorker(
+      final Injector injector,
+      final QueryDefinition queryDef,
+      final int stageNumber
+  )
+  {
+    final IntSet inputStageNumbers =
+        
InputSpecs.getStageNumbers(queryDef.getStageDefinition(stageNumber).getInputSpecs());
+    final int numInputWorkers =
+        inputStageNumbers.intStream()
+                         .map(inputStageNumber -> 
queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
+                         .sum();
+
+    return createInstance(
+        Runtime.getRuntime().maxMemory(),
+        computeUsableMemoryInJvm(injector),
+        computeNumWorkersInJvm(injector),
+        computeNumProcessorsInJvm(injector),
+        numInputWorkers
+    );
+  }
+
+  /**
+   * Returns an object specifying memory-usage parameters.
    *
    * Throws a {@link MSQException} with an appropriate fault if the provided 
combination of parameters cannot
    * yield a workable memory situation.
@@ -128,19 +201,21 @@ public class WorkerMemoryParameters
    * @param numProcessingThreadsInJvm size of the processing thread pool in 
the JVM.
    * @param numInputWorkers           number of workers across input stages 
that need to be merged together.
    */
-  public static WorkerMemoryParameters compute(
+  public static WorkerMemoryParameters createInstance(
       final long maxMemoryInJvm,
+      final long usableMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm,
       final int numInputWorkers
   )
   {
-    final long bundleMemory = memoryPerBundle(maxMemoryInJvm, numWorkersInJvm, 
numProcessingThreadsInJvm);
+    final long workerMemory = memoryPerWorker(usableMemoryInJvm, 
numWorkersInJvm);
+    final long bundleMemory = memoryPerBundle(usableMemoryInJvm, 
numWorkersInJvm, numProcessingThreadsInJvm);
     final long bundleMemoryForInputChannels = 
memoryNeededForInputChannels(numInputWorkers);
     final long bundleMemoryForProcessing = bundleMemory - 
bundleMemoryForInputChannels;
 
     if (bundleMemoryForProcessing < PROCESSING_MINIMUM_BYTES) {
-      final int maxWorkers = computeMaxWorkers(maxMemoryInJvm, 
numWorkersInJvm, numProcessingThreadsInJvm);
+      final int maxWorkers = computeMaxWorkers(usableMemoryInJvm, 
numWorkersInJvm, numProcessingThreadsInJvm);
 
       if (maxWorkers > 0) {
         throw new MSQException(new TooManyWorkersFault(numInputWorkers, 
Math.min(Limits.MAX_WORKERS, maxWorkers)));
@@ -149,6 +224,7 @@ public class WorkerMemoryParameters
         throw new MSQException(
             new NotEnoughMemoryFault(
                 maxMemoryInJvm,
+                usableMemoryInJvm,
                 numWorkersInJvm,
                 numProcessingThreadsInJvm
             )
@@ -160,7 +236,14 @@ public class WorkerMemoryParameters
     final int maxNumFramesForSuperSorter = Ints.checkedCast(bundleMemory / 
WorkerMemoryParameters.LARGE_FRAME_SIZE);
 
     if (maxNumFramesForSuperSorter < MIN_SUPER_SORTER_FRAMES) {
-      throw new MSQException(new NotEnoughMemoryFault(maxMemoryInJvm, 
numWorkersInJvm, numProcessingThreadsInJvm));
+      throw new MSQException(
+          new NotEnoughMemoryFault(
+              maxMemoryInJvm,
+              usableMemoryInJvm,
+              numWorkersInJvm,
+              numProcessingThreadsInJvm
+          )
+      );
     }
 
     final int superSorterMaxActiveProcessors = Math.min(
@@ -178,7 +261,8 @@ public class WorkerMemoryParameters
         superSorterMaxActiveProcessors,
         superSorterMaxChannelsPerProcessor,
         (long) (bundleMemoryForProcessing * APPENDERATOR_MEMORY_FRACTION),
-        (long) (bundleMemoryForProcessing * BROADCAST_JOIN_MEMORY_FRACTION)
+        (long) (bundleMemoryForProcessing * BROADCAST_JOIN_MEMORY_FRACTION),
+        Ints.checkedCast(workerMemory) // 100% of worker memory is devoted to 
partition statistics
     );
   }
 
@@ -219,6 +303,11 @@ public class WorkerMemoryParameters
     return broadcastJoinMemory;
   }
 
+  public int getPartitionStatisticsMaxRetainedBytes()
+  {
+    return partitionStatisticsMaxRetainedBytes;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -232,7 +321,8 @@ public class WorkerMemoryParameters
     return superSorterMaxActiveProcessors == 
that.superSorterMaxActiveProcessors
            && superSorterMaxChannelsPerProcessor == 
that.superSorterMaxChannelsPerProcessor
            && appenderatorMemory == that.appenderatorMemory
-           && broadcastJoinMemory == that.broadcastJoinMemory;
+           && broadcastJoinMemory == that.broadcastJoinMemory
+           && partitionStatisticsMaxRetainedBytes == 
that.partitionStatisticsMaxRetainedBytes;
   }
 
   @Override
@@ -242,7 +332,8 @@ public class WorkerMemoryParameters
         superSorterMaxActiveProcessors,
         superSorterMaxChannelsPerProcessor,
         appenderatorMemory,
-        broadcastJoinMemory
+        broadcastJoinMemory,
+        partitionStatisticsMaxRetainedBytes
     );
   }
 
@@ -254,41 +345,138 @@ public class WorkerMemoryParameters
            ", superSorterMaxChannelsPerProcessor=" + 
superSorterMaxChannelsPerProcessor +
            ", appenderatorMemory=" + appenderatorMemory +
            ", broadcastJoinMemory=" + broadcastJoinMemory +
+           ", partitionStatisticsMaxRetainedBytes=" + 
partitionStatisticsMaxRetainedBytes +
            '}';
   }
 
   /**
    * Computes the highest value of numInputWorkers, for the given parameters, 
that can be passed to
-   * {@link #compute} without resulting in a {@link TooManyWorkersFault}.
+   * {@link #createInstance} without resulting in a {@link 
TooManyWorkersFault}.
    *
    * Returns 0 if no number of workers would be OK.
    */
   static int computeMaxWorkers(
-      final long maxMemoryInJvm,
+      final long usableMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm
   )
   {
-    final long bundleMemory = memoryPerBundle(maxMemoryInJvm, numWorkersInJvm, 
numProcessingThreadsInJvm);
+    final long bundleMemory = memoryPerBundle(usableMemoryInJvm, 
numWorkersInJvm, numProcessingThreadsInJvm);
 
     // Inverse of memoryNeededForInputChannels.
-    return Ints.checkedCast((bundleMemory - PROCESSING_MINIMUM_BYTES) / 
STANDARD_FRAME_SIZE - 1);
+    return Math.max(0, Ints.checkedCast((bundleMemory - 
PROCESSING_MINIMUM_BYTES) / STANDARD_FRAME_SIZE - 1));
+  }
+
+  /**
+   * Maximum number of workers that may exist in the current JVM.
+   */
+  private static int computeNumWorkersInJvm(final Injector injector)
+  {
+    final AppenderatorsManager appenderatorsManager = 
injector.getInstance(AppenderatorsManager.class);
+
+    if (appenderatorsManager instanceof UnifiedIndexerAppenderatorsManager) {
+      // CliIndexer
+      return injector.getInstance(WorkerConfig.class).getCapacity();
+    } else {
+      // CliPeon
+      return 1;
+    }
   }
 
+  /**
+   * Maximum number of concurrent processors that exist in the current JVM.
+   */
+  private static int computeNumProcessorsInJvm(final Injector injector)
+  {
+    return injector.getInstance(Bouncer.class).getMaxCount();
+  }
+
+  /**
+   * Compute the memory allocated to each worker. Includes anything that 
exists outside of processing bundles.
+   *
+   * Today, we only look at one thing: the amount of memory taken up by
+   * {@link org.apache.druid.msq.statistics.ClusterByStatisticsCollector}. 
This is the single largest source of memory
+   * usage outside processing bundles.
+   */
+  private static long memoryPerWorker(
+      final long usableMemoryInJvm,
+      final int numWorkersInJvm
+  )
+  {
+    final long memoryForWorkers = (long) Math.min(
+        usableMemoryInJvm * PARTITION_STATS_MEMORY_MAX_FRACTION,
+        numWorkersInJvm * PARTITION_STATS_MEMORY_MAX_BYTES
+    );
+
+    return memoryForWorkers / numWorkersInJvm;
+  }
+
+  /**
+   * Compute the memory allocated to each processing bundle.
+   */
   private static long memoryPerBundle(
-      final long maxMemoryInJvm,
+      final long usableMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm
   )
   {
     final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;
-    return (long) (maxMemoryInJvm * USABLE_MEMORY_FRACTION) / bundleCount;
+
+    // Need to subtract memoryForWorkers off the top of usableMemoryInJvm, 
since this is reserved for
+    // statistics collection.
+    final long memoryForWorkers = numWorkersInJvm * 
memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
+    final long memoryForBundles = usableMemoryInJvm - memoryForWorkers;
+
+    // Divide up the usable memory per bundle.
+    return memoryForBundles / bundleCount;
   }
 
   private static long memoryNeededForInputChannels(final int numInputWorkers)
   {
-    // Regular processors require input-channel-merging for their inputs. 
Calculate how much that is.
-    // Requirement: inputChannelsPerProcessor number of input frames, one 
output frame.
+    // Workers that read sorted inputs must open all channels at once to do an 
N-way merge. Calculate memory needs.
+    // Requirement: one input frame per worker, one buffered output frame.
     return (long) STANDARD_FRAME_SIZE * (numInputWorkers + 1);
   }
+
+  /**
+   * Amount of heap memory available for our usage.
+   */
+  private static long computeUsableMemoryInJvm(final Injector injector)
+  {
+    return (long) ((Runtime.getRuntime().maxMemory() - 
computeTotalLookupFootprint(injector)) * USABLE_MEMORY_FRACTION);
+  }
+
+  /**
+   * Total estimated lookup footprint. Obtained by calling {@link 
LookupExtractor#estimateHeapFootprint()} on
+   * all available lookups.
+   */
+  private static long computeTotalLookupFootprint(final Injector injector)
+  {
+    // Subtract memory taken up by lookups. Correctness of this operation 
depends on lookups being loaded *before*
+    // we create this instance. Luckily, this is the typical mode of 
operation, since by default
+    // druid.lookup.enableLookupSyncOnStartup = true.
+    final LookupReferencesManager lookupManager = 
injector.getInstance(LookupReferencesManager.class);
+
+    int lookupCount = 0;
+    long lookupFootprint = 0;
+
+    for (final String lookupName : lookupManager.getAllLookupNames()) {
+      final LookupExtractorFactoryContainer container = 
lookupManager.get(lookupName).orElse(null);
+
+      if (container != null) {
+        try {
+          final LookupExtractor extractor = 
container.getLookupExtractorFactory().get();
+          lookupFootprint += extractor.estimateHeapFootprint();
+          lookupCount++;
+        }
+        catch (Exception e) {
+          log.noStackTrace().warn(e, "Failed to load lookup [%s] for size 
estimation. Skipping.", lookupName);
+        }
+      }
+    }
+
+    log.debug("Lookup footprint: %d lookups with %,d total bytes.", 
lookupCount, lookupFootprint);
+
+    return lookupFootprint;
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 36479142e0..fe50bc19ac 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -24,13 +24,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Injector;
 import com.google.inject.Key;
-import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.frame.processor.Bouncer;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -40,13 +38,9 @@ import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerClient;
 import org.apache.druid.msq.exec.WorkerContext;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
-import org.apache.druid.msq.input.InputSpecs;
 import org.apache.druid.msq.kernel.FrameContext;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.rpc.CoordinatorServiceClient;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupReferencesManager;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocations;
 import org.apache.druid.rpc.ServiceLocator;
@@ -56,7 +50,6 @@ import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
 import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.loading.SegmentCacheManager;
-import 
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
 import org.apache.druid.server.DruidNode;
 
 import java.io.File;
@@ -74,7 +67,6 @@ public class IndexerWorkerContext implements WorkerContext
   private final IndexIO indexIO;
   private final TaskDataSegmentProvider dataSegmentProvider;
   private final ServiceClientFactory clientFactory;
-  private final long availableHeapMemory;
 
   @GuardedBy("this")
   private OverlordClient overlordClient;
@@ -87,8 +79,7 @@ public class IndexerWorkerContext implements WorkerContext
       final Injector injector,
       final IndexIO indexIO,
       final TaskDataSegmentProvider dataSegmentProvider,
-      final ServiceClientFactory clientFactory,
-      final long availableHeapMemory
+      final ServiceClientFactory clientFactory
   )
   {
     this.toolbox = toolbox;
@@ -96,7 +87,6 @@ public class IndexerWorkerContext implements WorkerContext
     this.indexIO = indexIO;
     this.dataSegmentProvider = dataSegmentProvider;
     this.clientFactory = clientFactory;
-    this.availableHeapMemory = availableHeapMemory;
   }
 
   public static IndexerWorkerContext createProductionInstance(final 
TaskToolbox toolbox, final Injector injector)
@@ -115,8 +105,7 @@ public class IndexerWorkerContext implements WorkerContext
         injector,
         indexIO,
         new TaskDataSegmentProvider(coordinatorServiceClient, 
segmentCacheManager, indexIO),
-        serviceClientFactory,
-        computeAvailableHeapMemory(injector)
+        serviceClientFactory
     );
   }
 
@@ -234,23 +223,11 @@ public class IndexerWorkerContext implements WorkerContext
   @Override
   public FrameContext frameContext(QueryDefinition queryDef, int stageNumber)
   {
-    final IntSet inputStageNumbers =
-        
InputSpecs.getStageNumbers(queryDef.getStageDefinition(stageNumber).getInputSpecs());
-    final int numInputWorkers =
-        inputStageNumbers.intStream()
-                         .map(inputStageNumber -> 
queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
-                         .sum();
-
     return new IndexerFrameContext(
         this,
         indexIO,
         dataSegmentProvider,
-        WorkerMemoryParameters.compute(
-            availableHeapMemory,
-            computeNumWorkersInJvm(),
-            processorBouncer().getMaxCount(),
-            numInputWorkers
-        )
+        WorkerMemoryParameters.createProductionInstanceForWorker(injector, 
queryDef, stageNumber)
     );
   }
 
@@ -272,20 +249,6 @@ public class IndexerWorkerContext implements WorkerContext
     return injector.getInstance(Bouncer.class);
   }
 
-  /**
-   * Number of workers that may run in the current JVM, including the current 
worker.
-   */
-  private int computeNumWorkersInJvm()
-  {
-    if (toolbox.getAppenderatorsManager() instanceof 
UnifiedIndexerAppenderatorsManager) {
-      // CliIndexer
-      return injector.getInstance(WorkerConfig.class).getCapacity();
-    } else {
-      // CliPeon
-      return 1;
-    }
-  }
-
   private synchronized OverlordClient makeOverlordClient()
   {
     if (overlordClient == null) {
@@ -303,46 +266,4 @@ public class IndexerWorkerContext implements WorkerContext
 
     return controllerLocator;
   }
-
-  /**
-   * Amount of memory available for our usage.
-   */
-  private static long computeAvailableHeapMemory(final Injector injector)
-  {
-    return Runtime.getRuntime().maxMemory() - 
computeTotalLookupFootprint(injector);
-  }
-
-  /**
-   * Total estimated lookup footprint. Obtained by calling {@link 
LookupExtractor#estimateHeapFootprint()} on
-   * all available lookups.
-   */
-  private static long computeTotalLookupFootprint(final Injector injector)
-  {
-    // Subtract memory taken up by lookups. Correctness of this operation 
depends on lookups being loaded *before*
-    // we create this instance. Luckily, this is the typical mode of 
operation, since by default
-    // druid.lookup.enableLookupSyncOnStartup = true.
-    final LookupReferencesManager lookupManager = 
injector.getInstance(LookupReferencesManager.class);
-
-    int lookupCount = 0;
-    long lookupFootprint = 0;
-
-    for (final String lookupName : lookupManager.getAllLookupNames()) {
-      final LookupExtractorFactoryContainer container = 
lookupManager.get(lookupName).orElse(null);
-
-      if (container != null) {
-        try {
-          final LookupExtractor extractor = 
container.getLookupExtractorFactory().get();
-          lookupFootprint += extractor.estimateHeapFootprint();
-          lookupCount++;
-        }
-        catch (Exception e) {
-          log.noStackTrace().warn(e, "Failed to load lookup [%s] for size 
estimation. Skipping.", lookupName);
-        }
-      }
-    }
-
-    log.debug("Lookup footprint: %d lookups with %,d total bytes.", 
lookupCount, lookupFootprint);
-
-    return lookupFootprint;
-  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
index b353c03df1..dfd2129161 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
@@ -31,25 +31,29 @@ public class NotEnoughMemoryFault extends BaseMSQFault
   static final String CODE = "NotEnoughMemory";
 
   private final long serverMemory;
+  private final long usableMemory;
   private final int serverWorkers;
   private final int serverThreads;
 
   @JsonCreator
   public NotEnoughMemoryFault(
       @JsonProperty("serverMemory") final long serverMemory,
+      @JsonProperty("usableMemory") final long usableMemory,
       @JsonProperty("serverWorkers") final int serverWorkers,
       @JsonProperty("serverThreads") final int serverThreads
   )
   {
     super(
         CODE,
-        "Not enough memory (available = %,d; server workers = %,d; server 
threads = %,d)",
+        "Not enough memory (total = %,d; usable = %,d; server workers = %,d; 
server threads = %,d)",
         serverMemory,
+        usableMemory,
         serverWorkers,
         serverThreads
     );
 
     this.serverMemory = serverMemory;
+    this.usableMemory = usableMemory;
     this.serverWorkers = serverWorkers;
     this.serverThreads = serverThreads;
   }
@@ -60,6 +64,12 @@ public class NotEnoughMemoryFault extends BaseMSQFault
     return serverMemory;
   }
 
+  @JsonProperty
+  public long getUsableMemory()
+  {
+    return usableMemory;
+  }
+
   @JsonProperty
   public int getServerWorkers()
   {
@@ -86,6 +96,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
     }
     NotEnoughMemoryFault that = (NotEnoughMemoryFault) o;
     return serverMemory == that.serverMemory
+           && usableMemory == that.usableMemory
            && serverWorkers == that.serverWorkers
            && serverThreads == that.serverThreads;
   }
@@ -93,7 +104,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), serverMemory, serverWorkers, 
serverThreads);
+    return Objects.hash(super.hashCode(), serverMemory, usableMemory, 
serverWorkers, serverThreads);
   }
 
   @Override
@@ -101,6 +112,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
   {
     return "NotEnoughMemoryFault{" +
            "serverMemory=" + serverMemory +
+           ", usableMemory=" + usableMemory +
            ", serverWorkers=" + serverWorkers +
            ", serverThreads=" + serverThreads +
            '}';
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index c01506054d..c79bf46ddc 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -69,12 +69,11 @@ import java.util.function.Supplier;
  * The rarely-used parameter {@link #getShuffleCheckHasMultipleValues()} 
controls whether the execution system
  * checks, while shuffling, if the key used for shuffling has any multi-value 
fields. When this is true, the method
  * {@link ClusterByStatisticsCollector#hasMultipleValues} is enabled on 
collectors
- * {@link #createResultKeyStatisticsCollector()}. Its primary purpose is to 
allow ingestion jobs to detect whether the
+ * {@link #createResultKeyStatisticsCollector}. Its primary purpose is to 
allow ingestion jobs to detect whether the
  * secondary partitioning (CLUSTERED BY) key is multivalued or not.
  */
 public class StageDefinition
 {
-  private static final int PARTITION_STATS_MAX_BYTES = 300_000_000; // Avoid 
immediate downsample of single-bucket collectors
   private static final int PARTITION_STATS_MAX_BUCKETS = 5_000; // Limit for 
TooManyBuckets
   private static final int MAX_PARTITIONS = 25_000; // Limit for 
TooManyPartitions
 
@@ -280,7 +279,7 @@ public class StageDefinition
     }
   }
 
-  public ClusterByStatisticsCollector createResultKeyStatisticsCollector()
+  public ClusterByStatisticsCollector createResultKeyStatisticsCollector(final 
int maxRetainedBytes)
   {
     if (!mustGatherResultKeyStatistics()) {
       throw new ISE("No statistics needed");
@@ -289,7 +288,7 @@ public class StageDefinition
     return ClusterByStatisticsCollectorImpl.create(
         shuffleSpec.getClusterBy(),
         signature,
-        PARTITION_STATS_MAX_BYTES,
+        maxRetainedBytes,
         PARTITION_STATS_MAX_BUCKETS,
         shuffleSpec.doesAggregateByClusterKey(),
         shuffleCheckHasMultipleValues
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index 9c02beea88..9de5c692c9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -65,6 +65,7 @@ import java.util.stream.Collectors;
 public class ControllerQueryKernel
 {
   private final QueryDefinition queryDef;
+  private final int partitionStatisticsMaxRetainedBytes;
 
   /**
    * Stage ID -> tracker for that stage. An extension of the state of this 
kernel.
@@ -106,9 +107,10 @@ public class ControllerQueryKernel
    */
   private final Set<StageId> effectivelyFinishedStages = new HashSet<>();
 
-  public ControllerQueryKernel(final QueryDefinition queryDef)
+  public ControllerQueryKernel(final QueryDefinition queryDef, final int 
partitionStatisticsMaxRetainedBytes)
   {
     this.queryDef = queryDef;
+    this.partitionStatisticsMaxRetainedBytes = 
partitionStatisticsMaxRetainedBytes;
     this.inflowMap = ImmutableMap.copyOf(computeStageInflowMap(queryDef));
     this.outflowMap = ImmutableMap.copyOf(computeStageOutflowMap(queryDef));
 
@@ -264,7 +266,8 @@ public class ControllerQueryKernel
           stageDef,
           stageWorkerCountMap,
           slicer,
-          assignmentStrategy
+          assignmentStrategy,
+          partitionStatisticsMaxRetainedBytes
       );
       stageTracker.put(nextStage, stageKernel);
     }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 1b32deb534..3ad01a513c 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -81,7 +81,8 @@ class ControllerStageTracker
 
   private ControllerStageTracker(
       final StageDefinition stageDef,
-      final WorkerInputs workerInputs
+      final WorkerInputs workerInputs,
+      final int partitionStatisticsMaxRetainedBytes
   )
   {
     this.stageDef = stageDef;
@@ -89,7 +90,8 @@ class ControllerStageTracker
     this.workerInputs = workerInputs;
 
     if (stageDef.mustGatherResultKeyStatistics()) {
-      this.resultKeyStatisticsCollector = 
stageDef.createResultKeyStatisticsCollector();
+      this.resultKeyStatisticsCollector =
+          
stageDef.createResultKeyStatisticsCollector(partitionStatisticsMaxRetainedBytes);
     } else {
       this.resultKeyStatisticsCollector = null;
       generateResultPartitionsAndBoundaries();
@@ -105,11 +107,12 @@ class ControllerStageTracker
       final StageDefinition stageDef,
       final Int2IntMap stageWorkerCountMap,
       final InputSpecSlicer slicer,
-      final WorkerAssignmentStrategy assignmentStrategy
+      final WorkerAssignmentStrategy assignmentStrategy,
+      final int partitionStatisticsMaxRetainedBytes
   )
   {
     final WorkerInputs workerInputs = WorkerInputs.create(stageDef, 
stageWorkerCountMap, slicer, assignmentStrategy);
-    return new ControllerStageTracker(stageDef, workerInputs);
+    return new ControllerStageTracker(stageDef, workerInputs, 
partitionStatisticsMaxRetainedBytes);
   }
 
   /**
@@ -216,6 +219,10 @@ class ControllerStageTracker
    */
   void finish()
   {
+    if (resultKeyStatisticsCollector != null) {
+      resultKeyStatisticsCollector.clear();
+    }
+
     transitionTo(ControllerStagePhase.FINISHED);
   }
 
@@ -239,6 +246,10 @@ class ControllerStageTracker
       final ClusterByStatisticsSnapshot snapshot
   )
   {
+    if (phase != ControllerStagePhase.READING_INPUT) {
+      throw new ISE("Cannot add result key statistics from stage [%s]", phase);
+    }
+
     if (resultKeyStatisticsCollector == null) {
       throw new ISE("Stage does not gather result key statistics");
     }
@@ -247,10 +258,6 @@ class ControllerStageTracker
       throw new IAE("Invalid workerNumber [%s]", workerNumber);
     }
 
-    if (phase != ControllerStagePhase.READING_INPUT) {
-      throw new ISE("Cannot add result key statistics from stage [%s]", phase);
-    }
-
     try {
       if (workersWithResultKeyStatistics.add(workerNumber)) {
         resultKeyStatisticsCollector.addAll(snapshot);
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
index 8df168b138..f0413ddcd6 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
@@ -31,52 +31,63 @@ public class WorkerMemoryParametersTest
   @Test
   public void test_oneWorkerInJvm_alone()
   {
-    Assert.assertEquals(parameters(1, 45, 373_000_000), compute(1_000_000_000, 
1, 1, 1));
-    Assert.assertEquals(parameters(2, 14, 248_000_000), compute(1_000_000_000, 
1, 2, 1));
-    Assert.assertEquals(parameters(4, 3, 148_000_000), compute(1_000_000_000, 
1, 4, 1));
-    Assert.assertEquals(parameters(3, 2, 81_333_333), compute(1_000_000_000, 
1, 8, 1));
-    Assert.assertEquals(parameters(1, 4, 42_117_647), compute(1_000_000_000, 
1, 16, 1));
+    Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 
75_000_000), compute(1_000_000_000, 1, 1, 1));
+    Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 
75_000_000), compute(1_000_000_000, 1, 2, 1));
+    Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), 
compute(1_000_000_000, 1, 4, 1));
+    Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), 
compute(1_000_000_000, 1, 8, 1));
+    Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), 
compute(1_000_000_000, 1, 12, 1));
 
     final MSQException e = Assert.assertThrows(
         MSQException.class,
         () -> compute(1_000_000_000, 1, 32, 1)
     );
 
-    Assert.assertEquals(new NotEnoughMemoryFault(1_000_000_000, 1, 32), 
e.getFault());
+    Assert.assertEquals(new NotEnoughMemoryFault(1_000_000_000, 750_000_000, 
1, 32), e.getFault());
   }
 
   @Test
   public void test_oneWorkerInJvm_twoHundredWorkersInCluster()
   {
-    Assert.assertEquals(parameters(1, 45, 174_000_000), compute(1_000_000_000, 
1, 1, 200));
-    Assert.assertEquals(parameters(2, 14, 49_000_000), compute(1_000_000_000, 
1, 2, 200));
+    Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 
150_000_000), compute(2_000_000_000, 1, 1, 200));
+    Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 
150_000_000), compute(2_000_000_000, 1, 2, 200));
 
     final MSQException e = Assert.assertThrows(
         MSQException.class,
         () -> compute(1_000_000_000, 1, 4, 200)
     );
 
-    Assert.assertEquals(new TooManyWorkersFault(200, 124), e.getFault());
+    Assert.assertEquals(new TooManyWorkersFault(200, 109), e.getFault());
   }
 
   @Test
   public void test_fourWorkersInJvm_twoHundredWorkersInCluster()
   {
-    Assert.assertEquals(parameters(1, 149, 999_000_000), 
compute(8_000_000_000L, 4, 1, 200));
-    Assert.assertEquals(parameters(2, 61, 799_000_000), 
compute(8_000_000_000L, 4, 2, 200));
-    Assert.assertEquals(parameters(4, 22, 549_000_000), 
compute(8_000_000_000L, 4, 4, 200));
-    Assert.assertEquals(parameters(4, 14, 299_000_000), 
compute(8_000_000_000L, 4, 8, 200));
-    Assert.assertEquals(parameters(4, 8, 99_000_000), compute(8_000_000_000L, 
4, 16, 200));
+    Assert.assertEquals(parameters(1, 150, 679_380_000, 304_200_000, 
168_750_000), compute(9_000_000_000L, 4, 1, 200));
+    Assert.assertEquals(parameters(2, 62, 543_705_000, 243_450_000, 
168_750_000), compute(9_000_000_000L, 4, 2, 200));
+    Assert.assertEquals(parameters(4, 22, 374_111_250, 167_512_500, 
168_750_000), compute(9_000_000_000L, 4, 4, 200));
+    Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 
168_750_000), compute(9_000_000_000L, 4, 8, 200));
+    Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), 
compute(9_000_000_000L, 4, 16, 200));
 
     final MSQException e = Assert.assertThrows(
         MSQException.class,
         () -> compute(8_000_000_000L, 4, 32, 200)
     );
 
-    Assert.assertEquals(new TooManyWorkersFault(200, 140), e.getFault());
+    Assert.assertEquals(new TooManyWorkersFault(200, 124), e.getFault());
+
+    // Make sure 107 actually works. (Verify the error message above.)
+    Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), 
compute(8_000_000_000L, 4, 32, 107));
+  }
+
+  @Test
+  public void test_oneWorkerInJvm_negativeUsableMemory()
+  {
+    final MSQException e = Assert.assertThrows(
+        MSQException.class,
+        () -> WorkerMemoryParameters.createInstance(100, -50, 1, 32, 1)
+    );
 
-    // Make sure 140 actually works. (Verify the error message above.)
-    Assert.assertEquals(parameters(4, 4, 25_666_666), compute(8_000_000_000L, 
4, 32, 140));
+    Assert.assertEquals(new NotEnoughMemoryFault(100, -50, 1, 32), 
e.getFault());
   }
 
   @Test
@@ -88,14 +99,17 @@ public class WorkerMemoryParametersTest
   private static WorkerMemoryParameters parameters(
       final int superSorterMaxActiveProcessors,
       final int superSorterMaxChannelsPerProcessor,
-      final long bundleMemory
+      final long appenderatorMemory,
+      final long broadcastJoinMemory,
+      final int partitionStatisticsMaxRetainedBytes
   )
   {
     return new WorkerMemoryParameters(
         superSorterMaxActiveProcessors,
         superSorterMaxChannelsPerProcessor,
-        (long) (bundleMemory * 
WorkerMemoryParameters.APPENDERATOR_MEMORY_FRACTION),
-        (long) (bundleMemory * 
WorkerMemoryParameters.BROADCAST_JOIN_MEMORY_FRACTION)
+        appenderatorMemory,
+        broadcastJoinMemory,
+        partitionStatisticsMaxRetainedBytes
     );
   }
 
@@ -106,8 +120,9 @@ public class WorkerMemoryParametersTest
       final int numInputWorkers
   )
   {
-    return WorkerMemoryParameters.compute(
+    return WorkerMemoryParameters.createInstance(
         maxMemoryInJvm,
+        (long) (maxMemoryInJvm * 
WorkerMemoryParameters.USABLE_MEMORY_FRACTION),
         numWorkersInJvm,
         numProcessingThreadsInJvm,
         numInputWorkers
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
index fa2bdb2b7e..0ea9ab45f4 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
@@ -49,8 +49,7 @@ public class IndexerWorkerContextTest
         injectorMock,
         null,
         null,
-        null,
-        Runtime.getRuntime().maxMemory()
+        null
     );
   }
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 77abdfc835..38f80ea283 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -64,7 +64,7 @@ public class MSQFaultSerdeTest
     assertFaultSerde(InsertTimeNullFault.INSTANCE);
     assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
     assertFaultSerde(new InvalidNullByteFault("the column"));
-    assertFaultSerde(new NotEnoughMemoryFault(1000, 1, 2));
+    assertFaultSerde(new NotEnoughMemoryFault(1000, 900, 1, 2));
     assertFaultSerde(QueryNotSupportedFault.INSTANCE);
     assertFaultSerde(new RowTooLargeFault(1000));
     assertFaultSerde(new TaskStartTimeoutFault(10));
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
index 55fd6b75d0..c31d8c69fa 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
@@ -80,7 +80,7 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
     public ControllerQueryKernelTester queryDefinition(QueryDefinition 
queryDefinition)
     {
       this.queryDefinition = Preconditions.checkNotNull(queryDefinition);
-      this.controllerQueryKernel = new ControllerQueryKernel(queryDefinition);
+      this.controllerQueryKernel = new ControllerQueryKernel(queryDefinition, 
10_000_000);
       return this;
     }
 
@@ -220,12 +220,6 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
       return controllerQueryKernel.isSuccess();
     }
 
-    public ControllerStagePhase getStagePhase(int stageNumber)
-    {
-      Preconditions.checkArgument(initialized);
-      return controllerQueryKernel.getStagePhase(new 
StageId(queryDefinition.getQueryId(), stageNumber));
-    }
-
     public void startStage(int stageNumber)
     {
       Preconditions.checkArgument(initialized);
@@ -250,7 +244,7 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
 
       // Simulate 1000 keys being encountered in the data, so the kernel can 
generate some partitions.
       final ClusterByStatisticsCollector keyStatsCollector =
-          
queryDefinition.getStageDefinition(stageNumber).createResultKeyStatisticsCollector();
+          
queryDefinition.getStageDefinition(stageNumber).createResultKeyStatisticsCollector(10_000_000);
       for (int i = 0; i < 1000; i++) {
         final RowKey key = KeyTestUtils.createKey(
             MockQueryDefinitionBuilder.STAGE_SIGNATURE,
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index b33df1e3f2..35e57b9f2f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -101,6 +101,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.TestGroupByBuffers;
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.query.lookup.LookupReferencesManager;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexIO;
@@ -115,6 +116,7 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@@ -231,12 +233,15 @@ public class MSQTestBase extends BaseCalciteQueryTest
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   private TestGroupByBuffers groupByBuffers;
-  protected final WorkerMemoryParameters workerMemoryParameters = 
Mockito.spy(WorkerMemoryParameters.compute(
-      WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
-      2,
-      10,
-      2
-  ));
+  protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(
+      WorkerMemoryParameters.createInstance(
+          WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
+          WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
+          2,
+          10,
+          2
+      )
+  );
 
   @After
   public void tearDown2()
@@ -341,6 +346,15 @@ public class MSQTestBase extends BaseCalciteQueryTest
           
binder.bind(ExprMacroTable.class).toInstance(CalciteTests.createExprMacroTable());
           
binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT);
         },
+        binder -> {
+          // Requirements of 
WorkerMemoryParameters.createProductionInstanceForWorker(injector)
+          final LookupReferencesManager lookupReferencesManager =
+              EasyMock.createStrictMock(LookupReferencesManager.class);
+          
EasyMock.expect(lookupReferencesManager.getAllLookupNames()).andReturn(Collections.emptySet());
+          EasyMock.replay(lookupReferencesManager);
+          
binder.bind(LookupReferencesManager.class).toInstance(lookupReferencesManager);
+          binder.bind(AppenderatorsManager.class).toProvider(() -> null);
+        },
         binder -> {
           // Requirements of JoinableFactoryModule
           
binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class));
@@ -414,7 +428,10 @@ public class MSQTestBase extends BaseCalciteQueryTest
     try {
       return ImmutableMap.<String, Object>builder()
                          .putAll(DEFAULT_MSQ_CONTEXT)
-                         .put(DruidQuery.CTX_SCAN_SIGNATURE, 
queryFramework().queryJsonMapper().writeValueAsString(signature))
+                         .put(
+                             DruidQuery.CTX_SCAN_SIGNATURE,
+                             
queryFramework().queryJsonMapper().writeValueAsString(signature)
+                         )
                          .build();
     }
     catch (JsonProcessingException e) {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index 3cf03b9f22..655077008d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -155,8 +155,7 @@ public class MSQTestWorkerContext implements WorkerContext
             injector,
             indexIO,
             null,
-            null,
-            Runtime.getRuntime().maxMemory()
+            null
         ),
         indexIO,
         injector.getInstance(DataSegmentProvider.class),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to